Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
a8d431d1 | 4 | -include("include/beam_stats_ets_table.hrl"). |
69ebab97 SK |
5 | -include("include/beam_stats_process.hrl"). |
6 | -include("include/beam_stats_process_ancestry.hrl"). | |
7 | -include("include/beam_stats_processes.hrl"). | |
f079a56c | 8 | -include("beam_stats_logging.hrl"). |
70939664 SK |
9 | |
10 | -behaviour(beam_stats_consumer). | |
11 | ||
12 | -export_type( | |
13 | [ option/0 | |
14 | ]). | |
15 | ||
b0ab6ee2 | 16 | %% Consumer interface |
70939664 SK |
17 | -export( |
18 | [ init/1 | |
19 | , consume/2 | |
20 | , terminate/1 | |
21 | ]). | |
22 | ||
%% Options understood by init/1. Each one is optional; init/1 supplies a
%% default (or an absent value for static_node_name) when a key is missing.
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}
    | {num_msgs_per_packet, non_neg_integer()}
    | {static_node_name, binary()}.
31 | ||
32 | -define(DEFAULT_DST_HOST, "localhost"). | |
33 | -define(DEFAULT_DST_PORT, 8125). | |
34 | -define(DEFAULT_SRC_PORT, 8124). | |
35 | ||
%% Metric kinds this module knows how to serialize.
% TODO: Add other metric types
-type metric_type() :: gauge.
39 | ||
%% One metric sample, before it is rendered into its wire form by
%% statsd_msg_to_bin/1.
-record(statsd_msg, {
    % Metric path (gets the node prefix added later).
    name :: binary(),
    % Sampled value.
    value :: non_neg_integer(),
    % Metric kind; selects the type suffix in the rendered line.
    type :: metric_type()
}).

-type statsd_msg() :: #statsd_msg{}.
48 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1.
-record(state, {
    % `none' until a UDP socket has been opened, `{some, Socket}' afterwards.
    sock :: hope_option:t(gen_udp:socket()),
    % Where packets are sent.
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    % Local port the UDP socket is opened on.
    src_port :: inet:port_number(),
    % Chunk size handed to hope_list:divide/2 when grouping messages.
    num_msgs_per_packet :: non_neg_integer(),
    % Optional replacement for the node-derived part of metric names.
    static_node_name :: hope_option:t(binary())
}).

