, total_heap_size :: non_neg_integer()
, stack_size :: non_neg_integer()
, message_queue_len :: non_neg_integer()
+ , reductions :: non_neg_integer()
}).
, reductions = beam_stats_delta:of_reductions(DeltasServer)
, run_queue = beam_stats_source:erlang_statistics(run_queue)
, ets = beam_stats_ets:collect()
- , processes = beam_stats_processes:collect()
+ , processes = beam_stats_processes:collect(DeltasServer)
}.
, of_context_switches/1
, of_io/1
, of_reductions/1
+ , of_process_info_reductions/2
]).
-record(?MODULE,
{ erlang_statistics :: ets:tid()
+ , erlang_process_info_reductions :: ets:tid()
}).
-define(T, #?MODULE).
],
?T
{ erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options)
+ , erlang_process_info_reductions =
+ ets:new(beam_stats_delta_erlang_process_info_reductions, Options)
}.
-spec stop(t()) ->
{}.
stop(?T
{ erlang_statistics = TidErlangStatistics
+ , erlang_process_info_reductions = TidErlangProcessInfoReductions
}
) ->
true = ets:delete(TidErlangStatistics),
+ true = ets:delete(TidErlangProcessInfoReductions),
{}.
-spec of_context_switches(t()) ->
{Current, _} = beam_stats_source:erlang_statistics(Key),
delta(Table, Key, Current).
--spec delta(ets:tid(), atom(), non_neg_integer()) ->
- non_neg_integer().
+-spec of_process_info_reductions(t(), pid()) ->
+ hope_option:t(non_neg_integer()).
+of_process_info_reductions(?T{erlang_process_info_reductions=Table}, Pid) ->
+ case beam_stats_source:erlang_process_info(Pid, reductions)
+ of undefined ->
+ none
+ ; {reductions, Current} ->
+ Delta = delta(Table, Pid, Current),
+ {some, Delta}
+ end.
+
+-spec delta(ets:tid(), Key, non_neg_integer()) ->
+ non_neg_integer()
+ when Key :: atom() | pid().
delta(Table, Key, CurrentTotal) ->
PreviousTotalOpt = find(Table, Key),
PreviousTotal = hope_option:get(PreviousTotalOpt, 0),
, total_heap_size = TotalHeapSize
, stack_size = StackSize
, message_queue_len = MsgQueueLen
+ , reductions = Reductions
}=Process,
<<NodeID/binary>>,
Timestamp
, cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts)
, cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts)
, cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts)
+ , cons([N, <<"process_reductions">> , OriginBin], Reductions , Ts)
].
-spec aggregate_by_path([t()], erlang:timestamp()) ->
]).
-export(
- [ of_pid/1
+ [ of_pid/2
, get_best_known_origin/1
, print/1
]).
%% Public API
%% ============================================================================
--spec of_pid(pid()) ->
+-spec of_pid(pid(), beam_stats_delta:t()) ->
none % when process is dead
| {some, t()} % when process is alive
.
-of_pid(Pid) ->
+of_pid(Pid, DeltasServer) ->
try
Dict = pid_info_exn(Pid, dictionary),
Ancestry =
, total_heap_size = pid_info_exn(Pid, total_heap_size)
, stack_size = pid_info_exn(Pid, stack_size)
, message_queue_len = pid_info_exn(Pid, message_queue_len)
+ , reductions = pid_info_reductions(Pid, DeltasServer)
},
{some, T}
catch throw:{process_dead, _} ->
, total_heap_size = TotalHeapSize
, stack_size = StackSize
, message_queue_len = MsgQueueLen
+ , reductions = Reductions
}=T
) ->
BestKnownOrigin = get_best_known_origin(T),
"TotalHeapSize : ~p~n"
"StackSize : ~p~n"
"MsgQueueLen : ~p~n"
+ "Reductions : ~p~n"
"~n",
[ Pid
, BestKnownOrigin
, TotalHeapSize
, StackSize
, MsgQueueLen
+ , Reductions
]
).
%% Private helpers
%% ============================================================================
+-spec pid_info_reductions(pid(), beam_stats_delta:t()) ->
+ non_neg_integer().
+pid_info_reductions(Pid, DeltasServer) ->
+ case beam_stats_delta:of_process_info_reductions(DeltasServer, Pid)
+ of {some, Reductions} ->
+ Reductions
+ ; none ->
+ throw({process_dead, Pid})
+ end.
+
pid_info_exn(Pid, Key) ->
{some, Value} = pid_info_opt(Pid, Key),
Value.
]).
-export(
- [ collect/0
- , collect_and_print/0
+ [ collect/1
+ , collect_and_print/1
, print/1
]).
-type t() ::
?T{}.
--spec collect() ->
+-spec collect(beam_stats_delta:t()) ->
t().
-collect() ->
+collect(DeltasServer) ->
Pids = beam_stats_source:erlang_processes(),
- PsOpts = [beam_stats_process:of_pid(P) || P <- Pids],
+ PsOpts = [beam_stats_process:of_pid(P, DeltasServer) || P <- Pids],
Ps = [P || {some, P} <- PsOpts],
?T
{ individual_stats
= length([P || P <- Ps, P#beam_stats_process.status =:= waiting])
}.
-collect_and_print() ->
- print(collect()).
+-spec collect_and_print(beam_stats_delta:t()) ->
+ ok.
+collect_and_print(DeltasServer) ->
+ print(collect(DeltasServer)).
-spec print(t()) ->
ok.
, <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">>
+ , <<"beam_stats_v0.node_foo_host_bar.process_reductions.named--reg_name_foo:0|g">>
% Process 2
, <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">>
+ , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--bar_mod-bar_fun-1--NONE--NONE:0|g">>
% Process 3 and 4, aggregated by origin
, <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:30|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:45|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:55|g">>
, <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:1|g">>
+ , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:0|g">>
],
MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]),
RemoveExpectedFromReceived =
true = lists:member(Expected, Received),
Received -- [Expected]
end,
- [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+ MsgsRemaining = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+ ct:log("MsgsRemaining: ~p", [MsgsRemaining]),
+ [] = MsgsRemaining,
meck:unload(beam_stats_source).
meck_expect_beam_stats() ->
; total_heap_size -> {K, 25}
; stack_size -> {K, 10}
; message_queue_len -> {K, 0}
+ ; reductions -> {K, 1}
end
; (P, K) when P == Pid2 ->
case K
; total_heap_size -> {K, 35}
; stack_size -> {K, 40}
; message_queue_len -> {K, 5}
+ ; reductions -> {K, 2}
end
; (P, K) when P == Pid3 ->
Dict =
; total_heap_size -> {K, 35}
; stack_size -> {K, 40}
; message_queue_len -> {K, 1}
+ ; reductions -> {K, 3}
end
; (P, K) when P == Pid4 ->
Dict =
; total_heap_size -> {K, 10}
; stack_size -> {K, 15}
; message_queue_len -> {K, 0}
+ ; reductions -> {K, 4}
end
end
),