OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
PerProcessMsgsNested = lists:map(OfProcess, Processes),
PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+ PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
Ts = Timestamp,
N = NodeID,
[ cons([N, <<"processes_count_all">> ], CountAll , Ts)
, cons([N, <<"processes_count_running">> ], CountRunning , Ts)
, cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts)
, cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts)
- | PerProcessMsgsFlattened
+ | PerProcessMsgsAggregates
].
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
, cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts)
].
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+ [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+ Aggregate =
+ fun (?T{path=K, value=V}, ValsByPath) ->
+ dict:update_counter(K, V, ValsByPath)
+ end,
+ ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+ ValsByPathList = dict:to_list(ValsByPathDict),
+ [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
binary().
proc_origin_to_bin({registered_name, Name}) ->
ancestor_to_bin(A) when is_atom(A) ->
atom_to_binary(A, utf8);
ancestor_to_bin(A) when is_pid(A) ->
- pid_to_bin(A).
-
-pid_to_bin(Pid) ->
- PidList = erlang:pid_to_list(Pid),
- PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
- re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
+ <<"PID">>.
-spec mfa_to_bin(mfa()) ->
binary().