Merge pull request #20 from ibnfirnas/per-process-reductions 0.15.0
authorSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 23:32:13 +0000 (19:32 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 23:32:13 +0000 (19:32 -0400)
Collect per process reductions deltas

include/beam_stats_process.hrl
src/beam_stats.app.src
src/beam_stats.erl
src/beam_stats_delta.erl
src/beam_stats_msg_graphite.erl
src/beam_stats_process.erl
src/beam_stats_processes.erl
src/beam_stats_producer.erl
src/beam_stats_source.erl
test/beam_stats_consumer_statsd_SUITE.erl

index 7d2d951..ec0e49c 100644 (file)
@@ -7,4 +7,5 @@
     , total_heap_size   ::               non_neg_integer()
     , stack_size        ::               non_neg_integer()
     , message_queue_len ::               non_neg_integer()
+    , reductions        ::               non_neg_integer()
     }).
index 4e49f59..6dfb736 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.14.3"},
+  {vsn, "0.15.0"},
   {registered, []},
   {applications,
     [ kernel
index b3e445e..61d8f3b 100644 (file)
@@ -31,5 +31,5 @@ collect(DeltasServer) ->
     , reductions       = beam_stats_delta:of_reductions(DeltasServer)
     , run_queue        = beam_stats_source:erlang_statistics(run_queue)
     , ets              = beam_stats_ets:collect()
-    , processes        = beam_stats_processes:collect()
+    , processes        = beam_stats_processes:collect(DeltasServer)
     }.
index 42f2399..1e3bdfb 100644 (file)
@@ -7,13 +7,16 @@
 -export(
     [ start/0
     , stop/1
+    , gc/1
     , of_context_switches/1
     , of_io/1
     , of_reductions/1
+    , of_process_info_reductions/2
     ]).
 
 -record(?MODULE,
     { erlang_statistics :: ets:tid()
+    , erlang_process_info_reductions :: ets:tid()
     }).
 
 -define(T, #?MODULE).
@@ -30,17 +33,46 @@ start() ->
         ],
     ?T
     { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options)
+    , erlang_process_info_reductions =
+        ets:new(beam_stats_delta_erlang_process_info_reductions, Options)
     }.
 
 -spec stop(t()) ->
     {}.
 stop(?T
     { erlang_statistics = TidErlangStatistics
+    , erlang_process_info_reductions = TidErlangProcessInfoReductions
     }
 ) ->
     true = ets:delete(TidErlangStatistics),
+    true = ets:delete(TidErlangProcessInfoReductions),
     {}.
 
+-spec gc(t()) ->
+    {}.
+gc(?T{erlang_process_info_reductions=Table}=T) ->
+    case ets:first(Table)
+    of  '$end_of_table' ->
+            {}
+    ;   FirstPid when is_pid(FirstPid) ->
+            gc(T, FirstPid)
+    end.
+
+-spec gc(t(), pid()) ->
+    {}.
+gc(?T{erlang_process_info_reductions=Table}=T, Pid) ->
+    Next = ets:next(Table, Pid),
+    case beam_stats_source:erlang_is_process_alive(Pid)
+    of  true  -> true
+    ;   false -> ets:delete(Table, Pid)
+    end,
+    case Next
+    of  '$end_of_table' ->
+            {}
+    ;   NextPid when is_pid(NextPid) ->
+            gc(T, NextPid)
+    end.
+
 -spec of_context_switches(t()) ->
     non_neg_integer().
 of_context_switches(?T{erlang_statistics=Table}) ->
@@ -75,8 +107,20 @@ of_reductions(?T{erlang_statistics=Table}) ->
     {Current, _} = beam_stats_source:erlang_statistics(Key),
     delta(Table, Key, Current).
 
--spec delta(ets:tid(), atom(), non_neg_integer()) ->
-    non_neg_integer().
+-spec of_process_info_reductions(t(), pid()) ->
+    hope_option:t(non_neg_integer()).
+of_process_info_reductions(?T{erlang_process_info_reductions=Table}, Pid) ->
+    case beam_stats_source:erlang_process_info(Pid, reductions)
+    of  undefined ->
+            none
+    ;   {reductions, Current} ->
+            Delta = delta(Table, Pid, Current),
+            {some, Delta}
+    end.
+
+-spec delta(ets:tid(), Key, non_neg_integer()) ->
+    non_neg_integer()
+    when Key :: atom() | pid().
 delta(Table, Key, CurrentTotal) ->
     PreviousTotalOpt = find(Table, Key),
     PreviousTotal = hope_option:get(PreviousTotalOpt, 0),
