From 697c496d9970a9855c4e6b9ff9212564051b8290 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Thu, 24 Sep 2015 01:25:58 -0400 Subject: [PATCH] feat: aggregate anonymous process data by origin --- src/beam_stats.app.src | 2 +- src/beam_stats_msg_graphite.erl | 14 ++++++- test/beam_stats_consumer_statsd_SUITE.erl | 51 +++++++++++++++++++---- 3 files changed, 56 insertions(+), 11 deletions(-) diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index e9dcfff..56f2133 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.10.0"}, + {vsn, "0.11.0"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index e8d0632..22990a4 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -166,6 +166,7 @@ of_processes( OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end, PerProcessMsgsNested = lists:map(OfProcess, Processes), PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested), + PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp), Ts = Timestamp, N = NodeID, [ cons([N, <<"processes_count_all">> ], CountAll , Ts) @@ -176,7 +177,7 @@ of_processes( , cons([N, <<"processes_count_running">> ], CountRunning , Ts) , cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts) , cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts) - | PerProcessMsgsFlattened + | PerProcessMsgsAggregates ]. -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) -> @@ -202,6 +203,17 @@ of_process( , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) ]. +-spec aggregate_by_path([t()], erlang:timestamp()) -> + [t()]. +aggregate_by_path(Msgs, Timestamp) -> + Aggregate = + fun (?T{path=K, value=V}, ValsByPath) -> + dict:update_counter(K, V, ValsByPath) + end, + ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs), + ValsByPathList = dict:to_list(ValsByPathDict), + [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList]. + -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> binary(). proc_origin_to_bin({registered_name, Name}) -> diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index 0318b39..43c1a1f 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -107,14 +107,14 @@ t_full_cycle(_Cfg) -> , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">> % Processes totals - , <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">> + , <<"beam_stats.node_foo_host_bar.processes_count_all:4|g">> , <<"beam_stats.node_foo_host_bar.processes_count_exiting:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_garbage_collecting:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_registered:1|g">> , <<"beam_stats.node_foo_host_bar.processes_count_runnable:0|g">> , <<"beam_stats.node_foo_host_bar.processes_count_running:3|g">> , <<"beam_stats.node_foo_host_bar.processes_count_suspended:0|g">> - , <<"beam_stats.node_foo_host_bar.processes_count_waiting:0|g">> + , <<"beam_stats.node_foo_host_bar.processes_count_waiting:1|g">> % Process 1 , <<"beam_stats.node_foo_host_bar.process_memory.named--reg_name_foo:15|g">> @@ -128,10 +128,10 @@ t_full_cycle(_Cfg) -> , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">> , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">> - % Process 3 - , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:25|g">> - , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:35|g">> - , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:40|g">> + % Process 3 and 4, aggregated by origin + , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:30|g">> + , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:45|g">> + , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:55|g">> , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:1|g">> ], MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), @@ -158,6 +158,7 @@ meck_expect_beam_stats(Overrides) -> Pid1 = list_to_pid("<0.1.0>"), Pid2 = list_to_pid("<0.2.0>"), Pid3 = list_to_pid("<0.3.0>"), + Pid4 = list_to_pid("<0.4.0>"), Process1 = #beam_stats_process { pid = Pid1 @@ -206,21 +207,38 @@ meck_expect_beam_stats(Overrides) -> , stack_size = 40 , message_queue_len = 1 }, + Process4 = + #beam_stats_process + { pid = Pid4 + , registered_name = none + , ancestry = + #beam_stats_process_ancestry + { raw_initial_call = {baz_mod, baz_fun, 3} + , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}} + , otp_ancestors = {some, [Pid0, Pid1]} + } + , status = waiting + , memory = 5 + , total_heap_size = 10 + , stack_size = 15 + , message_queue_len = 0 + }, Processes = #beam_stats_processes { individual_stats = [ Process1 , Process2 , Process3 + , Process4 ] - , count_all = 3 + , count_all = 4 , count_exiting = 0 , count_garbage_collecting = 0 , count_registered = 1 , count_runnable = 0 , count_running = 3 , count_suspended = 0 - , count_waiting = 0 + , count_waiting = 1 }, ETSTableStatsFoo = #beam_stats_ets_table @@ -263,7 +281,7 @@ meck_expect_beam_stats(Overrides) -> end ), meck:expect(beam_stats_source, erlang_processes, - fun () -> [Pid1, Pid2, Pid3] end), + fun () -> [Pid1, Pid2, Pid3, Pid4] end), meck:expect(beam_stats_source, os_timestamp, fun () -> {1, 2, 3} end), meck:expect(beam_stats_source, erlang_process_info, @@ -304,6 +322,21 @@ meck_expect_beam_stats(Overrides) -> ; stack_size -> {K, 40} ; message_queue_len -> {K, 1} end + ; (P, K) when P == Pid4 -> + Dict = + [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}} + , {'$ancestors' , [Pid0, Pid1]} + ], + case K + of dictionary -> {K, Dict} + ; initial_call -> {K, {baz_mod, baz_fun, 3}} + ; registered_name -> [] + ; status -> {K, waiting} + ; memory -> {K, 5} + ; total_heap_size -> {K, 10} + ; stack_size -> {K, 15} + ; message_queue_len -> {K, 0} + end end ), #beam_stats -- 2.20.1