Collect reductions per process.
authorSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 22:28:28 +0000 (18:28 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 22:28:28 +0000 (18:28 -0400)
include/beam_stats_process.hrl
src/beam_stats.erl
src/beam_stats_delta.erl
src/beam_stats_msg_graphite.erl
src/beam_stats_process.erl
src/beam_stats_processes.erl
test/beam_stats_consumer_statsd_SUITE.erl

index 7d2d951..ec0e49c 100644 (file)
@@ -7,4 +7,5 @@
     , total_heap_size   ::               non_neg_integer()
     , stack_size        ::               non_neg_integer()
     , message_queue_len ::               non_neg_integer()
+    , reductions        ::               non_neg_integer()
     }).
index b3e445e..61d8f3b 100644 (file)
@@ -31,5 +31,5 @@ collect(DeltasServer) ->
     , reductions       = beam_stats_delta:of_reductions(DeltasServer)
     , run_queue        = beam_stats_source:erlang_statistics(run_queue)
     , ets              = beam_stats_ets:collect()
-    , processes        = beam_stats_processes:collect()
+    , processes        = beam_stats_processes:collect(DeltasServer)
     }.
index 42f2399..45cfdb0 100644 (file)
     , of_context_switches/1
     , of_io/1
     , of_reductions/1
+    , of_process_info_reductions/2
     ]).
 
 -record(?MODULE,
     { erlang_statistics :: ets:tid()
+    , erlang_process_info_reductions :: ets:tid()
     }).
 
 -define(T, #?MODULE).
@@ -30,15 +32,19 @@ start() ->
         ],
     ?T
     { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options)
+    , erlang_process_info_reductions =
+        ets:new(beam_stats_delta_erlang_process_info_reductions, Options)
     }.
 
 -spec stop(t()) ->
     {}.
 stop(?T
     { erlang_statistics = TidErlangStatistics
+    , erlang_process_info_reductions = TidErlangProcessInfoReductions
     }
 ) ->
     true = ets:delete(TidErlangStatistics),
+    true = ets:delete(TidErlangProcessInfoReductions),
     {}.
 
 -spec of_context_switches(t()) ->
@@ -75,8 +81,20 @@ of_reductions(?T{erlang_statistics=Table}) ->
     {Current, _} = beam_stats_source:erlang_statistics(Key),
     delta(Table, Key, Current).
 
--spec delta(ets:tid(), atom(), non_neg_integer()) ->
-    non_neg_integer().
+-spec of_process_info_reductions(t(), pid()) ->
+    hope_option:t(non_neg_integer()).
+of_process_info_reductions(?T{erlang_process_info_reductions=Table}, Pid) ->
+    case beam_stats_source:erlang_process_info(Pid, reductions)
+    of  undefined ->
+            none
+    ;   {reductions, Current} ->
+            Delta = delta(Table, Pid, Current),
+            {some, Delta}
+    end.
+
+-spec delta(ets:tid(), Key, non_neg_integer()) ->
+    non_neg_integer()
+    when Key :: atom() | pid().
 delta(Table, Key, CurrentTotal) ->
     PreviousTotalOpt = find(Table, Key),
     PreviousTotal = hope_option:get(PreviousTotalOpt, 0),
index 26751a5..6939653 100644 (file)
@@ -205,6 +205,7 @@ of_process(
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
     , message_queue_len = MsgQueueLen
+    , reductions        = Reductions
     }=Process,
     <<NodeID/binary>>,
     Timestamp
@@ -217,6 +218,7 @@ of_process(
     , cons([N, <<"process_total_heap_size">>   , OriginBin], TotalHeapSize , Ts)
     , cons([N, <<"process_stack_size">>        , OriginBin], StackSize     , Ts)
     , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
+    , cons([N, <<"process_reductions">>        , OriginBin], Reductions    , Ts)
     ].
 
 -spec aggregate_by_path([t()], erlang:timestamp()) ->
index b0b7da4..3f2c9a4 100644 (file)
@@ -11,7 +11,7 @@
     ]).
 
 -export(
-    [ of_pid/1
+    [ of_pid/2
     , get_best_known_origin/1
     , print/1
     ]).
 %% Public API
 %% ============================================================================
 
--spec of_pid(pid()) ->
+-spec of_pid(pid(), beam_stats_delta:t()) ->
       none        % when process is dead
     | {some, t()} % when process is alive
     .
-of_pid(Pid) ->
+of_pid(Pid, DeltasServer) ->
     try
         Dict = pid_info_exn(Pid, dictionary),
         Ancestry =
@@ -65,6 +65,7 @@ of_pid(Pid) ->
             , total_heap_size   = pid_info_exn(Pid, total_heap_size)
             , stack_size        = pid_info_exn(Pid, stack_size)
             , message_queue_len = pid_info_exn(Pid, message_queue_len)
+            , reductions        = pid_info_reductions(Pid, DeltasServer)
             },
         {some, T}
     catch throw:{process_dead, _} ->
