feat: prefix Graphite paths with current schema version
[beam_stats.git] / src / beam_stats_msg_graphite.erl
index e8d0632..bb89320 100644 (file)
@@ -16,7 +16,6 @@
     , of_beam_stats/2
     , to_bin/1
     , path_to_bin/1
-    , add_path_prefix/2
     , node_id_to_bin/1
     ]).
 
@@ -53,15 +52,17 @@ of_beam_stats(#beam_stats
 ) ->
     Ts = Timestamp,
     N = NodeID,
-    [ cons([N, <<"io">>               , <<"bytes_in">> ], IOBytesIn      , Ts)
-    , cons([N, <<"io">>               , <<"bytes_out">>], IOBytesOut     , Ts)
-    , cons([N, <<"context_switches">>                  ], ContextSwitches, Ts)
-    , cons([N, <<"reductions">>                        ], Reductions     , Ts)
-    , cons([N, <<"run_queue">>                         ], RunQueue       , Ts)
-    | of_memory(Memory, NodeID, Ts)
-    ]
-    ++ of_ets(ETS, NodeID, Ts)
-    ++ of_processes(Processes, NodeID, Ts).
+    Msgs =
+        [ cons([N, <<"io">>               , <<"bytes_in">> ], IOBytesIn      , Ts)
+        , cons([N, <<"io">>               , <<"bytes_out">>], IOBytesOut     , Ts)
+        , cons([N, <<"context_switches">>                  ], ContextSwitches, Ts)
+        , cons([N, <<"reductions">>                        ], Reductions     , Ts)
+        , cons([N, <<"run_queue">>                         ], RunQueue       , Ts)
+        | of_memory(Memory, NodeID, Ts)
+        ]
+        ++ of_ets(ETS, NodeID, Ts)
+        ++ of_processes(Processes, NodeID, Ts),
+    lists:map(fun path_prefix_schema_version/1, Msgs).
 
 -spec to_bin(t()) ->
     binary().
@@ -78,11 +79,6 @@ to_bin(
     TimestampBin = integer_to_binary(TimestampInt),
     <<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
 
--spec add_path_prefix(t(), binary()) ->
-    t().
-add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
-    T?T{path = [Prefix | Path]}.
-
 -spec path_to_bin([binary()]) ->
     binary().
 path_to_bin(Path) ->
@@ -98,6 +94,21 @@ node_id_to_bin(NodeID) ->
 %% Helpers
 %% ============================================================================
 
+-spec path_prefix_schema_version(t()) ->
+    t().
+path_prefix_schema_version(?T{}=T) ->
+    path_prefix(T, schema_version()).
+
+-spec path_prefix(t(), binary()) ->
+    t().
+path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
+    T?T{path = [Prefix | Path]}.
+
+-spec schema_version() ->
+    binary().
+schema_version() ->
+    <<"beam_stats_v0">>.
+
 -spec bin_join([binary()], binary()) ->
     binary().
 bin_join([]                         , <<_/binary>>  ) -> <<>>;
@@ -166,6 +177,7 @@ of_processes(
     OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
     PerProcessMsgsNested = lists:map(OfProcess, Processes),
     PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
     Ts = Timestamp,
     N  = NodeID,
     [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
@@ -176,7 +188,7 @@ of_processes(
     , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
     , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
     , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
-    | PerProcessMsgsFlattened
+    | PerProcessMsgsAggregates
     ].
 
 -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
@@ -202,6 +214,17 @@ of_process(
     , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
     ].
 
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+    [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+    Aggregate =
+        fun (?T{path=K, value=V}, ValsByPath) ->
+            dict:update_counter(K, V, ValsByPath)
+        end,
+    ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+    ValsByPathList = dict:to_list(ValsByPathDict),
+    [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
 -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
     binary().
 proc_origin_to_bin({registered_name, Name}) ->
@@ -241,12 +264,7 @@ ancestors_to_bin([A | Ancestors]) ->
 ancestor_to_bin(A) when is_atom(A) ->
     atom_to_binary(A, utf8);
 ancestor_to_bin(A) when is_pid(A) ->
-    pid_to_bin(A).
-
-pid_to_bin(Pid) ->
-    PidList = erlang:pid_to_list(Pid),
-    PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
-             re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
+    <<"PID">>.
 
 -spec mfa_to_bin(mfa()) ->
     binary().
This page took 0.033429 seconds and 4 git commands to generate.