Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
a8d431d1 | 4 | -include("include/beam_stats_ets_table.hrl"). |
69ebab97 SK |
5 | -include("include/beam_stats_process.hrl"). |
6 | -include("include/beam_stats_process_ancestry.hrl"). | |
7 | -include("include/beam_stats_processes.hrl"). | |
f079a56c | 8 | -include("beam_stats_logging.hrl"). |
70939664 SK |
9 | |
10 | -behaviour(beam_stats_consumer). | |
11 | ||
12 | -export_type( | |
13 | [ option/0 | |
14 | ]). | |
15 | ||
b0ab6ee2 | 16 | %% Consumer interface |
70939664 SK |
17 | -export( |
18 | [ init/1 | |
19 | , consume/2 | |
20 | , terminate/1 | |
21 | ]). | |
22 | ||
23 | -type option() :: | |
76aefffb | 24 | {consumption_interval , non_neg_integer()} |
70939664 SK |
25 | | {dst_host , inet:ip_address() | inet:hostname()} |
26 | | {dst_port , inet:port_number()} | |
27 | | {src_port , inet:port_number()} | |
4d24f3b7 | 28 | | {num_msgs_per_packet , non_neg_integer()} |
70939664 SK |
29 | . |
30 | ||
31 | -define(DEFAULT_DST_HOST, "localhost"). | |
32 | -define(DEFAULT_DST_PORT, 8125). | |
33 | -define(DEFAULT_SRC_PORT, 8124). | |
34 | ||
35 | -type metric_type() :: | |
36 | % TODO: Add other metric types | |
37 | gauge. | |
38 | ||
39 | -record(statsd_msg, | |
40 | { name :: binary() | |
41 | , value :: non_neg_integer() | |
42 | , type :: metric_type() | |
43 | }). | |
44 | ||
45 | -type statsd_msg() :: | |
46 | #statsd_msg{}. | |
47 | ||
48 | -record(state, | |
49 | { sock :: hope_option:t(gen_udp:socket()) | |
50 | , dst_host :: inet:ip_address() | inet:hostname() | |
51 | , dst_port :: inet:port_number() | |
52 | , src_port :: inet:port_number() | |
4d24f3b7 | 53 | , num_msgs_per_packet :: non_neg_integer() |
70939664 SK |
54 | }). |
55 | ||
56 | -type state() :: | |
57 | #state{}. | |
58 | ||
59 | -define(PATH_PREFIX, "beam_stats"). | |
60 | ||
61 | %% ============================================================================ | |
62 | %% Consumer implementation | |
63 | %% ============================================================================ | |
64 | ||
65 | -spec init([option()]) -> | |
76aefffb | 66 | {non_neg_integer(), state()}. |
70939664 SK |
67 | init(Options) -> |
68 | ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), | |
69 | DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), | |
70 | DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), | |
71 | SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), | |
4d24f3b7 | 72 | NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), |
70939664 SK |
73 | State = #state |
74 | { sock = none | |
75 | , dst_host = DstHost | |
76 | , dst_port = DstPort | |
77 | , src_port = SrcPort | |
4d24f3b7 | 78 | , num_msgs_per_packet = NumMsgsPerPacket |
70939664 SK |
79 | }, |
80 | {ConsumptionInterval, State}. | |
81 | ||
82 | -spec consume(beam_stats_consumer:queue(), state()) -> | |
83 | state(). | |
4d24f3b7 SK |
84 | consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> |
85 | Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), | |
86 | lists:foldl(fun try_to_connect_and_send/2, State, Packets). | |
70939664 SK |
87 | |
88 | -spec terminate(state()) -> | |
89 | {}. | |
90 | terminate(#state{sock=SockOpt}) -> | |
91 | ok = hope_option:iter(SockOpt, fun gen_udp:close/1), | |
92 | {}. | |
93 | ||
94 | %% ============================================================================ | |
95 | %% Transport | |
96 | %% ============================================================================ | |
97 | ||
4d24f3b7 SK |
98 | -spec try_to_connect_and_send(binary(), state()) -> |
99 | state(). | |
100 | try_to_connect_and_send(<<Payload/binary>>, #state{}=State1) -> | |
101 | State2 = try_to_connect_if_no_socket(State1), | |
102 | try_to_send(State2, Payload). | |
103 | ||
70939664 SK |
104 | -spec try_to_send(state(), binary()) -> |
105 | state(). | |
106 | try_to_send(#state{sock=none}=State, _) -> | |
f079a56c | 107 | ?log_error("Sending failed. No socket in state."), |
70939664 SK |
108 | % TODO: Maybe schedule retry? |
109 | State; | |
110 | try_to_send( | |
111 | #state | |
112 | { sock = {some, Sock} | |
113 | , dst_host = DstHost | |
114 | , dst_port = DstPort | |
115 | }=State, | |
116 | Payload | |
117 | ) -> | |
118 | case gen_udp:send(Sock, DstHost, DstPort, Payload) | |
119 | of ok -> | |
120 | State | |
121 | ; {error, _}=Error -> | |
f079a56c SK |
122 | ?log_error( |
123 | "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", | |
124 | [Sock, DstHost, DstPort, Error] | |
125 | ), | |
70939664 SK |
126 | % TODO: Do something with unsent messages? |
127 | ok = gen_udp:close(Sock), | |
128 | State#state{sock=none} | |
129 | end. | |
130 | ||
131 | -spec try_to_connect_if_no_socket(state()) -> | |
132 | state(). | |
133 | try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> | |
134 | State; | |
135 | try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> | |
136 | case gen_udp:open(SrcPort) | |
137 | of {ok, Sock} -> | |
138 | State#state{sock = {some, Sock}} | |
139 | ; {error, _}=Error -> | |
f079a56c | 140 | ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, Error]), |
70939664 SK |
141 | State#state{sock = none} |
142 | end. | |
143 | ||
144 | %% ============================================================================ | |
145 | %% Serialization | |
146 | %% ============================================================================ | |
147 | ||
4d24f3b7 SK |
148 | -spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> |
149 | [binary()]. | |
150 | beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> | |
151 | MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), | |
152 | MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), | |
153 | lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). | |
70939664 SK |
154 | |
155 | -spec beam_stats_to_bins(beam_stats:t()) -> | |
156 | [binary()]. | |
157 | beam_stats_to_bins(#beam_stats | |
158 | { node_id = NodeID | |
159 | , memory = Memory | |
b4e2333f SK |
160 | , io_bytes_in = IOBytesIn |
161 | , io_bytes_out = IOBytesOut | |
3fe887d7 | 162 | , context_switches = ContextSwitches |
142c0796 | 163 | , reductions = Reductions |
deefeb3c | 164 | , run_queue = RunQueue |
a8d431d1 | 165 | , ets = ETS |
69ebab97 | 166 | , processes = Processes |
70939664 SK |
167 | } |
168 | ) -> | |
169 | NodeIDBin = node_id_to_bin(NodeID), | |
b4e2333f SK |
170 | Msgs1 = |
171 | [ io_bytes_in_to_msg(IOBytesIn) | |
172 | , io_bytes_out_to_msg(IOBytesOut) | |
3fe887d7 | 173 | , context_switches_to_msg(ContextSwitches) |
142c0796 | 174 | , reductions_to_msg(Reductions) |
deefeb3c | 175 | , run_queue_to_msg(RunQueue) |
b4e2333f | 176 | | memory_to_msgs(Memory) |
a8d431d1 | 177 | ] |
69ebab97 SK |
178 | ++ ets_to_msgs(ETS) |
179 | ++ procs_to_msgs(Processes), | |
70939664 SK |
180 | Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], |
181 | [statsd_msg_to_bin(M) || M <- Msgs2]. | |
182 | ||
deefeb3c SK |
183 | -spec run_queue_to_msg(non_neg_integer()) -> |
184 | statsd_msg(). | |
185 | run_queue_to_msg(RunQueue) -> | |
186 | #statsd_msg | |
187 | { name = <<"run_queue">> | |
188 | , value = RunQueue | |
189 | , type = gauge | |
190 | }. | |
191 | ||
142c0796 SK |
192 | -spec reductions_to_msg(non_neg_integer()) -> |
193 | statsd_msg(). | |
194 | reductions_to_msg(Reductions) -> | |
195 | #statsd_msg | |
196 | { name = <<"reductions">> | |
197 | , value = Reductions | |
198 | , type = gauge | |
199 | }. | |
200 | ||
3fe887d7 SK |
201 | -spec context_switches_to_msg(non_neg_integer()) -> |
202 | statsd_msg(). | |
203 | context_switches_to_msg(ContextSwitches) -> | |
204 | #statsd_msg | |
205 | { name = <<"context_switches">> | |
206 | , value = ContextSwitches | |
207 | , type = gauge | |
208 | }. | |
209 | ||
b4e2333f SK |
210 | -spec io_bytes_in_to_msg(non_neg_integer()) -> |
211 | statsd_msg(). | |
212 | io_bytes_in_to_msg(IOBytesIn) -> | |
213 | #statsd_msg | |
214 | { name = <<"io.bytes_in">> | |
215 | , value = IOBytesIn | |
216 | , type = gauge | |
217 | }. | |
218 | ||
219 | -spec io_bytes_out_to_msg(non_neg_integer()) -> | |
220 | statsd_msg(). | |
221 | io_bytes_out_to_msg(IOBytesOut) -> | |
222 | #statsd_msg | |
223 | { name = <<"io.bytes_out">> | |
224 | , value = IOBytesOut | |
225 | , type = gauge | |
226 | }. | |
227 | ||
69ebab97 SK |
228 | -spec procs_to_msgs(beam_stats_processes:t()) -> |
229 | [statsd_msg()]. | |
230 | procs_to_msgs( | |
231 | #beam_stats_processes | |
232 | { individual_stats = Procs | |
233 | , count_all = CountAll | |
234 | , count_exiting = CountExiting | |
235 | , count_garbage_collecting = CountGarbageCollecting | |
236 | , count_registered = CountRegistered | |
237 | , count_runnable = CountRunnable | |
238 | , count_running = CountRunning | |
239 | , count_suspended = CountSuspended | |
240 | , count_waiting = CountWaiting | |
241 | } | |
242 | ) -> | |
243 | [ gauge(<<"processes_count_all">> , CountAll) | |
244 | , gauge(<<"processes_count_exiting">> , CountExiting) | |
245 | , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting) | |
246 | , gauge(<<"processes_count_registered">> , CountRegistered) | |
247 | , gauge(<<"processes_count_runnable">> , CountRunnable) | |
248 | , gauge(<<"processes_count_running">> , CountRunning) | |
249 | , gauge(<<"processes_count_suspended">> , CountSuspended) | |
250 | , gauge(<<"processes_count_waiting">> , CountWaiting) | |
251 | | lists:append([proc_to_msgs(P) || P <- Procs]) | |
252 | ]. | |
253 | ||
254 | -spec proc_to_msgs(beam_stats_process:t()) -> | |
255 | [statsd_msg()]. | |
256 | proc_to_msgs( | |
257 | #beam_stats_process | |
258 | { pid = Pid | |
259 | , memory = Memory | |
260 | , total_heap_size = TotalHeapSize | |
261 | , stack_size = StackSize | |
262 | , message_queue_len = MsgQueueLen | |
263 | }=Process | |
264 | ) -> | |
265 | Origin = beam_stats_process:get_best_known_origin(Process), | |
266 | OriginBin = proc_origin_to_bin(Origin), | |
267 | PidBin = pid_to_bin(Pid), | |
268 | OriginDotPid = <<OriginBin/binary, ".", PidBin/binary>>, | |
269 | [ gauge(<<"process_memory." , OriginDotPid/binary>>, Memory) | |
270 | , gauge(<<"process_total_heap_size." , OriginDotPid/binary>>, TotalHeapSize) | |
271 | , gauge(<<"process_stack_size." , OriginDotPid/binary>>, StackSize) | |
272 | , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen) | |
273 | ]. | |
274 | ||
275 | -spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> | |
276 | binary(). | |
277 | proc_origin_to_bin({registered_name, Name}) -> | |
278 | atom_to_binary(Name, utf8); | |
279 | proc_origin_to_bin({ancestry, Ancestry}) -> | |
280 | #beam_stats_process_ancestry | |
281 | { raw_initial_call = InitCallRaw | |
282 | , otp_initial_call = InitCallOTPOpt | |
283 | , otp_ancestors = AncestorsOpt | |
284 | } = Ancestry, | |
285 | Blank = <<"NONE">>, | |
286 | InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), | |
287 | InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), | |
288 | AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), | |
289 | AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), | |
290 | InitCallRawBin = mfa_to_bin(InitCallRaw), | |
291 | << InitCallRawBin/binary | |
292 | , "--" | |
293 | , InitCallOTPBin/binary | |
294 | , "--" | |
295 | , AncestorsBin/binary | |
296 | >>. | |
297 | ||
298 | ancestors_to_bin([]) -> | |
299 | <<>>; | |
300 | ancestors_to_bin([A | Ancestors]) -> | |
301 | ABin = ancestor_to_bin(A), | |
302 | case ancestors_to_bin(Ancestors) | |
303 | of <<>> -> | |
304 | ABin | |
305 | ; <<AncestorsBin/binary>> -> | |
306 | <<ABin/binary, "-", AncestorsBin/binary>> | |
307 | end. | |
308 | ||
309 | ancestor_to_bin(A) when is_atom(A) -> | |
310 | atom_to_binary(A, utf8); | |
311 | ancestor_to_bin(A) when is_pid(A) -> | |
312 | pid_to_bin(A). | |
313 | ||
314 | pid_to_bin(Pid) -> | |
315 | PidList = erlang:pid_to_list(Pid), | |
316 | PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), | |
317 | re:replace(PidBin , "[><]", "" , [global, {return, binary}]). | |
318 | ||
319 | -spec mfa_to_bin(mfa()) -> | |
320 | binary(). | |
321 | mfa_to_bin({Module, Function, Arity}) -> | |
322 | ModuleBin = atom_to_binary(Module , utf8), | |
323 | FunctionBin = atom_to_binary(Function, utf8), | |
324 | ArityBin = erlang:integer_to_binary(Arity), | |
325 | <<ModuleBin/binary, "-", FunctionBin/binary, "-", ArityBin/binary>>. | |
326 | ||
327 | ||
328 | -spec gauge(binary(), integer()) -> | |
329 | statsd_msg(). | |
330 | gauge(<<Name/binary>>, Value) when is_integer(Value) -> | |
331 | #statsd_msg | |
332 | { name = Name | |
333 | , value = Value | |
334 | , type = gauge | |
335 | }. | |
336 | ||
a8d431d1 SK |
337 | -spec ets_to_msgs(beam_stats_ets:t()) -> |
338 | [statsd_msg()]. | |
339 | ets_to_msgs(PerTableStats) -> | |
340 | NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), | |
341 | lists:append(NestedMsgs). | |
342 | ||
343 | -spec ets_table_to_msgs(beam_stats_ets_table:t()) -> | |
344 | [statsd_msg()]. | |
345 | ets_table_to_msgs(#beam_stats_ets_table | |
346 | { id = ID | |
347 | , name = Name | |
348 | , size = Size | |
349 | , memory = Memory | |
350 | } | |
351 | ) -> | |
352 | IDBin = beam_stats_ets_table:id_to_bin(ID), | |
353 | NameBin = atom_to_binary(Name, latin1), | |
354 | NameAndID = <<NameBin/binary, ".", IDBin/binary>>, | |
355 | SizeMsg = | |
356 | #statsd_msg | |
357 | { name = <<"ets_table.size.", NameAndID/binary>> | |
358 | , value = Size | |
359 | , type = gauge | |
360 | }, | |
361 | MemoryMsg = | |
362 | #statsd_msg | |
363 | { name = <<"ets_table.memory.", NameAndID/binary>> | |
364 | , value = Memory | |
365 | , type = gauge | |
366 | }, | |
367 | [SizeMsg, MemoryMsg]. | |
368 | ||
e7b39f07 | 369 | -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> |
70939664 SK |
370 | [statsd_msg()]. |
371 | memory_to_msgs(Memory) -> | |
372 | [memory_component_to_statsd_msg(MC) || MC <- Memory]. | |
373 | ||
e7b39f07 | 374 | -spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> |
70939664 SK |
375 | statsd_msg(). |
376 | memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> | |
101874c3 | 377 | MemTypeBin = atom_to_binary(MemType, latin1), |
70939664 | 378 | #statsd_msg |
101874c3 | 379 | { name = <<"memory.", MemTypeBin/binary>> |
70939664 SK |
380 | , value = MemSize |
381 | , type = gauge | |
382 | }. | |
383 | ||
384 | -spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> | |
385 | statsd_msg(). | |
386 | statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) -> | |
387 | Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>, | |
388 | Name2 = <<Prefix/binary, Name1/binary>>, | |
389 | Msg#statsd_msg{name=Name2}. | |
390 | ||
391 | -spec statsd_msg_to_bin(statsd_msg()) -> | |
392 | binary(). | |
393 | statsd_msg_to_bin( | |
394 | #statsd_msg | |
395 | { name = <<Name/binary>> | |
396 | , value = Value | |
397 | , type = Type = gauge | |
398 | } | |
399 | ) when Value >= 0 -> | |
400 | TypeBin = metric_type_to_bin(Type), | |
401 | ValueBin = integer_to_binary(Value), | |
402 | << Name/binary | |
403 | , ":" | |
404 | , ValueBin/binary | |
405 | , "|" | |
406 | , TypeBin/binary | |
407 | , "\n" | |
408 | >>. | |
409 | ||
410 | -spec metric_type_to_bin(metric_type()) -> | |
411 | binary(). | |
412 | metric_type_to_bin(gauge) -> | |
413 | <<"g">>. | |
414 | ||
415 | -spec node_id_to_bin(node()) -> | |
416 | binary(). | |
417 | node_id_to_bin(NodeID) -> | |
418 | NodeIDBin = atom_to_binary(NodeID, utf8), | |
419 | re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). |