-type state() :: #state{}.
60 | ||
61 | -define(PATH_PREFIX, "beam_stats"). | |
62 | ||
63 | %% ============================================================================ | |
64 | %% Consumer implementation | |
65 | %% ============================================================================ | |
66 | ||
%% @doc Build the initial consumer state from an option list.
%%
%% Missing options fall back to: consumption_interval 60000,
%% ?DEFAULT_DST_HOST, ?DEFAULT_DST_PORT, ?DEFAULT_SRC_PORT and 10 messages
%% per packet. static_node_name is stored exactly as returned by
%% hope_kv_list:get/2. No socket is opened here; `sock' starts as `none'.
%%
%% Returns `{ConsumptionInterval, State}'.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    State = #state{
        sock = none,
        dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST),
        dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT),
        src_port = Lookup(src_port, ?DEFAULT_SRC_PORT),
        num_msgs_per_packet = Lookup(num_msgs_per_packet, 10),
        static_node_name = hope_kv_list:get(Options, static_node_name)
    },
    {Interval, State}.
85 | ||
%% @doc Serialize every queued stats sample into packets and send them.
%%
%% Packets are sent one after another; the state is threaded through each
%% send so a socket opened (or dropped) while sending one packet is seen by
%% the next. Returns the state after the last packet.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Q, #state{} = State0) ->
    #state{
        num_msgs_per_packet = PerPacket,
        static_node_name = NodeNameOpt
    } = State0,
    Packets = beam_stats_queue_to_packets(Q, PerPacket, NodeNameOpt),
    SendOne = fun(Packet, StateAcc) -> try_to_connect_and_send(Packet, StateAcc) end,
    lists:foldl(SendOne, State0, Packets).
70939664 SK |
97 | |
%% @doc Close the UDP socket, if one is held in the state.
-spec terminate(state()) -> {}.
terminate(#state{} = State) ->
    CloseSocket = fun gen_udp:close/1,
    ok = hope_option:iter(State#state.sock, CloseSocket),
    {}.
103 | ||
104 | %% ============================================================================ | |
105 | %% Transport | |
106 | %% ============================================================================ | |
107 | ||
4d24f3b7 SK |
%% @doc Make sure a socket exists (opening one if needed), then try to send
%% a single packet. Argument order matches what lists:foldl/3 expects.
-spec try_to_connect_and_send(binary(), state()) -> state().
try_to_connect_and_send(Payload, #state{} = State) when is_binary(Payload) ->
    try_to_send(try_to_connect_if_no_socket(State), Payload).
113 | ||
70939664 SK |
%% @doc Send one packet over the socket held in the state.
%%
%% With no socket (`sock = none') the packet is dropped and an error is
%% logged. If gen_udp:send/4 fails, the failure is logged, the socket is
%% closed and `sock' is reset to `none', so that a later
%% try_to_connect_if_no_socket/1 opens a fresh one. The packet is not
%% retried in either case.
%%
%% Returns the (possibly updated) state.
-spec try_to_send(state(), binary()) -> state().
try_to_send(#state{sock = none} = State, _) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{sock = {some, Sock}, dst_host = DstHost, dst_port = DstPort} = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The format string has five ~p directives (four call arguments
            % plus the result), so Payload must be in the argument list.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
140 | ||
%% @doc Open a UDP socket on the configured source port unless the state
%% already holds one. A failed open is logged and leaves `sock' as `none'.
-spec try_to_connect_if_no_socket(state()) -> state().
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State;
try_to_connect_if_no_socket(#state{sock = none, src_port = SrcPort} = State) ->
    OpenResult = gen_udp:open(SrcPort),
    SockOpt =
        case OpenResult of
            {ok, Sock} ->
                {some, Sock};
            {error, _} ->
                ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, OpenResult]),
                none
        end,
    State#state{sock = SockOpt}.
153 | ||
154 | %% ============================================================================ | |
155 | %% Serialization | |
156 | %% ============================================================================ | |
157 | ||
5b6519f3 SK |
%% @doc Turn a queue of stats samples into a list of packet payloads.
%%
%% Every sample is rendered into its message binaries, the messages of all
%% samples are concatenated in queue order, split into chunks with
%% hope_list:divide/2 using NumMsgsPerPacket, and each chunk is joined into
%% one binary.
-spec beam_stats_queue_to_packets(
    beam_stats_consumer:queue(),
    non_neg_integer(),
    hope_option:t(binary())
) -> [binary()].
beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) ->
    SampleToBins = fun(Sample) -> beam_stats_to_bins(Sample, StaticNodeNameOpt) end,
    AllMsgBins = lists:flatmap(SampleToBins, queue:to_list(Q)),
    Chunks = hope_list:divide(AllMsgBins, NumMsgsPerPacket),
    [erlang:iolist_to_binary(Chunk) || Chunk <- Chunks].
