From 7dbc59b67d2cebe0e498ac4fd167cbaabed0e55b Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sun, 27 Sep 2015 18:28:28 -0400 Subject: [PATCH] Collect reductions per process. --- include/beam_stats_process.hrl | 1 + src/beam_stats.erl | 2 +- src/beam_stats_delta.erl | 22 ++++++++++++++++++++-- src/beam_stats_msg_graphite.erl | 2 ++ src/beam_stats_process.erl | 20 +++++++++++++++++--- src/beam_stats_processes.erl | 16 +++++++++------- test/beam_stats_consumer_statsd_SUITE.erl | 11 ++++++++++- 7 files changed, 60 insertions(+), 14 deletions(-) diff --git a/include/beam_stats_process.hrl b/include/beam_stats_process.hrl index 7d2d951..ec0e49c 100644 --- a/include/beam_stats_process.hrl +++ b/include/beam_stats_process.hrl @@ -7,4 +7,5 @@ , total_heap_size :: non_neg_integer() , stack_size :: non_neg_integer() , message_queue_len :: non_neg_integer() + , reductions :: non_neg_integer() }). diff --git a/src/beam_stats.erl b/src/beam_stats.erl index b3e445e..61d8f3b 100644 --- a/src/beam_stats.erl +++ b/src/beam_stats.erl @@ -31,5 +31,5 @@ collect(DeltasServer) -> , reductions = beam_stats_delta:of_reductions(DeltasServer) , run_queue = beam_stats_source:erlang_statistics(run_queue) , ets = beam_stats_ets:collect() - , processes = beam_stats_processes:collect() + , processes = beam_stats_processes:collect(DeltasServer) }. diff --git a/src/beam_stats_delta.erl b/src/beam_stats_delta.erl index 42f2399..45cfdb0 100644 --- a/src/beam_stats_delta.erl +++ b/src/beam_stats_delta.erl @@ -10,10 +10,12 @@ , of_context_switches/1 , of_io/1 , of_reductions/1 + , of_process_info_reductions/2 ]). -record(?MODULE, { erlang_statistics :: ets:tid() + , erlang_process_info_reductions :: ets:tid() }). -define(T, #?MODULE). @@ -30,15 +32,19 @@ start() -> ], ?T { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options) + , erlang_process_info_reductions = + ets:new(beam_stats_delta_erlang_process_info_reductions, Options) }. -spec stop(t()) -> {}. stop(?T { erlang_statistics = TidErlangStatistics + , erlang_process_info_reductions = TidErlangProcessInfoReductions } ) -> true = ets:delete(TidErlangStatistics), + true = ets:delete(TidErlangProcessInfoReductions), {}. -spec of_context_switches(t()) -> @@ -75,8 +81,20 @@ of_reductions(?T{erlang_statistics=Table}) -> {Current, _} = beam_stats_source:erlang_statistics(Key), delta(Table, Key, Current). --spec delta(ets:tid(), atom(), non_neg_integer()) -> - non_neg_integer(). +-spec of_process_info_reductions(t(), pid()) -> + hope_option:t(non_neg_integer()). +of_process_info_reductions(?T{erlang_process_info_reductions=Table}, Pid) -> + case beam_stats_source:erlang_process_info(Pid, reductions) + of undefined -> + none + ; {reductions, Current} -> + Delta = delta(Table, Pid, Current), + {some, Delta} + end. + +-spec delta(ets:tid(), Key, non_neg_integer()) -> + non_neg_integer() + when Key :: atom() | pid(). delta(Table, Key, CurrentTotal) -> PreviousTotalOpt = find(Table, Key), PreviousTotal = hope_option:get(PreviousTotalOpt, 0), diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index 26751a5..6939653 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -205,6 +205,7 @@ of_process( , total_heap_size = TotalHeapSize , stack_size = StackSize , message_queue_len = MsgQueueLen + , reductions = Reductions }=Process, <>, Timestamp @@ -217,6 +218,7 @@ of_process( , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts) , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts) , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts) + , cons([N, <<"process_reductions">> , OriginBin], Reductions , Ts) ]. -spec aggregate_by_path([t()], erlang:timestamp()) -> diff --git a/src/beam_stats_process.erl b/src/beam_stats_process.erl index b0b7da4..3f2c9a4 100644 --- a/src/beam_stats_process.erl +++ b/src/beam_stats_process.erl @@ -11,7 +11,7 @@ ]). -export( - [ of_pid/1 + [ of_pid/2 , get_best_known_origin/1 , print/1 ]). @@ -42,11 +42,11 @@ %% Public API %% ============================================================================ --spec of_pid(pid()) -> +-spec of_pid(pid(), beam_stats_delta:t()) -> none % when process is dead | {some, t()} % when process is alive . -of_pid(Pid) -> +of_pid(Pid, DeltasServer) -> try Dict = pid_info_exn(Pid, dictionary), Ancestry = @@ -65,6 +65,7 @@ of_pid(Pid) -> , total_heap_size = pid_info_exn(Pid, total_heap_size) , stack_size = pid_info_exn(Pid, stack_size) , message_queue_len = pid_info_exn(Pid, message_queue_len) + , reductions = pid_info_reductions(Pid, DeltasServer) }, {some, T} catch throw:{process_dead, _} -> @@ -87,6 +88,7 @@ print( , total_heap_size = TotalHeapSize , stack_size = StackSize , message_queue_len = MsgQueueLen + , reductions = Reductions }=T ) -> BestKnownOrigin = get_best_known_origin(T), @@ -103,6 +105,7 @@ print( "TotalHeapSize : ~p~n" "StackSize : ~p~n" "MsgQueueLen : ~p~n" + "Reductions : ~p~n" "~n", [ Pid , BestKnownOrigin @@ -115,6 +118,7 @@ print( , TotalHeapSize , StackSize , MsgQueueLen + , Reductions ] ). @@ -122,6 +126,16 @@ print( %% Private helpers %% ============================================================================ +-spec pid_info_reductions(pid(), beam_stats_delta:t()) -> + non_neg_integer(). +pid_info_reductions(Pid, DeltasServer) -> + case beam_stats_delta:of_process_info_reductions(DeltasServer, Pid) + of {some, Reductions} -> + Reductions + ; none -> + throw({process_dead, Pid}) + end. + pid_info_exn(Pid, Key) -> {some, Value} = pid_info_opt(Pid, Key), Value. diff --git a/src/beam_stats_processes.erl b/src/beam_stats_processes.erl index 1b3fcb2..d369314 100644 --- a/src/beam_stats_processes.erl +++ b/src/beam_stats_processes.erl @@ -8,8 +8,8 @@ ]). -export( - [ collect/0 - , collect_and_print/0 + [ collect/1 + , collect_and_print/1 , print/1 ]). @@ -18,11 +18,11 @@ -type t() :: ?T{}. --spec collect() -> +-spec collect(beam_stats_delta:t()) -> t(). -collect() -> +collect(DeltasServer) -> Pids = beam_stats_source:erlang_processes(), - PsOpts = [beam_stats_process:of_pid(P) || P <- Pids], + PsOpts = [beam_stats_process:of_pid(P, DeltasServer) || P <- Pids], Ps = [P || {some, P} <- PsOpts], ?T { individual_stats @@ -45,8 +45,10 @@ collect() -> = length([P || P <- Ps, P#beam_stats_process.status =:= waiting]) }. -collect_and_print() -> - print(collect()). +-spec collect_and_print(beam_stats_delta:t()) -> + ok. +collect_and_print(DeltasServer) -> + print(collect(DeltasServer)). -spec print(t()) -> ok. diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index a5b3779..ac21f7a 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -118,18 +118,21 @@ t_full_cycle(_Cfg) -> , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">> , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">> , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">> + , <<"beam_stats_v0.node_foo_host_bar.process_reductions.named--reg_name_foo:0|g">> % Process 2 , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">> , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">> , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">> , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">> + , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--bar_mod-bar_fun-1--NONE--NONE:0|g">> % Process 3 and 4, aggregated by origin , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:30|g">> , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:45|g">> , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:55|g">> , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:1|g">> + , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:0|g">> ], MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), RemoveExpectedFromReceived = @@ -141,7 +144,9 @@ t_full_cycle(_Cfg) -> true = lists:member(Expected, Received), Received -- [Expected] end, - [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected), + MsgsRemaining = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected), + ct:log("MsgsRemaining: ~p", [MsgsRemaining]), + [] = MsgsRemaining, meck:unload(beam_stats_source). meck_expect_beam_stats() -> @@ -305,6 +310,7 @@ meck_expect_beam_stats(Overrides) -> ; total_heap_size -> {K, 25} ; stack_size -> {K, 10} ; message_queue_len -> {K, 0} + ; reductions -> {K, 1} end ; (P, K) when P == Pid2 -> case K @@ -316,6 +322,7 @@ meck_expect_beam_stats(Overrides) -> ; total_heap_size -> {K, 35} ; stack_size -> {K, 40} ; message_queue_len -> {K, 5} + ; reductions -> {K, 2} end ; (P, K) when P == Pid3 -> Dict = @@ -331,6 +338,7 @@ meck_expect_beam_stats(Overrides) -> ; total_heap_size -> {K, 35} ; stack_size -> {K, 40} ; message_queue_len -> {K, 1} + ; reductions -> {K, 3} end ; (P, K) when P == Pid4 -> Dict = @@ -346,6 +354,7 @@ meck_expect_beam_stats(Overrides) -> ; total_heap_size -> {K, 10} ; stack_size -> {K, 15} ; message_queue_len -> {K, 0} + ; reductions -> {K, 4} end end ), -- 2.20.1