index 26751a5..6939653 100644 (file)
@@ -205,6 +205,7 @@ of_process(
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
     , message_queue_len = MsgQueueLen
+    , reductions        = Reductions
     }=Process,
     <<NodeID/binary>>,
     Timestamp
@@ -217,6 +218,7 @@ of_process(
     , cons([N, <<"process_total_heap_size">>   , OriginBin], TotalHeapSize , Ts)
     , cons([N, <<"process_stack_size">>        , OriginBin], StackSize     , Ts)
     , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
+    , cons([N, <<"process_reductions">>        , OriginBin], Reductions    , Ts)
     ].
 
 -spec aggregate_by_path([t()], erlang:timestamp()) ->
index b0b7da4..3f2c9a4 100644 (file)
@@ -11,7 +11,7 @@
     ]).
 
 -export(
-    [ of_pid/1
+    [ of_pid/2
     , get_best_known_origin/1
     , print/1
     ]).
 %% Public API
 %% ============================================================================
 
--spec of_pid(pid()) ->
+-spec of_pid(pid(), beam_stats_delta:t()) ->
       none        % when process is dead
     | {some, t()} % when process is alive
     .
-of_pid(Pid) ->
+of_pid(Pid, DeltasServer) ->
     try
         Dict = pid_info_exn(Pid, dictionary),
         Ancestry =
@@ -65,6 +65,7 @@ of_pid(Pid) ->
             , total_heap_size   = pid_info_exn(Pid, total_heap_size)
             , stack_size        = pid_info_exn(Pid, stack_size)
             , message_queue_len = pid_info_exn(Pid, message_queue_len)
+            , reductions        = pid_info_reductions(Pid, DeltasServer)
             },
         {some, T}
     catch throw:{process_dead, _} ->
@@ -87,6 +88,7 @@ print(
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
     , message_queue_len = MsgQueueLen
+    , reductions        = Reductions
     }=T
 ) ->
     BestKnownOrigin = get_best_known_origin(T),
@@ -103,6 +105,7 @@ print(
         "TotalHeapSize     : ~p~n"
         "StackSize         : ~p~n"
         "MsgQueueLen       : ~p~n"
+        "Reductions        : ~p~n"
         "~n",
         [ Pid
         , BestKnownOrigin
@@ -115,6 +118,7 @@ print(
         , TotalHeapSize
         , StackSize
         , MsgQueueLen
+        , Reductions
         ]
     ).
 
@@ -122,6 +126,16 @@ print(
 %% Private helpers
 %% ============================================================================
 
+-spec pid_info_reductions(pid(), beam_stats_delta:t()) ->
+    non_neg_integer().
+pid_info_reductions(Pid, DeltasServer) ->
+    case beam_stats_delta:of_process_info_reductions(DeltasServer, Pid)
+    of  {some, Reductions} ->
+            Reductions
+    ;   none ->
+            throw({process_dead, Pid})
+    end.
+
 pid_info_exn(Pid, Key) ->
     {some, Value} = pid_info_opt(Pid, Key),
     Value.
index 1b3fcb2..d369314 100644 (file)
@@ -8,8 +8,8 @@
     ]).
 
 -export(
-    [ collect/0
-    , collect_and_print/0
+    [ collect/1
+    , collect_and_print/1
     , print/1
     ]).
 
 -type t() ::
     ?T{}.
 
--spec collect() ->
+-spec collect(beam_stats_delta:t()) ->
     t().
-collect() ->
+collect(DeltasServer) ->
     Pids = beam_stats_source:erlang_processes(),
-    PsOpts = [beam_stats_process:of_pid(P) || P <- Pids],
+    PsOpts = [beam_stats_process:of_pid(P, DeltasServer) || P <- Pids],
     Ps = [P || {some, P} <- PsOpts],
     ?T
     { individual_stats
@@ -45,8 +45,10 @@ collect() ->
         = length([P || P <- Ps, P#beam_stats_process.status =:= waiting])
     }.
 
-collect_and_print() ->
-    print(collect()).
+-spec collect_and_print(beam_stats_delta:t()) ->
+    ok.
+collect_and_print(DeltasServer) ->
+    print(collect(DeltasServer)).
 
 -spec print(t()) ->
     ok.
index dd52692..e27f9a3 100644 (file)
@@ -122,6 +122,7 @@ produce(
     ConsumersList = ordsets:to_list(ConsumersSet),
     Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
     ok = lists:foreach(Send, ConsumersList),
+    beam_stats_delta:gc(DeltasServer),
     State.
 
 -spec schedule_first_production() ->
index eb0d7d1..1cba827 100644 (file)
@@ -1,7 +1,8 @@
 -module(beam_stats_source).
 
 -export(
-    [ erlang_memory/0
+    [ erlang_is_process_alive/1
+    , erlang_memory/0
     , erlang_node/0
     , erlang_process_info/2
     , erlang_processes/0
@@ -13,6 +14,9 @@
     , os_timestamp/0
     ]).
 
+erlang_is_process_alive(Pid) ->
+    erlang:is_process_alive(Pid).
+
 erlang_memory() ->
     erlang:memory().
 
index a5b3779..553bc72 100644 (file)
@@ -14,6 +14,7 @@
 %% Test cases
 -export(
     [ t_full_cycle/1
+    , t_deltas_gc/1
     ]).
 
 -define(GROUP, beam_stats_consumer_statsd).
@@ -28,6 +29,7 @@ all() ->
 groups() ->
     Tests =
         [ t_full_cycle
+        , t_deltas_gc
         ],
     Properties = [],
     [{?GROUP, Properties, Tests}].
@@ -36,6 +38,32 @@ groups() ->
 %%  Test cases
 %% ============================================================================
 
+t_deltas_gc(_Cfg) ->
+    Pid1 = list_to_pid("<0.101.0>"),
+    Pid2 = list_to_pid("<0.102.0>"),
+    Pid3 = list_to_pid("<0.103.0>"),
+    meck:new(beam_stats_source),
+    meck:expect(beam_stats_source, erlang_process_info,
+        fun (P, reductions) when P == Pid1 -> {reductions, 1}
+        ;   (P, reductions) when P == Pid2 -> {reductions, 2}
+        ;   (P, reductions) when P == Pid3 -> {reductions, 3}
+        end
+    ),
+    DeltasServer = beam_stats_delta:start(),
+    {some, 1} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1),
+    {some, 2} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2),
+    {some, 3} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3),
+    {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1),
+    {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2),
+    {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3),
+    meck:expect(beam_stats_source, erlang_is_process_alive, fun (_) -> false end),
+    {} = beam_stats_delta:gc(DeltasServer),
+    {some, 1} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1),
+    {some, 2} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2),
+    {some, 3} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3),
+    {} = beam_stats_delta:stop(DeltasServer),
+    meck:unload(beam_stats_source).
+
 t_full_cycle(_Cfg) ->
     meck:new(beam_stats_source),
     _BEAMStatsExpected = meck_expect_beam_stats(),