70939664 | 168 | |
5b6519f3 | 169 | -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> |
70939664 SK |
170 | [binary()]. |
171 | beam_stats_to_bins(#beam_stats | |
172 | { node_id = NodeID | |
173 | , memory = Memory | |
b4e2333f SK |
174 | , io_bytes_in = IOBytesIn |
175 | , io_bytes_out = IOBytesOut | |
3fe887d7 | 176 | , context_switches = ContextSwitches |
142c0796 | 177 | , reductions = Reductions |
deefeb3c | 178 | , run_queue = RunQueue |
a8d431d1 | 179 | , ets = ETS |
69ebab97 | 180 | , processes = Processes |
5b6519f3 SK |
181 | }, |
182 | StaticNodeNameOpt | |
70939664 | 183 | ) -> |
5b6519f3 | 184 | NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)), |
b4e2333f SK |
185 | Msgs1 = |
186 | [ io_bytes_in_to_msg(IOBytesIn) | |
187 | , io_bytes_out_to_msg(IOBytesOut) | |
3fe887d7 | 188 | , context_switches_to_msg(ContextSwitches) |
142c0796 | 189 | , reductions_to_msg(Reductions) |
deefeb3c | 190 | , run_queue_to_msg(RunQueue) |
b4e2333f | 191 | | memory_to_msgs(Memory) |
a8d431d1 | 192 | ] |
69ebab97 SK |
193 | ++ ets_to_msgs(ETS) |
194 | ++ procs_to_msgs(Processes), | |
70939664 SK |
195 | Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], |
196 | [statsd_msg_to_bin(M) || M <- Msgs2]. | |
197 | ||
deefeb3c SK |
%% @doc Wrap the run queue value in a gauge message named "run_queue".
-spec run_queue_to_msg(non_neg_integer()) -> statsd_msg().
run_queue_to_msg(Len) ->
    #statsd_msg{name = <<"run_queue">>, value = Len, type = gauge}.
206 | ||
142c0796 SK |
%% @doc Wrap the reductions value in a gauge message named "reductions".
-spec reductions_to_msg(non_neg_integer()) -> statsd_msg().
reductions_to_msg(Count) ->
    #statsd_msg{name = <<"reductions">>, value = Count, type = gauge}.
215 | ||
3fe887d7 SK |
%% @doc Wrap the context switches value in a gauge message named
%% "context_switches".
-spec context_switches_to_msg(non_neg_integer()) -> statsd_msg().
context_switches_to_msg(Count) ->
    #statsd_msg{name = <<"context_switches">>, value = Count, type = gauge}.
224 | ||
b4e2333f SK |
%% @doc Wrap the inbound IO byte value in a gauge message named
%% "io.bytes_in".
-spec io_bytes_in_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_in_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_in">>, value = Bytes, type = gauge}.
233 | ||
%% @doc Wrap the outbound IO byte value in a gauge message named
%% "io.bytes_out".
-spec io_bytes_out_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_out_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_out">>, value = Bytes, type = gauge}.
242 | ||
69ebab97 SK |
%% @doc Build gauge messages for the process section of a sample.
%%
%% The eight aggregate counters come first, in a fixed order, followed by
%% the per-process messages of every entry in `individual_stats'.
-spec procs_to_msgs(beam_stats_processes:t()) -> [statsd_msg()].
procs_to_msgs(#beam_stats_processes{} = Stats) ->
    #beam_stats_processes{
        individual_stats = Procs,
        count_all = CountAll,
        count_exiting = CountExiting,
        count_garbage_collecting = CountGarbageCollecting,
        count_registered = CountRegistered,
        count_runnable = CountRunnable,
        count_running = CountRunning,
        count_suspended = CountSuspended,
        count_waiting = CountWaiting
    } = Stats,
    Counters = [
        {<<"processes_count_all">>, CountAll},
        {<<"processes_count_exiting">>, CountExiting},
        {<<"processes_count_garbage_collecting">>, CountGarbageCollecting},
        {<<"processes_count_registered">>, CountRegistered},
        {<<"processes_count_runnable">>, CountRunnable},
        {<<"processes_count_running">>, CountRunning},
        {<<"processes_count_suspended">>, CountSuspended},
        {<<"processes_count_waiting">>, CountWaiting}
    ],
    CounterMsgs = [gauge(Name, Value) || {Name, Value} <- Counters],
    CounterMsgs ++ lists:flatmap(fun proc_to_msgs/1, Procs).
268 | ||
%% @doc Build the four gauges reported for a single process: memory, total
%% heap size, stack size and message queue length.
%%
%% Each metric name is `<metric>.<origin>.<pid>', where the origin is the
%% rendered best-known origin of the process and the pid is its sanitized
%% textual form.
-spec proc_to_msgs(beam_stats_process:t()) -> [statsd_msg()].
proc_to_msgs(
    #beam_stats_process{
        pid = Pid,
        memory = Memory,
        total_heap_size = TotalHeapSize,
        stack_size = StackSize,
        message_queue_len = MsgQueueLen
    } = Process
) ->
    OriginBin = proc_origin_to_bin(beam_stats_process:get_best_known_origin(Process)),
    PidBin = pid_to_bin(Pid),
    Suffix = <<OriginBin/binary, ".", PidBin/binary>>,
    Metrics = [
        {<<"process_memory.">>, Memory},
        {<<"process_total_heap_size.">>, TotalHeapSize},
        {<<"process_stack_size.">>, StackSize},
        {<<"process_message_queue_len.">>, MsgQueueLen}
    ],
    [gauge(<<Prefix/binary, Suffix/binary>>, Value) || {Prefix, Value} <- Metrics].
