d33e45054dc8f6e67a95f60e7e627d4470e481a5
[beam_stats.git] / src / beam_stats_msg_graphite.erl
1 -module(beam_stats_msg_graphite).
2
3 -include("include/beam_stats.hrl").
4 -include("include/beam_stats_ets_table.hrl").
5 -include("include/beam_stats_msg_graphite.hrl").
6 -include("include/beam_stats_process.hrl").
7 -include("include/beam_stats_process_ancestry.hrl").
8 -include("include/beam_stats_processes.hrl").
9
10 -export_type(
11 [ t/0
12 ]).
13
14 -export(
15 [ of_beam_stats/1
16 , of_beam_stats/2
17 , to_iolist/1
18 , path_to_iolist/1
19 , node_id_to_bin/1
20 ]).
21
22 -define(T, #?MODULE).
23
24 -type t() ::
25 ?T{}.
26
27 %% ============================================================================
28 %% API
29 %% ============================================================================
30
%% Converts a stats snapshot into a list of messages, deriving the node
%% component of every path from the node ID stored in the snapshot itself
%% (sanitized through node_id_to_bin/1).
-spec of_beam_stats(beam_stats:t()) ->
    [t()].
of_beam_stats(#beam_stats{node_id = Node} = Stats) ->
    of_beam_stats(Stats, node_id_to_bin(Node)).
36
%% Converts a stats snapshot into a list of messages, using the caller-supplied
%% binary as the node component of every path (the node_id field of the
%% snapshot is ignored here).
%%
%% Resulting order: the five scalar metrics, then memory, ETS and process
%% messages. Every path is finally prefixed with the schema version.
-spec of_beam_stats(beam_stats:t(), binary()) ->
    [t()].
of_beam_stats(#beam_stats{} = Stats, <<Node/binary>>) ->
    #beam_stats
    { timestamp        = Ts
    , memory           = Memory
    , io_bytes_in      = IOBytesIn
    , io_bytes_out     = IOBytesOut
    , context_switches = ContextSwitches
    , reductions       = Reductions
    , run_queue        = RunQueue
    , ets              = ETS
    , processes        = Processes
    } = Stats,
    %% Path suffix (everything after the node component) paired with its value.
    Scalars =
        [ {[<<"io">>, <<"bytes_in">>] , IOBytesIn}
        , {[<<"io">>, <<"bytes_out">>], IOBytesOut}
        , {[<<"context_switches">>]   , ContextSwitches}
        , {[<<"reductions">>]         , Reductions}
        , {[<<"run_queue">>]          , RunQueue}
        ],
    ScalarMsgs =
        [cons([Node | Suffix], Value, Ts) || {Suffix, Value} <- Scalars],
    AllMsgs =
        lists:append(
            [ ScalarMsgs
            , of_memory(Memory, Node, Ts)
            , of_ets(ETS, Node, Ts)
            , of_processes(Processes, Node, Ts)
            ]
        ),
    [path_prefix_schema_version(Msg) || Msg <- AllMsgs].
66
%% Renders one message as "<dotted path> <value> <seconds>", with single
%% spaces between the three parts. No trailing newline is added here.
%% The timestamp is reduced to whole seconds by timestamp_to_integer/1.
-spec to_iolist(t()) ->
    iolist().
to_iolist(?T{path = Path, value = Value, timestamp = Timestamp}) ->
    Space = <<" ">>,
    [ path_to_iolist(Path)
    , Space
    , integer_to_binary(Value)
    , Space
    , integer_to_binary(timestamp_to_integer(Timestamp))
    ].
81
%% Joins the path components with "." separators, returning an iolist
%% (the components are not copied into a single binary).
-spec path_to_iolist([binary()]) ->
    iolist().
path_to_iolist(Components) ->
    Dot = <<".">>,
    interleave(Components, Dot).
86
%% Converts a node name to a binary in which every "@" and every "." has been
%% replaced by "_", so the node name occupies exactly one component of a
%% dot-separated path.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    %% Both patterns are single ASCII bytes, so multi-byte UTF-8 sequences in
    %% the node name are never touched.
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).
92
93 %% ============================================================================
94 %% Helpers
95 %% ============================================================================
96
%% Prepends the current schema version (see schema_version/0) to the path of
%% the given message.
-spec path_prefix_schema_version(t()) ->
    t().
path_prefix_schema_version(?T{} = Msg) ->
    Version = schema_version(),
    path_prefix(Msg, Version).
101
%% Returns the message with Prefix inserted as the first component of its
%% path. All other fields are left as they were.
-spec path_prefix(t(), binary()) ->
    t().
path_prefix(?T{path = Components} = Msg, <<Prefix/binary>>) ->
    Prefixed = [Prefix | Components],
    Msg?T{path = Prefixed}.
106
%% The constant used as the leading path component of every message produced
%% by of_beam_stats/2.
-spec schema_version() ->
    binary().
schema_version() ->
    <<"beam_stats_v0">>.
111
%% Places Sep between every two neighbouring elements of the list.
%% An empty list stays empty and a single-element list gets no separator.
-spec interleave(iolist(), iodata()) ->
    iolist().
interleave([], _Sep) ->
    [];
interleave([First | Rest], Sep) ->
    %% Every element after the first is preceded by the separator.
    PrependSep = fun (Elem, Acc) -> [Sep, Elem | Acc] end,
    [First | lists:foldr(PrependSep, [], Rest)].
118
%% Collapses a {MegaSecs, Secs, MicroSecs} triple into a whole number of
%% seconds. The microseconds element is discarded.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({MegaSecs, Secs, _MicroSecs}) ->
    (MegaSecs * 1000000) + Secs.
123
%% Produces one message per {Key, Value} pair of the memory list, with the
%% path [NodeID, "memory", Key]. Input order is preserved.
-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) ->
    [t()].
of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
    [of_memory_component(Component, NodeID, Timestamp) || Component <- Memory].

%% Builds the message for a single memory component.
of_memory_component({Kind, Amount}, NodeID, Timestamp) ->
    KindBin = atom_to_binary(Kind, latin1),
    cons([NodeID, <<"memory">>, KindBin], Amount, Timestamp).
133
%% Produces the messages for all ETS tables. Several tables can map to the
%% same path (the path is built from the table name and ID type only), so the
%% per-table messages are summed per path by aggregate_by_path/2.
-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
    PerTableMsgs =
        lists:flatmap(
            fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
            PerTableStats
        ),
    aggregate_by_path(PerTableMsgs, Timestamp).
141
%% Produces the two messages (size and memory) for a single ETS table.
%%
%% Paths have the shape
%%   [NodeID, "ets_table", "size" | "memory", TableName, IDType]
%% where IDType is "NAMED" when the table's ID is its name and "TID"
%% otherwise.
-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
    [t()].
of_ets_table(#beam_stats_ets_table
    { id     = ID
    , name   = Name
    , size   = Size
    , memory = Memory
    },
    <<NodeID/binary>>,
    Timestamp
) ->
    IDType =
        case ID =:= Name
        of  true  -> <<"NAMED">>
        ;   false -> <<"TID">>
        end,
    %% utf8 rather than latin1: table names are arbitrary atoms, and
    %% atom_to_binary/2 with latin1 raises badarg for any atom containing a
    %% code point above 255. utf8 also matches the encoding used for every
    %% other atom that ends up in a path in this module. Output for ASCII
    %% names is unchanged.
    NameBin = atom_to_binary(Name, utf8),
    NameAndID = [NameBin, IDType],
    [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
    , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
    ].
163
%% Produces the process-related messages: eight node-wide process counters
%% followed by the per-process metrics, the latter summed per path by
%% aggregate_by_path/2 (processes sharing an origin share a path).
-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) ->
    [t()].