@@ -118,18 +146,21 @@ t_full_cycle(_Cfg) ->
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.named--reg_name_foo:0|g">>
 
         % Process 2
         , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--bar_mod-bar_fun-1--NONE--NONE:0|g">>
 
         % Process 3 and 4, aggregated by origin
         , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:30|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:45|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:55|g">>
         , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:1|g">>
+        , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:0|g">>
         ],
     MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]),
     RemoveExpectedFromReceived =
@@ -141,7 +172,9 @@ t_full_cycle(_Cfg) ->
             true = lists:member(Expected, Received),
             Received -- [Expected]
         end,
-    [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+    MsgsRemaining = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+    ct:log("MsgsRemaining: ~p", [MsgsRemaining]),
+    [] = MsgsRemaining,
     meck:unload(beam_stats_source).
 
 meck_expect_beam_stats() ->
@@ -259,6 +292,8 @@ meck_expect_beam_stats(Overrides) ->
         , size   = 8
         , memory = 64
         },
+    meck:expect(beam_stats_source, erlang_is_process_alive,
+        fun (_) -> true end),
     meck:expect(beam_stats_source, erlang_memory,
         fun () -> [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] end),
     meck:expect(beam_stats_source, erlang_node,
@@ -305,6 +340,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 25}
                 ;   stack_size        -> {K, 10}
                 ;   message_queue_len -> {K, 0}
+                ;   reductions        -> {K, 1}
                 end
         ;   (P, K) when P == Pid2 ->
                 case K
@@ -316,6 +352,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 35}
                 ;   stack_size        -> {K, 40}
                 ;   message_queue_len -> {K, 5}
+                ;   reductions        -> {K, 2}
                 end
         ;   (P, K) when P == Pid3 ->
                 Dict =
@@ -331,6 +368,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 35}
                 ;   stack_size        -> {K, 40}
                 ;   message_queue_len -> {K, 1}
+                ;   reductions        -> {K, 3}
                 end
         ;   (P, K) when P == Pid4 ->
                 Dict =
@@ -346,6 +384,7 @@ meck_expect_beam_stats(Overrides) ->
                 ;   total_heap_size   -> {K, 10}
                 ;   stack_size        -> {K, 15}
                 ;   message_queue_len -> {K, 0}
+                ;   reductions        -> {K, 4}
                 end
         end
     ),
This page took 0.055821 seconds and 4 git commands to generate.