Improve spec of list interleave.
[beam_stats.git] / src / beam_stats_msg_graphite.erl
index de18c0d..5b8bd43 100644 (file)
@@ -14,9 +14,8 @@
 -export(
     [ of_beam_stats/1
     , of_beam_stats/2
-    , to_bin/1
-    , path_to_bin/1
-    , add_path_prefix/2
+    , to_iolist/1
+    , path_to_iolist/1
     , node_id_to_bin/1
     ]).
 
@@ -53,40 +52,37 @@ of_beam_stats(#beam_stats
 ) ->
     Ts = Timestamp,
     N = NodeID,
-    [ cons([N, <<"io">>               , <<"bytes_in">> ], IOBytesIn      , Ts)
-    , cons([N, <<"io">>               , <<"bytes_out">>], IOBytesOut     , Ts)
-    , cons([N, <<"context_switches">>                  ], ContextSwitches, Ts)
-    , cons([N, <<"reductions">>                        ], Reductions     , Ts)
-    , cons([N, <<"run_queue">>                         ], RunQueue       , Ts)
-    | of_memory(Memory, NodeID, Ts)
-    ]
-    ++ of_ets(ETS, NodeID, Ts)
-    ++ of_processes(Processes, NodeID, Ts).
+    Msgs =
+        [ cons([N, <<"io">>               , <<"bytes_in">> ], IOBytesIn      , Ts)
+        , cons([N, <<"io">>               , <<"bytes_out">>], IOBytesOut     , Ts)
+        , cons([N, <<"context_switches">>                  ], ContextSwitches, Ts)
+        , cons([N, <<"reductions">>                        ], Reductions     , Ts)
+        , cons([N, <<"run_queue">>                         ], RunQueue       , Ts)
+        | of_memory(Memory, NodeID, Ts)
+        ]
+        ++ of_ets(ETS, NodeID, Ts)
+        ++ of_processes(Processes, NodeID, Ts),
+    lists:map(fun path_prefix_schema_version/1, Msgs).
 
