Merge pull request #13 from ibnfirnas/aggregate-anonymous-process-data 0.11.0
authorSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 05:41:17 +0000 (01:41 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 05:41:17 +0000 (01:41 -0400)
Aggregate anonymous process data by process origin

src/beam_stats.app.src
src/beam_stats_msg_graphite.erl
test/beam_stats_consumer_statsd_SUITE.erl

index e9dcfff..56f2133 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.10.0"},
+  {vsn, "0.11.0"},
   {registered, []},
   {applications,
     [ kernel
index de18c0d..22990a4 100644 (file)
@@ -166,6 +166,7 @@ of_processes(
     OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
     PerProcessMsgsNested = lists:map(OfProcess, Processes),
     PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
     Ts = Timestamp,
     N  = NodeID,
     [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
@@ -176,14 +177,14 @@ of_processes(
     , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
     , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
     , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
-    | PerProcessMsgsFlattened
+    | PerProcessMsgsAggregates
     ].
 
 -spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
     [t()].
 of_process(
     #beam_stats_process
-    { pid               = Pid
+    { pid               = _
     , memory            = Memory
     , total_heap_size   = TotalHeapSize
     , stack_size        = StackSize
@@ -194,20 +195,30 @@ of_process(
 ) ->
     Origin = beam_stats_process:get_best_known_origin(Process),
     OriginBin = proc_origin_to_bin(Origin),
-    PidBin = pid_to_bin(Pid),
-    OriginAndPid = [OriginBin, PidBin],
     Ts = Timestamp,
     N  = NodeID,
-    [ cons([N, <<"process_memory">>            | OriginAndPid], Memory        , Ts)
-    , cons([N, <<"process_total_heap_size">>   | OriginAndPid], TotalHeapSize , Ts)
-    , cons([N, <<"process_stack_size">>        | OriginAndPid], StackSize     , Ts)
-    , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen   , Ts)
+    [ cons([N, <<"process_memory">>            , OriginBin], Memory        , Ts)
+    , cons([N, <<"process_total_heap_size">>   , OriginBin], TotalHeapSize , Ts)
+    , cons([N, <<"process_stack_size">>        , OriginBin], StackSize     , Ts)
+    , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen   , Ts)
     ].
 
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+    [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+    Aggregate =
+        fun (?T{path=K, value=V}, ValsByPath) ->
+            dict:update_counter(K, V, ValsByPath)
+        end,
+    ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+    ValsByPathList = dict:to_list(ValsByPathDict),
+    [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
 -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
     binary().
 proc_origin_to_bin({registered_name, Name}) ->
-    atom_to_binary(Name, utf8);
+    NameBin = atom_to_binary(Name, utf8),
+    <<"named--", NameBin/binary>>;
 proc_origin_to_bin({ancestry, Ancestry}) ->
     #beam_stats_process_ancestry
     { raw_initial_call  = InitCallRaw
@@ -220,7 +231,8 @@ proc_origin_to_bin({ancestry, Ancestry}) ->
     AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
     AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
     InitCallRawBin    = mfa_to_bin(InitCallRaw),
-    << InitCallRawBin/binary
+    << "spawned-via--"
+     , InitCallRawBin/binary
      , "--"
      , InitCallOTPBin/binary
      , "--"
index 33d78c9..43c1a1f 100644 (file)
@@ -107,32 +107,32 @@ t_full_cycle(_Cfg) ->
         , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">>
 
         % Processes totals
-        , <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">>
+        , <<"beam_stats.node_foo_host_bar.processes_count_all:4|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_exiting:0|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_garbage_collecting:0|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_registered:1|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_runnable:0|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_running:3|g">>
         , <<"beam_stats.node_foo_host_bar.processes_count_suspended:0|g">>
-        , <<"beam_stats.node_foo_host_bar.processes_count_waiting:0|g">>
+        , <<"beam_stats.node_foo_host_bar.processes_count_waiting:1|g">>
 
         % Process 1
-        , <<"beam_stats.node_foo_host_bar.process_memory.reg_name_foo.0_1_0:15|g">>
-        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.reg_name_foo.0_1_0:25|g">>
-        , <<"beam_stats.node_foo_host_bar.process_stack_size.reg_name_foo.0_1_0:10|g">>
-        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.reg_name_foo.0_1_0:0|g">>
+        , <<"beam_stats.node_foo_host_bar.process_memory.named--reg_name_foo:15|g">>
+        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">>
+        , <<"beam_stats.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">>
+        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">>
 
         % Process 2
-        , <<"beam_stats.node_foo_host_bar.process_memory.bar_mod-bar_fun-1--NONE--NONE.0_2_0:25|g">>
-        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.bar_mod-bar_fun-1--NONE--NONE.0_2_0:35|g">>
-        , <<"beam_stats.node_foo_host_bar.process_stack_size.bar_mod-bar_fun-1--NONE--NONE.0_2_0:40|g">>
-        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.bar_mod-bar_fun-1--NONE--NONE.0_2_0:5|g">>
+        , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">>
+        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">>
+        , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">>
+        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">>
 
-        % Process 3
-        , <<"beam_stats.node_foo_host_bar.process_memory.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:25|g">>
-        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:35|g">>
-        , <<"beam_stats.node_foo_host_bar.process_stack_size.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:40|g">>
-        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0.0_3_0:1|g">>
+        % Process 3 and 4, aggregated by origin
+        , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:30|g">>
+        , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:45|g">>
+        , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:55|g">>
+        , <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:1|g">>
         ],
     MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]),
     RemoveExpectedFromReceived =
@@ -158,6 +158,7 @@ meck_expect_beam_stats(Overrides) ->
     Pid1 = list_to_pid("<0.1.0>"),
     Pid2 = list_to_pid("<0.2.0>"),
     Pid3 = list_to_pid("<0.3.0>"),
+    Pid4 = list_to_pid("<0.4.0>"),
     Process1 =
         #beam_stats_process
         { pid               = Pid1
@@ -206,21 +207,38 @@ meck_expect_beam_stats(Overrides) ->
         , stack_size        = 40
         , message_queue_len = 1
         },
+    Process4 =
+        #beam_stats_process
+        { pid               = Pid4
+        , registered_name   = none
+        , ancestry =
+              #beam_stats_process_ancestry
+              { raw_initial_call  = {baz_mod, baz_fun, 3}
+              , otp_initial_call  = {some, {baz_otp_mod, baz_otp_fun, 2}}
+              , otp_ancestors     = {some, [Pid0, Pid1]}
+              }
+        , status            = waiting
+        , memory            = 5
+        , total_heap_size   = 10
+        , stack_size        = 15
+        , message_queue_len = 0
+        },
     Processes =
         #beam_stats_processes
         { individual_stats =
             [ Process1
             , Process2
             , Process3
+            , Process4
             ]
-        , count_all                = 3
+        , count_all                = 4
         , count_exiting            = 0
         , count_garbage_collecting = 0
         , count_registered         = 1
         , count_runnable           = 0
         , count_running            = 3
         , count_suspended          = 0
-        , count_waiting            = 0
+        , count_waiting            = 1
         },
     ETSTableStatsFoo =
         #beam_stats_ets_table
@@ -263,7 +281,7 @@ meck_expect_beam_stats(Overrides) ->
         end
     ),
     meck:expect(beam_stats_source, erlang_processes,
-        fun () -> [Pid1, Pid2, Pid3] end),
+        fun () -> [Pid1, Pid2, Pid3, Pid4] end),
     meck:expect(beam_stats_source, os_timestamp,
         fun () -> {1, 2, 3} end),
     meck:expect(beam_stats_source, erlang_process_info,
@@ -304,6 +322,21 @@ meck_expect_beam_stats(Overrides) ->
                 ;   stack_size        -> {K, 40}
                 ;   message_queue_len -> {K, 1}
                 end
+        ;   (P, K) when P == Pid4 ->
+                Dict =
+                    [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}}
+                    , {'$ancestors'   , [Pid0, Pid1]}
+                    ],
+                case K
+                of  dictionary        -> {K, Dict}
+                ;   initial_call      -> {K, {baz_mod, baz_fun, 3}}
+                ;   registered_name   -> []
+                ;   status            -> {K, waiting}
+                ;   memory            -> {K, 5}
+                ;   total_heap_size   -> {K, 10}
+                ;   stack_size        -> {K, 15}
+                ;   message_queue_len -> {K, 0}
+                end
         end
     ),
     #beam_stats
This page took 0.043529 seconds and 4 git commands to generate.