of_processes(#beam_stats_processes{} = Stats, <<NodeID/binary>>, Timestamp) ->
    #beam_stats_processes
    { individual_stats         = Processes
    , count_all                = CountAll
    , count_exiting            = CountExiting
    , count_garbage_collecting = CountGarbageCollecting
    , count_registered         = CountRegistered
    , count_runnable           = CountRunnable
    , count_running            = CountRunning
    , count_suspended          = CountSuspended
    , count_waiting            = CountWaiting
    } = Stats,
    PerProcessMsgs =
        [ Msg
        || Process <- Processes
        ,  Msg     <- of_process(Process, NodeID, Timestamp)
        ],
    PerOriginMsgs = aggregate_by_path(PerProcessMsgs, Timestamp),
    %% Metric name paired with its counter, in output order.
    Counts =
        [ {<<"processes_count_all">>               , CountAll}
        , {<<"processes_count_exiting">>           , CountExiting}
        , {<<"processes_count_garbage_collecting">>, CountGarbageCollecting}
        , {<<"processes_count_registered">>        , CountRegistered}
        , {<<"processes_count_runnable">>          , CountRunnable}
        , {<<"processes_count_running">>           , CountRunning}
        , {<<"processes_count_suspended">>         , CountSuspended}
        , {<<"processes_count_waiting">>           , CountWaiting}
        ],
    CountMsgs =
        [cons([NodeID, Metric], Count, Timestamp) || {Metric, Count} <- Counts],
    CountMsgs ++ PerOriginMsgs.
197
%% Produces five messages for a single process (memory, total heap size,
%% stack size, message queue length, reductions). The last path component is
%% the process origin rendered by proc_origin_to_bin/1; the pid itself is not
%% part of any path.
-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
    [t()].
of_process(#beam_stats_process{} = Process, <<NodeID/binary>>, Timestamp) ->
    #beam_stats_process
    { memory            = Memory
    , total_heap_size   = TotalHeapSize
    , stack_size        = StackSize
    , message_queue_len = MsgQueueLen
    , reductions        = Reductions
    } = Process,
    OriginBin =
        proc_origin_to_bin(beam_stats_process:get_best_known_origin(Process)),
    %% Metric name paired with its value, in output order.
    Metrics =
        [ {<<"process_memory">>           , Memory}
        , {<<"process_total_heap_size">>  , TotalHeapSize}
        , {<<"process_stack_size">>       , StackSize}
        , {<<"process_message_queue_len">>, MsgQueueLen}
        , {<<"process_reductions">>       , Reductions}
        ],
    [ cons([NodeID, Metric, OriginBin], Value, Timestamp)
    || {Metric, Value} <- Metrics
    ].
222
%% Collapses messages that share a path into one message per distinct path,
%% whose value is the sum of the originals. Every resulting message carries
%% the given Timestamp; the timestamps of the input messages are not used.
%% Output order is whatever dict:to_list/1 yields.
-spec aggregate_by_path([t()], erlang:timestamp()) ->
    [t()].
aggregate_by_path(Msgs, Timestamp) ->
    SumsByPath = lists:foldl(fun add_to_path_sum/2, dict:new(), Msgs),
    ToMsg = fun ({Path, Sum}) -> cons(Path, Sum, Timestamp) end,
    lists:map(ToMsg, dict:to_list(SumsByPath)).

%% Adds the value of one message to the running total kept for its path.
add_to_path_sum(?T{path = Path, value = Value}, SumsByPath) ->
    dict:update_counter(Path, Value, SumsByPath).
233
%% Renders a process origin as a single path component:
%%   - registered process: "named--<Name>"
%%   - otherwise:          "spawned-via--<RawInitCall>--<OTPInitCall>--<Ancestors>"
%% The OTP initial call and the ancestors are optional values; "NONE" is
%% passed to hope_option:get/2 as the fallback for them.
%% NOTE(review): presumably hope_option:get/2 returns that fallback when the
%% option is empty - confirm against the hope_option module.
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
    binary().
proc_origin_to_bin({registered_name, Name}) ->
    <<"named--", (atom_to_binary(Name, utf8))/binary>>;
proc_origin_to_bin({ancestry, Ancestry}) ->
    #beam_stats_process_ancestry
    { raw_initial_call = RawCall
    , otp_initial_call = OTPCallOpt
    , otp_ancestors    = AncestorsOpt
    } = Ancestry,
    Placeholder = <<"NONE">>,
    OTPCallBin =
        hope_option:get(
            hope_option:map(OTPCallOpt, fun mfa_to_bin/1),
            Placeholder
        ),
    AncestorsBin =
        hope_option:get(
            hope_option:map(AncestorsOpt, fun ancestors_to_bin/1),
            Placeholder
        ),
    RawCallBin = mfa_to_bin(RawCall),
    <<"spawned-via--", RawCallBin/binary,
      "--", OTPCallBin/binary,
      "--", AncestorsBin/binary>>.
258
%% Renders a list of ancestors as their individual renderings joined by "-".
%% An empty list yields an empty binary. No "-" is emitted before a tail that
%% renders as empty.
-spec ancestors_to_bin([atom() | pid()]) ->
    binary().
ancestors_to_bin([]) ->
    <<>>;
ancestors_to_bin([Ancestor | Rest]) ->
    join_ancestors(ancestor_to_bin(Ancestor), ancestors_to_bin(Rest)).

%% Joins the rendering of one ancestor with the already rendered remainder.
join_ancestors(Head, <<>>) ->
    Head;
join_ancestors(Head, <<Tail/binary>>) ->
    <<Head/binary, "-", Tail/binary>>.
269
%% Renders a single ancestor: an atom becomes its UTF-8 text, while any pid
%% becomes the fixed marker "PID" (the pid value itself is not rendered).
-spec ancestor_to_bin(atom() | pid()) ->
    binary().
ancestor_to_bin(Name) when is_atom(Name) ->
    atom_to_binary(Name, utf8);
ancestor_to_bin(Pid) when is_pid(Pid) ->
    <<"PID">>.
274
%% Renders {Module, Function, Arity} as "<Module>-<Function>-<Arity>".
-spec mfa_to_bin(mfa()) ->
    binary().
mfa_to_bin({Module, Function, Arity}) ->
    Parts =
        [ atom_to_binary(Module, utf8)
        , atom_to_binary(Function, utf8)
        , integer_to_binary(Arity)
        ],
    [ModuleBin, FunctionBin, ArityBin] = Parts,
    iolist_to_binary([ModuleBin, "-", FunctionBin, "-", ArityBin]).
282
%% Constructor: packs a path, a value and a timestamp into a message record.
%% No validation or conversion is performed.
-spec cons([binary()], integer(), erlang:timestamp()) ->
    t().
cons(Path, Value, Timestamp) ->
    ?T{timestamp = Timestamp, value = Value, path = Path}.
This page took 0.075646 seconds and 3 git commands to generate.