X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_msg_graphite.erl;h=69396539b363cdf8c593d62a57a4f9b7d14d3fe0;hb=7dbc59b67d2cebe0e498ac4fd167cbaabed0e55b;hp=87dae3e0ff8ef14bc0e670a7e247817f3610b5d7;hpb=10bc7b7109e3f124ac8228af0fa2c8395a407350;p=beam_stats.git diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index 87dae3e..6939653 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -16,7 +16,6 @@ , of_beam_stats/2 , to_bin/1 , path_to_bin/1 - , add_path_prefix/2 , node_id_to_bin/1 ]). @@ -53,15 +52,17 @@ of_beam_stats(#beam_stats ) -> Ts = Timestamp, N = NodeID, - [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts) - , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts) - , cons([N, <<"context_switches">> ], ContextSwitches, Ts) - , cons([N, <<"reductions">> ], Reductions , Ts) - , cons([N, <<"run_queue">> ], RunQueue , Ts) - | of_memory(Memory, NodeID, Ts) - ] - ++ of_ets(ETS, NodeID, Ts) - ++ of_processes(Processes, NodeID, Ts). + Msgs = + [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts) + , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts) + , cons([N, <<"context_switches">> ], ContextSwitches, Ts) + , cons([N, <<"reductions">> ], Reductions , Ts) + , cons([N, <<"run_queue">> ], RunQueue , Ts) + | of_memory(Memory, NodeID, Ts) + ] + ++ of_ets(ETS, NodeID, Ts) + ++ of_processes(Processes, NodeID, Ts), + lists:map(fun path_prefix_schema_version/1, Msgs). -spec to_bin(t()) -> binary(). @@ -78,11 +79,6 @@ to_bin( TimestampBin = integer_to_binary(TimestampInt), <>. --spec add_path_prefix(t(), binary()) -> - t(). -add_path_prefix(?T{path=Path}=T, <>) -> - T?T{path = [Prefix | Path]}. - -spec path_to_bin([binary()]) -> binary(). path_to_bin(Path) -> @@ -98,6 +94,21 @@ node_id_to_bin(NodeID) -> %% Helpers %% ============================================================================ +-spec path_prefix_schema_version(t()) -> + t(). +path_prefix_schema_version(?T{}=T) -> + path_prefix(T, schema_version()). + +-spec path_prefix(t(), binary()) -> + t(). +path_prefix(?T{path=Path}=T, <>) -> + T?T{path = [Prefix | Path]}. + +-spec schema_version() -> + binary(). +schema_version() -> + <<"beam_stats_v0">>. + -spec bin_join([binary()], binary()) -> binary(). bin_join([] , <<_/binary>> ) -> <<>>; @@ -125,8 +136,9 @@ of_memory(Memory, <>, Timestamp) -> [t()]. of_ets(PerTableStats, <>, Timestamp) -> OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end, - NestedMsgs = lists:map(OfEtsTable, PerTableStats), - lists:append(NestedMsgs). + MsgsNested = lists:map(OfEtsTable, PerTableStats), + MsgsFlattened = lists:append(MsgsNested), + aggregate_by_path(MsgsFlattened, Timestamp). -spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) -> [t()]. @@ -139,9 +151,13 @@ of_ets_table(#beam_stats_ets_table <>, Timestamp ) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), + IDType = + case ID =:= Name + of true -> <<"NAMED">> + ; false -> <<"TID">> + end, NameBin = atom_to_binary(Name, latin1), - NameAndID = [NameBin, IDBin], + NameAndID = [NameBin, IDType], [ cons([NodeID, <<"ets_table">>, <<"size">> | NameAndID], Size , Timestamp) , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp) ]. @@ -189,6 +205,7 @@ of_process( , total_heap_size = TotalHeapSize , stack_size = StackSize , message_queue_len = MsgQueueLen + , reductions = Reductions }=Process, <>, Timestamp @@ -201,6 +218,7 @@ of_process( , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts) , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts) , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) + , cons([N, <<"process_reductions">> , OriginBin], Reductions , Ts) ]. -spec aggregate_by_path([t()], erlang:timestamp()) ->