289 | ||
%% @doc Render a process origin as a metric-name fragment.
%%
%% A registered name is rendered as the atom's text. An ancestry is rendered
%% as `<raw initial call>--<OTP initial call>--<ancestors>', where either of
%% the last two parts is the literal `NONE' when absent.
-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> binary().
proc_origin_to_bin({registered_name, Name}) ->
    atom_to_binary(Name, utf8);
proc_origin_to_bin({ancestry, Ancestry}) ->
    #beam_stats_process_ancestry{
        raw_initial_call = RawCall,
        otp_initial_call = OTPCallOpt,
        otp_ancestors = AncestorsOpt
    } = Ancestry,
    % Render an optional value with ToBin, or fall back to the placeholder.
    RenderOpt = fun(Opt, ToBin) ->
        hope_option:get(hope_option:map(Opt, ToBin), <<"NONE">>)
    end,
    OTPCallBin = RenderOpt(OTPCallOpt, fun mfa_to_bin/1),
    AncestorsBin = RenderOpt(AncestorsOpt, fun ancestors_to_bin/1),
    RawCallBin = mfa_to_bin(RawCall),
    <<RawCallBin/binary, "--", OTPCallBin/binary, "--", AncestorsBin/binary>>.
312 | ||
%% Join the rendered ancestors with "-", keeping list order. An empty list
%% yields <<>>. The separator is only emitted when the text rendered for the
%% remaining ancestors is non-empty.
ancestors_to_bin(Ancestors) ->
    Join = fun
        (Ancestor, <<>>) ->
            ancestor_to_bin(Ancestor);
        (Ancestor, Rest) ->
            <<(ancestor_to_bin(Ancestor))/binary, "-", Rest/binary>>
    end,
    lists:foldr(Join, <<>>, Ancestors).
323 | ||
%% Render a single ancestor, which is either a registered name (atom) or a
%% pid. Anything else is a function_clause error.
ancestor_to_bin(Name) when is_atom(Name) ->
    atom_to_binary(Name, utf8);
ancestor_to_bin(Pid) when is_pid(Pid) ->
    pid_to_bin(Pid).
328 | ||
%% Render a pid for use inside a metric name: take its textual form, drop
%% the surrounding angle brackets and turn every "." into "_" (dots would
%% otherwise act as metric path separators).
pid_to_bin(Pid) ->
    DotToUnderscore = fun
        ($.) -> $_;
        (Char) -> Char
    end,
    <<
        <<(DotToUnderscore(Char))>>
     || Char <- erlang:pid_to_list(Pid), Char =/= $<, Char =/= $>
    >>.
333 | ||
%% @doc Render `{Module, Function, Arity}' as `Module-Function-Arity'.
-spec mfa_to_bin(mfa()) -> binary().
mfa_to_bin({Module, Function, Arity}) ->
    erlang:iolist_to_binary([
        atom_to_binary(Module, utf8),
        $-,
        atom_to_binary(Function, utf8),
        $-,
        erlang:integer_to_binary(Arity)
    ]).