--spec to_bin(t()) ->
-    binary().
-to_bin(
+-spec to_iolist(t()) ->
+    iolist().
+to_iolist(
     ?T
     { path      = Path
     , value     = Value
     , timestamp = Timestamp
     }
 ) ->
-    PathBin = path_to_bin(Path),
+    PathIOList = path_to_iolist(Path),
     ValueBin = integer_to_binary(Value),
     TimestampInt = timestamp_to_integer(Timestamp),
     TimestampBin = integer_to_binary(TimestampInt),
-    <<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
-
--spec add_path_prefix(t(), binary()) ->
-    t().
-add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
-    T?T{path = [Prefix | Path]}.
+    [PathIOList, <<" ">>, ValueBin, <<" ">>, TimestampBin].
 
--spec path_to_bin([binary()]) ->
-    binary().
-path_to_bin(Path) ->
-    bin_join(Path, <<".">>).
+-spec path_to_iolist([binary()]) ->
+    iolist().
+path_to_iolist(Path) ->
+    interleave(Path, <<".">>).
 
 -spec node_id_to_bin(node()) ->
     binary().
@@ -98,13 +94,27 @@ node_id_to_bin(NodeID) ->
 %% Helpers
 %% ============================================================================
 
--spec bin_join([binary()], binary()) ->
+-spec path_prefix_schema_version(t()) ->
+    t().
+path_prefix_schema_version(?T{}=T) ->
+    path_prefix(T, schema_version()).
+
+-spec path_prefix(t(), binary()) ->
+    t().
+path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
+    T?T{path = [Prefix | Path]}.
+
+-spec schema_version() ->
     binary().
-bin_join([]                         , <<_/binary>>  ) -> <<>>;
-bin_join([<<B/binary>> | []]        , <<_/binary>>  ) -> B;
-bin_join([<<B/binary>> | [_|_]=Bins], <<Sep/binary>>) ->
-    BinsBin = bin_join(Bins, Sep),
-    <<B/binary, Sep/binary, BinsBin/binary>>.
+schema_version() ->
+    <<"beam_stats_v0">>.
+
+-spec interleave([A], A) ->
+    [A].
+interleave([], _) -> [];
+interleave([X], _) -> [X];
+interleave([X|Xs], Sep) ->
+    [X, Sep | interleave(Xs, Sep)].
 
 -spec timestamp_to_integer(erlang:timestamp()) ->
     non_neg_integer().
@@ -125,8 +135,9 @@ of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
     [t()].
 of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
     OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
-    NestedMsgs = lists:map(OfEtsTable, PerTableStats),
-    lists:append(NestedMsgs).
+    MsgsNested = lists:map(OfEtsTable, PerTableStats),
+    MsgsFlattened = lists:append(MsgsNested),
+    aggregate_by_path(MsgsFlattened, Timestamp).
 
 -spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
     [t()].
@@ -139,9 +150,13 @@ of_ets_table(#beam_stats_ets_table
     <<NodeID/binary>>,
     Timestamp
 ) ->
-    IDBin     = beam_stats_ets_table:id_to_bin(ID),
+    IDType =
+        case ID =:= Name
+        of  true  -> <<"NAMED">>
+        ;   false -> <<"TID">>
+        end,
     NameBin   = atom_to_binary(Name, latin1),
-    NameAndID = [NameBin, IDBin],
+    NameAndID = [NameBin, IDType],
     [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
     , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
     ].
@@ -166,6 +181,7 @@ of_processes(
     OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
     PerProcessMsgsNested = lists:map(OfProcess, Processes),
     PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
     Ts = Timestamp,
     N  = NodeID,
     [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
@@ -176,38 +192,50 @@ of_processes(
     , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
     , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
     , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
-    | PerProcessMsgsFlattened
+    | PerProcessMsgsAggregates
     ].
 
 -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
     [t()].
 of_process(
     #beam_stats_process
-    { pid               = Pid
+    { pid               = _
     , memory            = Memory
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
     , message_queue_len = MsgQueueLen
+    , reductions        = Reductions
     }=Process,
     <<NodeID/binary>>,
     Timestamp
 ) ->
     Origin = beam_stats_process:get_best_known_origin(Process),
     OriginBin = proc_origin_to_bin(Origin),
-    PidBin = pid_to_bin(Pid),
-    OriginAndPid = [OriginBin, PidBin],
     Ts = Timestamp,
     N  = NodeID,
-    [ cons([N, <<"process_memory">>            | OriginAndPid], Memory        , Ts)
-    , cons([N, <<"process_total_heap_size">>   | OriginAndPid], TotalHeapSize , Ts)
-    , cons([N, <<"process_stack_size">>        | OriginAndPid], StackSize     , Ts)
-    , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen   , Ts)
+    [ cons([N, <<"process_memory">>            , OriginBin], Memory        , Ts)
+    , cons([N, <<"process_total_heap_size">>   , OriginBin], TotalHeapSize , Ts)
+    , cons([N, <<"process_stack_size">>        , OriginBin], StackSize     , Ts)
+    , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
+    , cons([N, <<"process_reductions">>        , OriginBin], Reductions    , Ts)
     ].
 
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+    [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+    Aggregate =
+        fun (?T{path=K, value=V}, ValsByPath) ->
+            dict:update_counter(K, V, ValsByPath)
+        end,
+    ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+    ValsByPathList = dict:to_list(ValsByPathDict),
+    [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
 -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
     binary().
 proc_origin_to_bin({registered_name, Name}) ->
-    atom_to_binary(Name, utf8);
+    NameBin = atom_to_binary(Name, utf8),
+    <<"named--", NameBin/binary>>;
 proc_origin_to_bin({ancestry, Ancestry}) ->
     #beam_stats_process_ancestry
     { raw_initial_call  = InitCallRaw
@@ -220,7 +248,8 @@ proc_origin_to_bin({ancestry, Ancestry}) ->
     AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
     AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
     InitCallRawBin    = mfa_to_bin(InitCallRaw),
-    << InitCallRawBin/binary
+    << "spawned-via--"
+     , InitCallRawBin/binary
      , "--"
      , InitCallOTPBin/binary
      , "--"
@@ -241,12 +270,7 @@ ancestors_to_bin([A | Ancestors]) ->
 ancestor_to_bin(A) when is_atom(A) ->
     atom_to_binary(A, utf8);
 ancestor_to_bin(A) when is_pid(A) ->
-    pid_to_bin(A).
-
-pid_to_bin(Pid) ->
-    PidList = erlang:pid_to_list(Pid),
-    PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
-             re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
+    <<"PID">>.
 
 -spec mfa_to_bin(mfa()) ->
     binary().
This page took 0.023054 seconds and 4 git commands to generate.