, of_beam_stats/2
, to_bin/1
, path_to_bin/1
- , add_path_prefix/2
, node_id_to_bin/1
]).
{ timestamp = Timestamp
, node_id = _
, memory = Memory
- % TODO: Handle the rest of data points
, io_bytes_in = IOBytesIn
, io_bytes_out = IOBytesOut
, context_switches = ContextSwitches
) ->
Ts = Timestamp,
N = NodeID,
- [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts)
- , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts)
- , cons([N, <<"context_switches">> ], ContextSwitches, Ts)
- , cons([N, <<"reductions">> ], Reductions , Ts)
- , cons([N, <<"run_queue">> ], RunQueue , Ts)
- | of_memory(Memory, NodeID, Ts)
- ]
- ++ of_ets(ETS, NodeID, Ts)
- ++ of_processes(Processes, NodeID, Ts).
+ Msgs =
+ [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts)
+ , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts)
+ , cons([N, <<"context_switches">> ], ContextSwitches, Ts)
+ , cons([N, <<"reductions">> ], Reductions , Ts)
+ , cons([N, <<"run_queue">> ], RunQueue , Ts)
+ | of_memory(Memory, NodeID, Ts)
+ ]
+ ++ of_ets(ETS, NodeID, Ts)
+ ++ of_processes(Processes, NodeID, Ts),
+ lists:map(fun path_prefix_schema_version/1, Msgs).
-spec to_bin(t()) ->
binary().
TimestampBin = integer_to_binary(TimestampInt),
<<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
--spec add_path_prefix(t(), binary()) ->
- t().
-add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
- T?T{path = [Prefix | Path]}.
-
-spec path_to_bin([binary()]) ->
binary().
path_to_bin(Path) ->
%% Helpers
%% ============================================================================
+-spec path_prefix_schema_version(t()) ->
+ t().
+path_prefix_schema_version(?T{}=T) ->
+ path_prefix(T, schema_version()).
+
+-spec path_prefix(t(), binary()) ->
+ t().
+path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
+ T?T{path = [Prefix | Path]}.
+
+-spec schema_version() ->
+ binary().
+schema_version() ->
+ <<"beam_stats_v0">>.
+
-spec bin_join([binary()], binary()) ->
binary().
bin_join([] , <<_/binary>> ) -> <<>>;
[t()].
of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
- NestedMsgs = lists:map(OfEtsTable, PerTableStats),
- lists:append(NestedMsgs).
+ MsgsNested = lists:map(OfEtsTable, PerTableStats),
+ MsgsFlattened = lists:append(MsgsNested),
+ aggregate_by_path(MsgsFlattened, Timestamp).
-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
[t()].
<<NodeID/binary>>,
Timestamp
) ->
- IDBin = beam_stats_ets_table:id_to_bin(ID),
+ IDType =
+ case ID =:= Name
+ of true -> <<"NAMED">>
+ ; false -> <<"TID">>
+ end,
NameBin = atom_to_binary(Name, latin1),
- NameAndID = [NameBin, IDBin],
+ NameAndID = [NameBin, IDType],
[ cons([NodeID, <<"ets_table">>, <<"size">> | NameAndID], Size , Timestamp)
, cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
].
OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
PerProcessMsgsNested = lists:map(OfProcess, Processes),
PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+ PerProcessMsgsAggregates = aggregate_by_path(PerProcessMsgsFlattened, Timestamp),
Ts = Timestamp,
N = NodeID,
[ cons([N, <<"processes_count_all">> ], CountAll , Ts)
, cons([N, <<"processes_count_running">> ], CountRunning , Ts)
, cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts)
, cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts)
- | PerProcessMsgsFlattened
+ | PerProcessMsgsAggregates
].
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
[t()].
of_process(
#beam_stats_process
- { pid = Pid
+ { pid = _
, memory = Memory
, total_heap_size = TotalHeapSize
, stack_size = StackSize
, message_queue_len = MsgQueueLen
+ , reductions = Reductions
}=Process,
<<NodeID/binary>>,
Timestamp
) ->
Origin = beam_stats_process:get_best_known_origin(Process),
OriginBin = proc_origin_to_bin(Origin),
- PidBin = pid_to_bin(Pid),
- OriginAndPid = [OriginBin, PidBin],
Ts = Timestamp,
N = NodeID,
- [ cons([N, <<"process_memory">> | OriginAndPid], Memory , Ts)
- , cons([N, <<"process_total_heap_size">> | OriginAndPid], TotalHeapSize , Ts)
- , cons([N, <<"process_stack_size">> | OriginAndPid], StackSize , Ts)
- , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen , Ts)
+ [ cons([N, <<"process_memory">> , OriginBin], Memory , Ts)
+ , cons([N, <<"process_total_heap_size">> , OriginBin], TotalHeapSize , Ts)
+ , cons([N, <<"process_stack_size">> , OriginBin], StackSize , Ts)
+ , cons([N, <<"process_message_queue_len">> , OriginBin], MsgQueueLen , Ts)
+ , cons([N, <<"process_reductions">> , OriginBin], Reductions , Ts)
].
+-spec aggregate_by_path([t()], erlang:timestamp()) ->
+ [t()].
+aggregate_by_path(Msgs, Timestamp) ->
+ Aggregate =
+ fun (?T{path=K, value=V}, ValsByPath) ->
+ dict:update_counter(K, V, ValsByPath)
+ end,
+ ValsByPathDict = lists:foldl(Aggregate, dict:new(), Msgs),
+ ValsByPathList = dict:to_list(ValsByPathDict),
+ [cons(Path, Value, Timestamp) || {Path, Value} <- ValsByPathList].
+
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
binary().
proc_origin_to_bin({registered_name, Name}) ->
- atom_to_binary(Name, utf8);
+ NameBin = atom_to_binary(Name, utf8),
+ <<"named--", NameBin/binary>>;
proc_origin_to_bin({ancestry, Ancestry}) ->
#beam_stats_process_ancestry
{ raw_initial_call = InitCallRaw
AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1),
AncestorsBin = hope_option:get(AncestorsBinOpt , Blank),
InitCallRawBin = mfa_to_bin(InitCallRaw),
- << InitCallRawBin/binary
+ << "spawned-via--"
+ , InitCallRawBin/binary
, "--"
, InitCallOTPBin/binary
, "--"
ancestor_to_bin(A) when is_atom(A) ->
atom_to_binary(A, utf8);
ancestor_to_bin(A) when is_pid(A) ->
- pid_to_bin(A).
-
-pid_to_bin(Pid) ->
- PidList = erlang:pid_to_list(Pid),
- PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
- re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
+ <<"PID">>.
-spec mfa_to_bin(mfa()) ->
binary().