feat: aggregate anonymous process data by origin
[beam_stats.git] / src / beam_stats_msg_graphite.erl
index 6883ff1..22990a4 100644 (file)
 
 -export(
     [ of_beam_stats/1
+    , of_beam_stats/2
     , to_bin/1
     , path_to_bin/1
+    , add_path_prefix/2
+    , node_id_to_bin/1
     ]).
 
 -define(T, #?MODULE).
@@ -38,7 +41,6 @@ of_beam_stats(#beam_stats
     { timestamp = Timestamp
     , node_id   = _
     , memory    = Memory
-    % TODO: Handle the rest of data points
     , io_bytes_in      = IOBytesIn
     , io_bytes_out     = IOBytesOut
     , context_switches = ContextSwitches
@@ -76,11 +78,22 @@ to_bin(
     TimestampBin = integer_to_binary(TimestampInt),
     <<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
 
+-spec add_path_prefix(t(), binary()) ->  % Return the message with Prefix as its new first path segment.
+    t().
+add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->  % The binary pattern makes a non-binary Prefix fail the clause match.
+    T?T{path = [Prefix | Path]}.  % Only the path field is updated; Prefix is consed onto the front of the existing path.
+
 -spec path_to_bin([binary()]) ->
     binary().
 path_to_bin(Path) ->
     bin_join(Path, <<".">>).
 
+-spec node_id_to_bin(node()) ->  % Render a node name as a binary with every @ and . replaced by _.
+    binary().
+node_id_to_bin(NodeID) ->  % Presumably keeps the node name a single path segment, since path_to_bin joins segments with a dot - confirm with callers.
+    NodeIDBin = atom_to_binary(NodeID, utf8),
+    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).  % global replaces all matches; {return, binary} yields a flat binary.
+
 %% ============================================================================
 %% Helpers
 %% ============================================================================
@@ -108,7 +121,7 @@ of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
         end,
     lists:map(ComponentToMessage, Memory).
 
--spec of_ets(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
+-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) ->
     [t()].
 of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
     OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
@@ -153,6 +166,7 @@ of_processes(
     OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
     PerProcessMsgsNested = lists:map(OfProcess, Processes),
     PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
     Ts = Timestamp,
     N  = NodeID,
     [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
@@ -163,14 +177,14 @@ of_processes(
     , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
     , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
     , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
-    | PerProcessMsgsFlattened
+    | PerProcessMsgsAggregates
     ].
 
 -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
     [t()].
 of_process(
     #beam_stats_process
-    { pid               = Pid
+    { pid               = _
     , memory            = Memory
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
@@ -181,20 +195,30 @@ of_process(
 ) ->
     Origin = beam_stats_process:get_best_known_origin(Process),
     OriginBin = proc_origin_to_bin(Origin),
-    PidBin = pid_to_bin(Pid),
-    OriginAndPid = [OriginBin, PidBin],
     Ts = Timestamp,
     N  = NodeID,
-    [ cons([N, <<"process_memory">>            | OriginAndPid], Memory        , Ts)
-    , cons([N, <<"process_total_heap_size">>   | OriginAndPid], TotalHeapSize , Ts)
-    , cons([N, <<"process_stack_size">>        | OriginAndPid], StackSize     , Ts)
-    , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen   , Ts)
+    [ cons([N, <<"process_memory">>            , OriginBin], Memory        , Ts)
+    , cons([N, <<"process_total_heap_size">>   , OriginBin], TotalHeapSize , Ts)
+    , cons([N, <<"process_stack_size">>        , OriginBin], StackSize     , Ts)
+    , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
     ].
 
+-spec aggregate_by_path([t()], erlang:timestamp()) ->  % Collapse messages that share a path into one summed message per path.
+    [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+    Aggregate =
+        fun (?T{path=K, value=V}, ValsByPath) ->
+            dict:update_counter(K, V, ValsByPath)  % Adds V to the total stored for path K, or stores V if K is new.
+        end,
+    ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+    ValsByPathList = dict:to_list(ValsByPathDict),  % No sort is applied to this list.
+    [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].  % Each output uses the Timestamp argument; input message timestamps are not read.
+
 -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
     binary().
 proc_origin_to_bin({registered_name, Name}) ->
-    atom_to_binary(Name, utf8);
+    NameBin = atom_to_binary(Name, utf8),
+    <<"named--", NameBin/binary>>;
 proc_origin_to_bin({ancestry, Ancestry}) ->
     #beam_stats_process_ancestry
     { raw_initial_call  = InitCallRaw
@@ -207,7 +231,8 @@ proc_origin_to_bin({ancestry, Ancestry}) ->
     AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
     AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
     InitCallRawBin    = mfa_to_bin(InitCallRaw),
-    << InitCallRawBin/binary
+    << "spawned-via--"
+     , InitCallRawBin/binary
      , "--"
      , InitCallOTPBin/binary
      , "--"
@@ -251,9 +276,3 @@ cons(Path, Value, Timestamp) ->
     , value     = Value
     , timestamp = Timestamp
     }.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
This page took 0.026603 seconds and 4 git commands to generate.