feat: aggregate anonymous process data by origin
[beam_stats.git] / src / beam_stats_msg_graphite.erl
CommitLineData
a9ed8751
SK
1-module(beam_stats_msg_graphite).
2
3-include("include/beam_stats.hrl").
6e1a5b00 4-include("include/beam_stats_ets_table.hrl").
a9ed8751 5-include("include/beam_stats_msg_graphite.hrl").
4bb8ddfe
SK
6-include("include/beam_stats_process.hrl").
7-include("include/beam_stats_process_ancestry.hrl").
8-include("include/beam_stats_processes.hrl").
a9ed8751
SK
9
10-export_type(
11 [ t/0
12 ]).
13
14-export(
15 [ of_beam_stats/1
ece99ea3 16 , of_beam_stats/2
67948b18 17 , to_bin/1
9b9299d9 18 , path_to_bin/1
ece99ea3
SK
19 , add_path_prefix/2
20 , node_id_to_bin/1
a9ed8751
SK
21 ]).
22
23-define(T, #?MODULE).
24
25-type t() ::
26 ?T{}.
27
67948b18
SK
28%% ============================================================================
29%% API
30%% ============================================================================
31
a9ed8751
SK
%% Converts a stats snapshot into Graphite messages, using the snapshot's own
%% node ID (sanitised by node_id_to_bin/1) as the leading path component.
-spec of_beam_stats(beam_stats:t()) ->
    [t()].
of_beam_stats(#beam_stats{node_id = Node} = Stats) ->
    of_beam_stats(Stats, node_id_to_bin(Node)).
37
%% Converts a stats snapshot into Graphite messages whose paths all start with
%% the supplied node ID binary. The node_id stored in the snapshot is ignored.
%%
%% Result order: scalar VM counters, then memory, then ETS tables, then
%% processes.
-spec of_beam_stats(beam_stats:t(), binary()) ->
    [t()].
of_beam_stats(
    #beam_stats
    { timestamp        = Ts
    , node_id          = _
    , memory           = Memory
    , io_bytes_in      = IOBytesIn
    , io_bytes_out     = IOBytesOut
    , context_switches = ContextSwitches
    , reductions       = Reductions
    , run_queue        = RunQueue
    , ets              = ETS
    , processes        = Processes
    },
    <<NodeID/binary>>
) ->
    %% Path suffix (below the node ID) paired with the value to report.
    Scalars =
        [ {[<<"io">>, <<"bytes_in">>] , IOBytesIn}
        , {[<<"io">>, <<"bytes_out">>], IOBytesOut}
        , {[<<"context_switches">>]   , ContextSwitches}
        , {[<<"reductions">>]         , Reductions}
        , {[<<"run_queue">>]          , RunQueue}
        ],
    ScalarMsgs =
        [cons([NodeID | Suffix], Value, Ts) || {Suffix, Value} <- Scalars],
    lists:append(
        [ ScalarMsgs
        , of_memory(Memory, NodeID, Ts)
        , of_ets(ETS, NodeID, Ts)
        , of_processes(Processes, NodeID, Ts)
        ]).
a9ed8751 65
67948b18
SK
%% Serialises a message as "<dotted.path> <value> <unix-seconds>", with the
%% three fields separated by single spaces and no trailing newline.
-spec to_bin(t()) ->
    binary().
to_bin(?T{path = Path, value = Value, timestamp = Timestamp}) ->
    PathBin  = path_to_bin(Path),
    ValueBin = integer_to_binary(Value),
    SecsBin  = integer_to_binary(timestamp_to_integer(Timestamp)),
    iolist_to_binary([PathBin, " ", ValueBin, " ", SecsBin]).
80
ece99ea3
SK
%% Returns the message with Prefix inserted as the first path component.
-spec add_path_prefix(t(), binary()) ->
    t().
add_path_prefix(?T{path = OldPath} = Msg, <<Prefix/binary>>) ->
    NewPath = [Prefix | OldPath],
    Msg?T{path = NewPath}.
85
9b9299d9
SK
%% Joins the path components into a single dot-separated binary.
-spec path_to_bin([binary()]) ->
    binary().
path_to_bin(Path) ->
    Separator = <<".">>,
    bin_join(Path, Separator).
90
ece99ea3
SK
%% Renders a node name as a binary in which every "@" and "." has been
%% replaced by "_".
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).
96
67948b18
SK
97%% ============================================================================
98%% Helpers
99%% ============================================================================
100
%% Joins a list of binaries with the given separator binary.
%%
%% An empty list yields <<>> and a single element is returned unchanged.
%% The result is built by appending to one accumulator from left to right,
%% rather than rebuilding the whole suffix at every recursion level.
-spec bin_join([binary()], binary()) ->
    binary().
bin_join([], <<_/binary>>) ->
    <<>>;
bin_join([<<First/binary>> | Rest], <<Sep/binary>>) ->
    Append =
        fun (<<Bin/binary>>, Acc) ->
            <<Acc/binary, Sep/binary, Bin/binary>>
        end,
    lists:foldl(Append, First, Rest).
108
%% Collapses a {MegaSecs, Secs, MicroSecs} timestamp into whole seconds.
%% The microseconds component is discarded.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({Mega, Secs, _Micro}) ->
    Secs + Mega * 1000000.
113
a9ed8751
SK
%% Produces one message per {Component, Value} memory entry, with path
%% [NodeID, <<"memory">>, Component].
-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) ->
    [t()].
