OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
PerProcessMsgsNested = lists:map(OfProcess, Processes),
PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+ PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
Ts = Timestamp,
N = NodeID,
[ cons([N, <<"processes_count_all">> ], CountAll , Ts)
, cons([N, <<"processes_count_running">> ], CountRunning , Ts)
, cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts)
, cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts)
- | PerProcessMsgsFlattened
+ | PerProcessMsgsAggregates
].
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
, cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts)
].
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+ [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+ Aggregate =
+ fun (?T{path=K, value=V}, ValsByPath) ->
+ dict:update_counter(K, V, ValsByPath)
+ end,
+ ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+ ValsByPathList = dict:to_list(ValsByPathDict),
+ [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
binary().
proc_origin_to_bin({registered_name, Name}) ->
, <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">>
% Processes totals
- , <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">>
+ , <<"beam_stats.node_foo_host_bar.processes_count_all:4|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_exiting:0|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_garbage_collecting:0|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_registered:1|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_runnable:0|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_running:3|g">>
, <<"beam_stats.node_foo_host_bar.processes_count_suspended:0|g">>
- , <<"beam_stats.node_foo_host_bar.processes_count_waiting:0|g">>
+ , <<"beam_stats.node_foo_host_bar.processes_count_waiting:1|g">>
% Process 1
, <<"beam_stats.node_foo_host_bar.process_memory.named--reg_name_foo:15|g">>
, <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">>
, <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">>
- % Process 3
- , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:25|g">>
- , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:35|g">>
- , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:40|g">>
+ % Process 3 and 4, aggregated by origin
+ , <<"beam_stats.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:30|g">>
+ , <<"beam_stats.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:45|g">>
+ , <<"beam_stats.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:55|g">>
, <<"beam_stats.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--0_0_0-0_1_0:1|g">>
],
MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]),
Pid1 = list_to_pid("<0.1.0>"),
Pid2 = list_to_pid("<0.2.0>"),
Pid3 = list_to_pid("<0.3.0>"),
+ Pid4 = list_to_pid("<0.4.0>"),
Process1 =
#beam_stats_process
{ pid = Pid1
, stack_size = 40
, message_queue_len = 1
},
+ Process4 =
+ #beam_stats_process
+ { pid = Pid4
+ , registered_name = none
+ , ancestry =
+ #beam_stats_process_ancestry
+ { raw_initial_call = {baz_mod, baz_fun, 3}
+ , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}}
+ , otp_ancestors = {some, [Pid0, Pid1]}
+ }
+ , status = waiting
+ , memory = 5
+ , total_heap_size = 10
+ , stack_size = 15
+ , message_queue_len = 0
+ },
Processes =
#beam_stats_processes
{ individual_stats =
[ Process1
, Process2
, Process3
+ , Process4
]
- , count_all = 3
+ , count_all = 4
, count_exiting = 0
, count_garbage_collecting = 0
, count_registered = 1
, count_runnable = 0
, count_running = 3
, count_suspended = 0
- , count_waiting = 0
+ , count_waiting = 1
},
ETSTableStatsFoo =
#beam_stats_ets_table
end
),
meck:expect(beam_stats_source, erlang_processes,
- fun () -> [Pid1, Pid2, Pid3] end),
+ fun () -> [Pid1, Pid2, Pid3, Pid4] end),
meck:expect(beam_stats_source, os_timestamp,
fun () -> {1, 2, 3} end),
meck:expect(beam_stats_source, erlang_process_info,
; stack_size -> {K, 40}
; message_queue_len -> {K, 1}
end
+ ; (P, K) when P == Pid4 ->
+ Dict =
+ [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}}
+ , {'$ancestors' , [Pid0, Pid1]}
+ ],
+ case K
+ of dictionary -> {K, Dict}
+ ; initial_call -> {K, {baz_mod, baz_fun, 3}}
+ ; registered_name -> []
+ ; status -> {K, waiting}
+ ; memory -> {K, 5}
+ ; total_heap_size -> {K, 10}
+ ; stack_size -> {K, 15}
+ ; message_queue_len -> {K, 0}
+ end
end
),
#beam_stats