X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_msg_graphite.erl;h=5b8bd435098606b619b13ef696eed6ddd4d4693f;hb=e1221e429959c3184223b8e174ceb503e59781ba;hp=e8d0632c3771b54722c1671a0d789f5ddd9ae839;hpb=f65a38458dc079d26b6e6f2f0f7ddfe9ce39252f;p=beam_stats.git diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index e8d0632..5b8bd43 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -14,9 +14,8 @@ -export( [ of_beam_stats/1 , of_beam_stats/2 - , to_bin/1 - , path_to_bin/1 - , add_path_prefix/2 + , to_iolist/1 + , path_to_iolist/1 , node_id_to_bin/1 ]). @@ -53,40 +52,37 @@ of_beam_stats(#beam_stats ) -> Ts = Timestamp, N = NodeID, - [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts) - , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts) - , cons([N, <<"context_switches">> ], ContextSwitches, Ts) - , cons([N, <<"reductions">> ], Reductions , Ts) - , cons([N, <<"run_queue">> ], RunQueue , Ts) - | of_memory(Memory, NodeID, Ts) - ] - ++ of_ets(ETS, NodeID, Ts) - ++ of_processes(Processes, NodeID, Ts). + Msgs = + [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts) + , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts) + , cons([N, <<"context_switches">> ], ContextSwitches, Ts) + , cons([N, <<"reductions">> ], Reductions , Ts) + , cons([N, <<"run_queue">> ], RunQueue , Ts) + | of_memory(Memory, NodeID, Ts) + ] + ++ of_ets(ETS, NodeID, Ts) + ++ of_processes(Processes, NodeID, Ts), + lists:map(fun path_prefix_schema_version/1, Msgs). --spec to_bin(t()) -> - binary(). -to_bin( +-spec to_iolist(t()) -> + iolist(). +to_iolist( ?T { path = Path , value = Value , timestamp = Timestamp } ) -> - PathBin = path_to_bin(Path), + PathIOList = path_to_iolist(Path), ValueBin = integer_to_binary(Value), TimestampInt = timestamp_to_integer(Timestamp), TimestampBin = integer_to_binary(TimestampInt), - <>. - --spec add_path_prefix(t(), binary()) -> - t(). -add_path_prefix(?T{path=Path}=T, <>) -> - T?T{path = [Prefix | Path]}. + [PathIOList, <<" ">>, ValueBin, <<" ">>, TimestampBin]. --spec path_to_bin([binary()]) -> - binary(). -path_to_bin(Path) -> - bin_join(Path, <<".">>). +-spec path_to_iolist([binary()]) -> + iolist(). +path_to_iolist(Path) -> + interleave(Path, <<".">>). -spec node_id_to_bin(node()) -> binary(). @@ -98,13 +94,27 @@ node_id_to_bin(NodeID) -> %% Helpers %% ============================================================================ --spec bin_join([binary()], binary()) -> +-spec path_prefix_schema_version(t()) -> + t(). +path_prefix_schema_version(?T{}=T) -> + path_prefix(T, schema_version()). + +-spec path_prefix(t(), binary()) -> + t(). +path_prefix(?T{path=Path}=T, <>) -> + T?T{path = [Prefix | Path]}. + +-spec schema_version() -> binary(). -bin_join([] , <<_/binary>> ) -> <<>>; -bin_join([<> | []] , <<_/binary>> ) -> B; -bin_join([<> | [_|_]=Bins], <>) -> - BinsBin = bin_join(Bins, Sep), - <>. +schema_version() -> + <<"beam_stats_v0">>. + +-spec interleave([A], A) -> + [A]. +interleave([], _) -> []; +interleave([X], _) -> [X]; +interleave([X|Xs], Sep) -> + [X, Sep | interleave(Xs, Sep)]. -spec timestamp_to_integer(erlang:timestamp()) -> non_neg_integer(). @@ -125,8 +135,9 @@ of_memory(Memory, <>, Timestamp) -> [t()]. of_ets(PerTableStats, <>, Timestamp) -> OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end, - NestedMsgs = lists:map(OfEtsTable, PerTableStats), - lists:append(NestedMsgs). + MsgsNested = lists:map(OfEtsTable, PerTableStats), + MsgsFlattened = lists:append(MsgsNested), + aggregate_by_path(MsgsFlattened, Timestamp). -spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) -> [t()]. @@ -139,9 +150,13 @@ of_ets_table(#beam_stats_ets_table <>, Timestamp ) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), + IDType = + case ID =:= Name + of true -> <<"NAMED">> + ; false -> <<"TID">> + end, NameBin = atom_to_binary(Name, latin1), - NameAndID = [NameBin, IDBin], + NameAndID = [NameBin, IDType], [ cons([NodeID, <<"ets_table">>, <<"size">> | NameAndID], Size , Timestamp) , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp) ]. @@ -166,6 +181,7 @@ of_processes( OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end, PerProcessMsgsNested = lists:map(OfProcess, Processes), PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested), + PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp), Ts = Timestamp, N = NodeID, [ cons([N, <<"processes_count_all">> ], CountAll , Ts) @@ -176,7 +192,7 @@ of_processes( , cons([N, <<"processes_count_running">> ], CountRunning , Ts) , cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts) , cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts) - | PerProcessMsgsFlattened + | PerProcessMsgsAggregates ]. -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) -> @@ -188,6 +204,7 @@ of_process( , total_heap_size = TotalHeapSize , stack_size = StackSize , message_queue_len = MsgQueueLen + , reductions = Reductions }=Process, <>, Timestamp @@ -200,8 +217,20 @@ of_process( , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts) , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts) , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) + , cons([N, <<"process_reductions">> , OriginBin], Reductions , Ts) ]. +-spec aggregate_by_path([t()], erlang:timestamp()) -> + [t()]. +aggregate_by_path(Msgs, Timestamp) -> + Aggregate = + fun (?T{path=K, value=V}, ValsByPath) -> + dict:update_counter(K, V, ValsByPath) + end, + ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs), + ValsByPathList = dict:to_list(ValsByPathDict), + [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList]. + -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> binary(). proc_origin_to_bin({registered_name, Name}) -> @@ -241,12 +270,7 @@ ancestors_to_bin([A | Ancestors]) -> ancestor_to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8); ancestor_to_bin(A) when is_pid(A) -> - pid_to_bin(A). - -pid_to_bin(Pid) -> - PidList = erlang:pid_to_list(Pid), - PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), - re:replace(PidBin , "[><]", "" , [global, {return, binary}]). + <<"PID">>. -spec mfa_to_bin(mfa()) -> binary().