of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
    lists:map(
        fun ({Component, Amount}) ->
            Path = [NodeID, <<"memory">>, atom_to_binary(Component, latin1)],
            cons(Path, Amount, Timestamp)
        end,
        Memory
    ).
123
%% Produces the messages for every ETS table, concatenated in table order.
-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
    lists:flatmap(
        fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
        PerTableStats
    ).
130
%% Produces the size and memory messages for a single ETS table.
%%
%% Paths have the form [NodeID, <<"ets_table">>, Metric, Name, ID], where
%% Metric is <<"size">> or <<"memory">>.
%%
%% The table name is encoded as UTF-8, matching the other atom conversions in
%% this module. With latin1, atom_to_binary/2 raises badarg for any name that
%% contains a code point above 255. ASCII names encode identically either way.
-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets_table(
    #beam_stats_ets_table
    { id     = ID
    , name   = Name
    , size   = Size
    , memory = Memory
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    IDBin     = beam_stats_ets_table:id_to_bin(ID),
    NameBin   = atom_to_binary(Name, utf8),
    NameAndID = [NameBin, IDBin],
    [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
    , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
    ].
148
4bb8ddfe
SK
%% Produces the process-related messages: the eight process-count gauges
%% first, followed by the per-process metrics summed over identical paths
%% (see aggregate_by_path/2).
-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) ->
    [t()].
of_processes(
    #beam_stats_processes
    { individual_stats         = Processes
    , count_all                = CountAll
    , count_exiting            = CountExiting
    , count_garbage_collecting = CountGarbageCollecting
    , count_registered         = CountRegistered
    , count_runnable           = CountRunnable
    , count_running            = CountRunning
    , count_suspended          = CountSuspended
    , count_waiting            = CountWaiting
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    PerProcessMsgs =
        lists:flatmap(
            fun (P) -> of_process(P, NodeID, Timestamp) end,
            Processes
        ),
    Aggregated = aggregate_by_path(PerProcessMsgs, Timestamp),
    Counts =
        [ {<<"processes_count_all">>               , CountAll}
        , {<<"processes_count_exiting">>           , CountExiting}
        , {<<"processes_count_garbage_collecting">>, CountGarbageCollecting}
        , {<<"processes_count_registered">>        , CountRegistered}
        , {<<"processes_count_runnable">>          , CountRunnable}
        , {<<"processes_count_running">>           , CountRunning}
        , {<<"processes_count_suspended">>         , CountSuspended}
        , {<<"processes_count_waiting">>           , CountWaiting}
        ],
    CountMsgs =
        [cons([NodeID, Name], Count, Timestamp) || {Name, Count} <- Counts],
    CountMsgs ++ Aggregated.
182
%% Produces four messages for one process (memory, total heap size, stack
%% size and message queue length). Each path is [NodeID, Metric, Origin],
%% where Origin is the binary form of the process's best known origin, so
%% processes sharing an origin share paths.
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
    [t()].
of_process(
    #beam_stats_process
    { pid               = _
    , memory            = Memory
    , total_heap_size   = TotalHeapSize
    , stack_size        = StackSize
    , message_queue_len = MsgQueueLen
    } = Process,
    <<NodeID/binary>>,
    Timestamp
) ->
    OriginBin =
        proc_origin_to_bin(beam_stats_process:get_best_known_origin(Process)),
    Metrics =
        [ {<<"process_memory">>           , Memory}
        , {<<"process_total_heap_size">>  , TotalHeapSize}
        , {<<"process_stack_size">>       , StackSize}
        , {<<"process_message_queue_len">>, MsgQueueLen}
        ],
    [ cons([NodeID, Metric, OriginBin], Value, Timestamp)
    || {Metric, Value} <- Metrics
    ].
205
697c496d
SK
%% Sums the values of all messages that share a path and returns one message
%% per distinct path, stamped with the given Timestamp. The timestamps on the
%% input messages are not used.
-spec aggregate_by_path([t()], erlang:timestamp()) ->
    [t()].
aggregate_by_path(Msgs, Timestamp) ->
    AddToTotals =
        fun (?T{path = MsgPath, value = MsgValue}, Acc) ->
            dict:update_counter(MsgPath, MsgValue, Acc)
        end,
    Totals = lists:foldl(AddToTotals, dict:new(), Msgs),
    ToMsg =
        fun ({Path, Total}) ->
            cons(Path, Total, Timestamp)
        end,
    lists:map(ToMsg, dict:to_list(Totals)).
216
4bb8ddfe
SK
%% Renders a process origin as a binary usable as a path component.
%%
%%   {registered_name, Name} -> "named--<Name>"
%%   {ancestry, Ancestry}    -> "spawned-via--<raw call>--<OTP call>--<ancestors>"
%%
%% An absent OTP initial call or ancestor list is rendered as "NONE".
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
    binary().
proc_origin_to_bin({registered_name, Name}) ->
    iolist_to_binary(["named--", atom_to_binary(Name, utf8)]);
proc_origin_to_bin({ancestry, Ancestry}) ->
    #beam_stats_process_ancestry
    { raw_initial_call = RawCall
    , otp_initial_call = OTPCallOpt
    , otp_ancestors    = AncestorsOpt
    } = Ancestry,
    %% Convert an optional value with ToBin, falling back to the placeholder.
    OrNone =
        fun (Opt, ToBin) ->
            hope_option:get(hope_option:map(Opt, ToBin), <<"NONE">>)
        end,
    OTPCallBin   = OrNone(OTPCallOpt  , fun mfa_to_bin/1),
    AncestorsBin = OrNone(AncestorsOpt, fun ancestors_to_bin/1),
    RawCallBin   = mfa_to_bin(RawCall),
    <<"spawned-via--", RawCallBin/binary,
      "--", OTPCallBin/binary,
      "--", AncestorsBin/binary>>.
241
%% Renders a list of ancestors as a single binary joined with "-".
%% An empty list yields <<>>. Working from the right, an element whose
%% already-rendered suffix is empty is emitted without a trailing separator.
ancestors_to_bin(Ancestors) ->
    Prepend =
        fun (Ancestor, <<>>) ->
                ancestor_to_bin(Ancestor);
            (Ancestor, <<Suffix/binary>>) ->
                Head = ancestor_to_bin(Ancestor),
                <<Head/binary, "-", Suffix/binary>>
        end,
    lists:foldr(Prepend, <<>>, Ancestors).
252
%% Renders one ancestor, which is either a pid or an atom, as a binary.
ancestor_to_bin(Ancestor) when is_pid(Ancestor) ->
    pid_to_bin(Ancestor);
ancestor_to_bin(Ancestor) when is_atom(Ancestor) ->
    atom_to_binary(Ancestor, utf8).
257
%% Renders a pid as a binary with the surrounding angle brackets removed and
%% every "." replaced by "_", e.g. the pid printed as "<0.1.2>" gives
%% <<"0_1_2">>.
pid_to_bin(Pid) ->
    Unbracketed = [C || C <- erlang:pid_to_list(Pid), C =/= $<, C =/= $>],
    Underscored =
        [ case C of
              $. -> $_;
              _  -> C
          end
        || C <- Unbracketed
        ],
    list_to_binary(Underscored).
262
%% Renders {Module, Function, Arity} as <<"Module-Function-Arity">>.
-spec mfa_to_bin(mfa()) ->
    binary().
mfa_to_bin({Module, Function, Arity}) ->
    iolist_to_binary(
        [ atom_to_binary(Module, utf8)
        , "-"
        , atom_to_binary(Function, utf8)
        , "-"
        , erlang:integer_to_binary(Arity)
        ]).
270
a37cd038
SK
%% Builds a message record from a path (list of binary components), an
%% integer value and a timestamp.
-spec cons([binary()], integer(), erlang:timestamp()) ->
    t().
cons(Path, Value, Timestamp) ->
    ?T{path = Path, value = Value, timestamp = Timestamp}.
This page took 0.036428 seconds and 4 git commands to generate.