feat: aggregate anonymous process data by origin
[beam_stats.git] / src / beam_stats_msg_graphite.erl
index e8d0632..22990a4 100644 (file)
@@ -166,6 +166,7 @@ of_processes(
     OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
     PerProcessMsgsNested = lists:map(OfProcess, Processes),
     PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
     Ts = Timestamp,
     N  = NodeID,
     [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
@@ -176,7 +177,7 @@ of_processes(
     , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
     , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
     , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
-    | PerProcessMsgsFlattened
+    | PerProcessMsgsAggregates
     ].
 
 -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
@@ -202,6 +203,17 @@ of_process(
     , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
     ].
 
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+    [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+    Aggregate =
+        fun (?T{path=K, value=V}, ValsByPath) ->
+            dict:update_counter(K, V, ValsByPath)
+        end,
+    ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+    ValsByPathList = dict:to_list(ValsByPathDict),
+    [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
 -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
     binary().
 proc_origin_to_bin({registered_name, Name}) ->
This page took 0.018078 seconds and 4 git commands to generate.