X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=blobdiff_plain;f=src%2Fbeam_stats_msg_graphite.erl;h=22990a44e0fd308d8ea032f4a35251fb436cdef3;hp=6883ff17beca508c21a53f0f46198942cf05f98c;hb=697c496d9970a9855c4e6b9ff9212564051b8290;hpb=8dd2cb56d2a0baabacfbab3795e8c7bffb5c06b5 diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index 6883ff1..22990a4 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -13,8 +13,11 @@ -export( [ of_beam_stats/1 + , of_beam_stats/2 , to_bin/1 , path_to_bin/1 + , add_path_prefix/2 + , node_id_to_bin/1 ]). -define(T, #?MODULE). @@ -38,7 +41,6 @@ of_beam_stats(#beam_stats { timestamp = Timestamp , node_id = _ , memory = Memory - % TODO: Handle the rest of data points , io_bytes_in = IOBytesIn , io_bytes_out = IOBytesOut , context_switches = ContextSwitches @@ -76,11 +78,22 @@ to_bin( TimestampBin = integer_to_binary(TimestampInt), <>. +-spec add_path_prefix(t(), binary()) -> + t(). +add_path_prefix(?T{path=Path}=T, <>) -> + T?T{path = [Prefix | Path]}. + -spec path_to_bin([binary()]) -> binary(). path_to_bin(Path) -> bin_join(Path, <<".">>). +-spec node_id_to_bin(node()) -> + binary(). +node_id_to_bin(NodeID) -> + NodeIDBin = atom_to_binary(NodeID, utf8), + re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + %% ============================================================================ %% Helpers %% ============================================================================ @@ -108,7 +121,7 @@ of_memory(Memory, <>, Timestamp) -> end, lists:map(ComponentToMessage, Memory). --spec of_ets(beam_stats_ets_table:t(), binary(), erlang:timestamp()) -> +-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) -> [t()]. of_ets(PerTableStats, <>, Timestamp) -> OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end, @@ -153,6 +166,7 @@ of_processes( OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end, PerProcessMsgsNested = lists:map(OfProcess, Processes), PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested), + PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp), Ts = Timestamp, N = NodeID, [ cons([N, <<"processes_count_all">> ], CountAll , Ts) @@ -163,14 +177,14 @@ of_processes( , cons([N, <<"processes_count_running">> ], CountRunning , Ts) , cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts) , cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts) - | PerProcessMsgsFlattened + | PerProcessMsgsAggregates ]. -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) -> [t()]. of_process( #beam_stats_process - { pid = Pid + { pid = _ , memory = Memory , total_heap_size = TotalHeapSize , stack_size = StackSize @@ -181,20 +195,30 @@ of_process( ) -> Origin = beam_stats_process:get_best_known_origin(Process), OriginBin = proc_origin_to_bin(Origin), - PidBin = pid_to_bin(Pid), - OriginAndPid = [OriginBin, PidBin], Ts = Timestamp, N = NodeID, - [ cons([N, <<"process_memory">> | OriginAndPid], Memory , Ts) - , cons([N, <<"process_total_heap_size">> | OriginAndPid], TotalHeapSize , Ts) - , cons([N, <<"process_stack_size">> | OriginAndPid], StackSize , Ts) - , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen , Ts) + [ cons([N, <<"process_memory">> , OriginBin], Memory , Ts) + , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts) + , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts) + , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) ]. +-spec aggregate_by_path([t()], erlang:timestamp()) -> + [t()]. +aggregate_by_path(Msgs, Timestamp) -> + Aggregate = + fun (?T{path=K, value=V}, ValsByPath) -> + dict:update_counter(K, V, ValsByPath) + end, + ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs), + ValsByPathList = dict:to_list(ValsByPathDict), + [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList]. + -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> binary(). proc_origin_to_bin({registered_name, Name}) -> - atom_to_binary(Name, utf8); + NameBin = atom_to_binary(Name, utf8), + <<"named--", NameBin/binary>>; proc_origin_to_bin({ancestry, Ancestry}) -> #beam_stats_process_ancestry { raw_initial_call = InitCallRaw @@ -207,7 +231,8 @@ proc_origin_to_bin({ancestry, Ancestry}) -> AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), InitCallRawBin = mfa_to_bin(InitCallRaw), - << InitCallRawBin/binary + << "spawned-via--" + , InitCallRawBin/binary , "--" , InitCallOTPBin/binary , "--" @@ -251,9 +276,3 @@ cons(Path, Value, Timestamp) -> , value = Value , timestamp = Timestamp }. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).