From: Siraaj Khandkar Date: Thu, 24 Sep 2015 05:41:17 +0000 (-0400) Subject: Merge pull request #13 from ibnfirnas/aggregate-anonymous-process-data X-Git-Tag: 0.11.0^0 X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=commitdiff_plain;h=95fd75219d86139f45d3858c360c6acf2966b5c0;hp=b8e10327d7c189769c32e439ec2b1cf9773afade Merge pull request #13 from ibnfirnas/aggregate-anonymous-process-data Aggregate anonymous process data by process origin --- diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index e9dcfff..56f2133 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.10.0"}, + {vsn, "0.11.0"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index de18c0d..22990a4 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -166,6 +166,7 @@ of_processes( OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end, PerProcessMsgsNested = lists:map(OfProcess, Processes), PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested), + PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp), Ts = Timestamp, N = NodeID, [ cons([N, <<"processes_count_all">> ], CountAll , Ts) @@ -176,14 +177,14 @@ of_processes( , cons([N, <<"processes_count_running">> ], CountRunning , Ts) , cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts) , cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts) - | PerProcessMsgsFlattened + | PerProcessMsgsAggregates ]. -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) -> [t()]. of_process( #beam_stats_process - { pid = Pid + { pid = _ , memory = Memory , total_heap_size = TotalHeapSize , stack_size = StackSize @@ -194,20 +195,30 @@ of_process( ) -> Origin = beam_stats_process:get_best_known_origin(Process), OriginBin = proc_origin_to_bin(Origin), - PidBin = pid_to_bin(Pid), - OriginAndPid = [OriginBin, PidBin], Ts = Timestamp, N = NodeID, - [ cons([N, <<"process_memory">> | OriginAndPid], Memory , Ts) - , cons([N, <<"process_total_heap_size">> | OriginAndPid], TotalHeapSize , Ts) - , cons([N, <<"process_stack_size">> | OriginAndPid], StackSize , Ts) - , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen , Ts) + [ cons([N, <<"process_memory">> , OriginBin], Memory , Ts) + , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts) + , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts) + , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) ]. +-spec aggregate_by_path([t()], erlang:timestamp()) -> + [t()]. +aggregate_by_path(Msgs, Timestamp) -> + Aggregate = + fun (?T{path=K, value=V}, ValsByPath) -> + dict:update_counter(K, V, ValsByPath) + end, + ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs), + ValsByPathList = dict:to_list(ValsByPathDict), + [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList]. + -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> binary(). proc_origin_to_bin({registered_name, Name}) -> - atom_to_binary(Name, utf8); + NameBin = atom_to_binary(Name, utf8), + <<"named--", NameBin/binary>>; proc_origin_to_bin({ancestry, Ancestry}) -> #beam_stats_process_ancestry { raw_initial_call = InitCallRaw @@ -220,7 +231,8 @@ proc_origin_to_bin({ancestry, Ancestry}) -> AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), InitCallRawBin = mfa_to_bin(InitCallRaw), - << InitCallRawBin/binary + << "spawned-via--" + , InitCallRawBin/binary , "--" , InitCallOTPBin/binary , "--" diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index 33d78c9..43c1a1f 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -107,32 +107,32 @@ t_full_cycle(_Cfg) -> , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">> % Processes totals - , <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">> + , <<"beam_stats.node_foo_host_bar.processes_count_all:4|g">> , <<"beam_stats.node_foo_host_bar.processes_count_exiting:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_garbage_collecting:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_registered:1|g">> , <<"beam_stats.node_foo_host_bar.processes_count_runnable:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_running:3|g">> , <<"beam_stats.node_foo_host_bar.processes_count_suspended:0|g">> - , <<"beam_stats.node_foo_host_bar.processes_count_waiting:0|g">> + , <<"beam_stats.node_foo_host_bar.processes_count_waiting:1|g">> % Process 1 - , <<"beam_stats.node_foo_host_bar.process_memory.reg_name_foo.0_1_0:15|g">> - , <<"beam_stats.node_foo_host_bar.process_total_heap_size.reg_name_foo.0_1_0:25|g">> - , <<"beam_stats.node_foo_host_bar.process_stack_size.reg_name_foo.0_1_0:10|g">> - , <<"beam_stats.node_foo_host_bar.process_message_queue_len.reg_name_foo.0_1_0:0|g">> + , <<"beam_stats.node_foo_host_bar.process_memory.named--reg_name_foo:15|g">> + , <<"beam_stats.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">> + , <<"beam_stats.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">> + , <<"beam_stats.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">> % Process 2 - , <<"beam_stats.node_foo_host_bar.process_memory.bar_mod-bar_fun-1--NONE--NONE.0_2_0:25|g">> - , <<"beam_stats.node_foo_host_bar.process_total_heap_size.bar_mod-bar_fun-1--NONE--NONE.0_2_0:35|g">> - , <<"beam_stats.node_foo_host_bar.process_stack_size.bar_mod-bar_fun-1--NONE--NONE.0_2_0:40|g">> - , <<"beam_stats.node_foo_host_bar.process_message_queue_len.bar_mod-bar_fun-1--NONE--NONE.0_2_0:5|g">> + , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">> + , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">> + , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">> + , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">> - % Process 3 - , <<"beam_stats.node_foo_host_bar.process_memory.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:25|g">> - , <<"beam_stats.node_foo_host_bar.process_total_heap_size.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:35|g">> - , <<"beam_stats.node_foo_host_bar.process_stack_size.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:40|g">> - , <<"beam_stats.node_foo_host_bar.process_message_queue_len.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:1|g">> + % Process 3 and 4, aggregated by origin + , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:30|g">> + , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:45|g">> + , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:55|g">> + , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:1|g">> ], MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), RemoveExpectedFromReceived = @@ -158,6 +158,7 @@ meck_expect_beam_stats(Overrides) -> Pid1 = list_to_pid("<0.1.0>"), Pid2 = list_to_pid("<0.2.0>"), Pid3 = list_to_pid("<0.3.0>"), + Pid4 = list_to_pid("<0.4.0>"), Process1 = #beam_stats_process { pid = Pid1 @@ -206,21 +207,38 @@ meck_expect_beam_stats(Overrides) -> , stack_size = 40 , message_queue_len = 1 }, + Process4 = + #beam_stats_process + { pid = Pid4 + , registered_name = none + , ancestry = + #beam_stats_process_ancestry + { raw_initial_call = {baz_mod, baz_fun, 3} + , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}} + , otp_ancestors = {some, [Pid0, Pid1]} + } + , status = waiting + , memory = 5 + , total_heap_size = 10 + , stack_size = 15 + , message_queue_len = 0 + }, Processes = #beam_stats_processes { individual_stats = [ Process1 , Process2 , Process3 + , Process4 ] - , count_all = 3 + , count_all = 4 , count_exiting = 0 , count_garbage_collecting = 0 , count_registered = 1 , count_runnable = 0 , count_running = 3 , count_suspended = 0 - , count_waiting = 0 + , count_waiting = 1 }, ETSTableStatsFoo = #beam_stats_ets_table @@ -263,7 +281,7 @@ meck_expect_beam_stats(Overrides) -> end ), meck:expect(beam_stats_source, erlang_processes, - fun () -> [Pid1, Pid2, Pid3] end), + fun () -> [Pid1, Pid2, Pid3, Pid4] end), meck:expect(beam_stats_source, os_timestamp, fun () -> {1, 2, 3} end), meck:expect(beam_stats_source, erlang_process_info, @@ -304,6 +322,21 @@ meck_expect_beam_stats(Overrides) -> ; stack_size -> {K, 40} ; message_queue_len -> {K, 1} end + ; (P, K) when P == Pid4 -> + Dict = + [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}} + , {'$ancestors' , [Pid0, Pid1]} + ], + case K + of dictionary -> {K, Dict} + ; initial_call -> {K, {baz_mod, baz_fun, 3}} + ; registered_name -> [] + ; status -> {K, waiting} + ; memory -> {K, 5} + ; total_heap_size -> {K, 10} + ; stack_size -> {K, 15} + ; message_queue_len -> {K, 0} + end end ), #beam_stats