@@ -87,6 +88,7 @@ print(
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
     , message_queue_len = MsgQueueLen
+    , reductions        = Reductions
     }=T
 ) ->
     BestKnownOrigin = get_best_known_origin(T),
@@ -103,6 +105,7 @@ print(
         "TotalHeapSize     : ~p~n"
         "StackSize         : ~p~n"
         "MsgQueueLen       : ~p~n"
+        "Reductions        : ~p~n"
         "~n",
         [ Pid
         , BestKnownOrigin
@@ -115,6 +118,7 @@ print(
         , TotalHeapSize
         , StackSize
         , MsgQueueLen
+        , Reductions
         ]
     ).
 
@@ -122,6 +126,16 @@ print(
 %% Private helpers
 %% ============================================================================
 
+-spec pid_info_reductions(pid(), beam_stats_delta:t()) ->
+    non_neg_integer().
+pid_info_reductions(Pid, DeltasServer) ->
+    case beam_stats_delta:of_process_info_reductions(DeltasServer, Pid)
+    of  {some, Reductions} ->
+            Reductions
+    ;   none ->
+            throw({process_dead, Pid})
+    end.
+
 pid_info_exn(Pid, Key) ->
     {some, Value} = pid_info_opt(Pid, Key),
     Value.
index 1b3fcb2..d369314 100644 (file)
@@ -8,8 +8,8 @@
     ]).
 
 -export(
-    [ collect/0
-    , collect_and_print/0
+    [ collect/1
+    , collect_and_print/1
     , print/1
     ]).
 
 -type t() ::
     ?T{}.
 
--spec collect() ->
+-spec collect(beam_stats_delta:t()) ->
     t().
-collect() ->
+collect(DeltasServer) ->
     Pids = beam_stats_source:erlang_processes(),
-    PsOpts = [beam_stats_process:of_pid(P) || P <- Pids],
+    PsOpts = [beam_stats_process:of_pid(P, DeltasServer) || P <- Pids],
     Ps = [P || {some, P} <- PsOpts],
     ?T
     { individual_stats
@@ -45,8 +45,10 @@ collect() ->
         = length([P || P <- Ps, P#beam_stats_process.status =:= waiting])
     }.
 
-collect_and_print() ->
-    print(collect()).
+-spec collect_and_print(beam_stats_delta:t()) ->
+    ok.
+collect_and_print(DeltasServer) ->
+    print(collect(DeltasServer)).
 
 -spec print(t()) ->
     ok.
index a5b3779..ac21f7a 100644 (file)
@@ -118,18 +118,21 @@ t_full_cycle(_Cfg) ->
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.named--reg_name_foo:0|g">>
 
         % Process 2
         , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--bar_mod-bar_fun-1--NONE--NONE:0|g">>
 
         % Process 3 and 4, aggregated by origin
         , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:30|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:45|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:55|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:1|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:0|g">>
         ],
     MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]),
     RemoveExpectedFromReceived =
@@ -141,7 +144,9 @@ t_full_cycle(_Cfg) ->
             true = lists:member(Expected, Received),
             Received -- [Expected]
         end,
-    [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+    MsgsRemaining = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+    ct:log("MsgsRemaining: ~p", [MsgsRemaining]),
+    [] = MsgsRemaining,
     meck:unload(beam_stats_source).
 
 meck_expect_beam_stats() ->
@@ -305,6 +310,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 25}
                 ;   stack_size        -> {K, 10}
                 ;   message_queue_len -> {K, 0}
+                ;   reductions        -> {K, 1}
                 end
         ;   (P, K) when P == Pid2 ->
                 case K
@@ -316,6 +322,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 35}
                 ;   stack_size        -> {K, 40}
                 ;   message_queue_len -> {K, 5}
+                ;   reductions        -> {K, 2}
                 end
         ;   (P, K) when P == Pid3 ->
                 Dict =
@@ -331,6 +338,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 35}
                 ;   stack_size        -> {K, 40}
                 ;   message_queue_len -> {K, 1}
+                ;   reductions        -> {K, 3}
                 end
         ;   (P, K) when P == Pid4 ->
                 Dict =
@@ -346,6 +354,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 10}
                 ;   stack_size        -> {K, 15}
                 ;   message_queue_len -> {K, 0}
+                ;   reductions        -> {K, 4}
                 end
         end
     ),
This page took 0.028739 seconds and 4 git commands to generate.