341 | ||
342 | ||
%% @doc Construct a gauge message. Only accepts a binary name and an
%% integer value; anything else is a function_clause error.
-spec gauge(binary(), integer()) -> statsd_msg().
gauge(Name, Value) when is_binary(Name), is_integer(Value) ->
    #statsd_msg{name = Name, value = Value, type = gauge}.
351 | ||
a8d431d1 SK |
%% @doc Build the messages for every ETS table entry, preserving the order
%% of the per-table stats.
-spec ets_to_msgs(beam_stats_ets:t()) -> [statsd_msg()].
ets_to_msgs(PerTableStats) ->
    lists:append([ets_table_to_msgs(Table) || Table <- PerTableStats]).
357 | ||
%% @doc Build the size and memory gauges for one ETS table.
%%
%% Metric names are `ets_table.size.<name>.<id>' and
%% `ets_table.memory.<name>.<id>'. Returns `[SizeMsg, MemoryMsg]'.
-spec ets_table_to_msgs(beam_stats_ets_table:t()) -> [statsd_msg()].
ets_table_to_msgs(
    #beam_stats_ets_table{
        id = ID,
        name = Name,
        size = Size,
        memory = Memory
    }
) ->
    IDBin = beam_stats_ets_table:id_to_bin(ID),
    % utf8, not latin1: atom_to_binary/2 raises badarg under latin1 for any
    % table name containing a code point above 255. utf8 accepts every atom
    % and matches how other atom-derived names in this module are encoded.
    NameBin = atom_to_binary(Name, utf8),
    NameAndID = <<NameBin/binary, ".", IDBin/binary>>,
    SizeMsg =
        #statsd_msg{
            name = <<"ets_table.size.", NameAndID/binary>>,
            value = Size,
            type = gauge
        },
    MemoryMsg =
        #statsd_msg{
            name = <<"ets_table.memory.", NameAndID/binary>>,
            value = Memory,
            type = gauge
        },
    [SizeMsg, MemoryMsg].
383 | ||
%% @doc Build one gauge message per `{Type, Size}' memory component, in the
%% order given.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()].
memory_to_msgs(Memory) ->
    lists:map(fun memory_component_to_statsd_msg/1, Memory).
388 | ||
%% @doc Turn one `{Type, Size}' memory component into a gauge message named
%% `memory.<Type>'.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> statsd_msg().
memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
    TypeBin = atom_to_binary(MemType, latin1),
    MetricName = <<"memory.", TypeBin/binary>>,
    #statsd_msg{name = MetricName, value = MemSize, type = gauge}.
398 | ||
%% @doc Prefix a message name with `<?PATH_PREFIX>.<NodeID>.', leaving the
%% other fields of the message untouched.
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name = Name} = Msg, NodeID) when
    is_binary(NodeID)
->
    Msg#statsd_msg{name = <<?PATH_PREFIX, ".", NodeID/binary, ".", Name/binary>>}.
405 | ||
%% @doc Serialize a gauge message as `<name>:<value>|<type>' followed by a
%% newline. Only gauge messages with a binary name and a value >= 0 are
%% accepted.
-spec statsd_msg_to_bin(statsd_msg()) -> binary().
statsd_msg_to_bin(#statsd_msg{name = Name, value = Value, type = gauge}) when
    is_binary(Name), Value >= 0
->
    TypeBin = metric_type_to_bin(gauge),
    erlang:iolist_to_binary([Name, $:, integer_to_binary(Value), $|, TypeBin, $\n]).
424 | ||
%% @doc Map a metric type to the type code used in a serialized message.
-spec metric_type_to_bin(metric_type()) -> binary().
metric_type_to_bin(gauge) -> <<$g>>.
429 | ||
%% @doc Render a node name for use inside a metric name: every "@" and "."
%% is replaced with "_".
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).