Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
a8d431d1 | 4 | -include("include/beam_stats_ets_table.hrl"). |
f079a56c | 5 | -include("beam_stats_logging.hrl"). |
70939664 SK |
6 | |
7 | -behaviour(beam_stats_consumer). | |
8 | ||
9 | -export_type( | |
10 | [ option/0 | |
11 | ]). | |
12 | ||
b0ab6ee2 | 13 | %% Consumer interface |
70939664 SK |
14 | -export( |
15 | [ init/1 | |
16 | , consume/2 | |
17 | , terminate/1 | |
18 | ]). | |
19 | ||
%% Options understood by init/1. Each one is optional; init/1 falls back to a
%% default for any option that is absent.
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}
    | {num_msgs_per_packet, non_neg_integer()}.
27 | ||
28 | -define(DEFAULT_DST_HOST, "localhost"). | |
29 | -define(DEFAULT_DST_PORT, 8125). | |
30 | -define(DEFAULT_SRC_PORT, 8124). | |
31 | ||
%% Metric kinds this consumer knows how to serialize.
%% TODO: Add other metric types
-type metric_type() :: gauge.
35 | ||
%% One metric reading, kept in structured form until it is serialized by
%% statsd_msg_to_bin/1.
-record(statsd_msg, {
    %% Metric path (prefixed later by statsd_msg_add_name_prefix/2).
    name :: binary(),
    value :: non_neg_integer(),
    type :: metric_type()
}).

-type statsd_msg() :: #statsd_msg{}.
44 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1.
-record(state, {
    %% `none' until a UDP socket has been opened; reset to `none' when a
    %% send fails and the socket is closed.
    sock :: hope_option:t(gen_udp:socket()),
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    src_port :: inet:port_number(),
    %% Upper bound on serialized messages joined into one packet.
    num_msgs_per_packet :: non_neg_integer()
}).

-type state() :: #state{}.
55 | ||
56 | -define(PATH_PREFIX, "beam_stats"). | |
57 | ||
58 | %% ============================================================================ | |
59 | %% Consumer implementation | |
60 | %% ============================================================================ | |
61 | ||
%% @doc Builds the initial consumer state from `Options'.
%%
%% Missing options fall back to defaults: consumption interval 60000,
%% destination ?DEFAULT_DST_HOST:?DEFAULT_DST_PORT, source port
%% ?DEFAULT_SRC_PORT, and 10 messages per packet. No socket is opened here;
%% the state starts with `sock = none'.
%%
%% Returns `{ConsumptionInterval, State}'.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    InitialState = #state{
        sock = none,
        dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST),
        dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT),
        src_port = Lookup(src_port, ?DEFAULT_SRC_PORT),
        num_msgs_per_packet = Lookup(num_msgs_per_packet, 10)
    },
    {Interval, InitialState}.
78 | ||
%% @doc Serializes every stats sample in queue `Q' into packets and sends them
%% one by one, threading the state (and therefore the socket) through each
%% send attempt. Returns the state after the last attempt.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Q, #state{} = State) ->
    PerPacket = State#state.num_msgs_per_packet,
    Payloads = beam_stats_queue_to_packets(Q, PerPacket),
    lists:foldl(fun try_to_connect_and_send/2, State, Payloads).
70939664 SK |
84 | |
%% @doc Releases the consumer's resources: closes the UDP socket if the state
%% holds one. Always returns the empty tuple.
-spec terminate(state()) -> {}.
terminate(#state{} = State) ->
    MaybeSock = State#state.sock,
    ok = hope_option:iter(MaybeSock, fun gen_udp:close/1),
    {}.
90 | ||
91 | %% ============================================================================ | |
92 | %% Transport | |
93 | %% ============================================================================ | |
94 | ||
4d24f3b7 SK |
%% @doc Fold step used by consume/2: makes sure a socket exists (opening one
%% if needed) and then attempts to send `Payload'. Returns the updated state.
-spec try_to_connect_and_send(binary(), state()) -> state().
try_to_connect_and_send(Payload, #state{} = State) when is_binary(Payload) ->
    try_to_send(try_to_connect_if_no_socket(State), Payload).
100 | ||
70939664 SK |
%% @doc Sends `Payload' to the configured destination over the socket held in
%% the state.
%%
%% - With no socket (`sock = none') the payload is dropped, an error is
%%   logged, and the state is returned unchanged.
%% - On a send error the failure is logged, the socket is closed, and the
%%   returned state has `sock = none' so the next attempt reopens it.
%% - On success the state is returned unchanged.
-spec try_to_send(state(), binary()) -> state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{sock = {some, Sock}, dst_host = DstHost, dst_port = DstPort} = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The format string has five ~p directives (four call arguments
            % plus the result), so Payload must be in the argument list;
            % without it the log call itself fails to format.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
127 | ||
%% @doc Ensures the state holds a UDP socket when possible.
%%
%% A state that already has a socket is returned untouched. Otherwise a socket
%% is opened on the configured source port; on failure the error is logged and
%% the state is returned still without a socket.
-spec try_to_connect_if_no_socket(state()) -> state().
try_to_connect_if_no_socket(#state{sock = none, src_port = Port} = State) ->
    case gen_udp:open(Port) of
        {error, _} = Failure ->
            ?log_error("gen_udp:open(~p) -> ~p", [Port, Failure]),
            % sock is already `none' in this clause, so State is unchanged.
            State;
        {ok, Socket} ->
            State#state{sock = {some, Socket}}
    end;
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State.
140 | ||
141 | %% ============================================================================ | |
142 | %% Serialization | |
143 | %% ============================================================================ | |
144 | ||
4d24f3b7 SK |
%% @doc Turns a queue of stats samples into a list of packet payloads.
%%
%% Every sample is serialized into its individual message binaries; the
%% combined list is split by hope_list:divide/2 using `NumMsgsPerPacket', and
%% each resulting group is concatenated into a single binary.
-spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) ->
    [binary()].
beam_stats_queue_to_packets(Q, NumMsgsPerPacket) ->
    AllMsgBins = lists:flatmap(fun beam_stats_to_bins/1, queue:to_list(Q)),
    Chunks = hope_list:divide(AllMsgBins, NumMsgsPerPacket),
    [erlang:iolist_to_binary(Chunk) || Chunk <- Chunks].
70939664 SK |
151 | |
%% @doc Serializes one stats sample into a list of StatsD message binaries.
%%
%% Message order: io bytes in, io bytes out, context switches, reductions,
%% run queue, then the memory components, then the per-ETS-table messages.
%% Every message name is prefixed with the sanitized node id before it is
%% rendered to a binary.
-spec beam_stats_to_bins(beam_stats:t()) -> [binary()].
beam_stats_to_bins(
    #beam_stats{
        node_id = NodeID,
        memory = Memory,
        io_bytes_in = IOBytesIn,
        io_bytes_out = IOBytesOut,
        context_switches = ContextSwitches,
        reductions = Reductions,
        run_queue = RunQueue,
        ets = ETS
    }
) ->
    NodeIDBin = node_id_to_bin(NodeID),
    ScalarMsgs = [
        io_bytes_in_to_msg(IOBytesIn),
        io_bytes_out_to_msg(IOBytesOut),
        context_switches_to_msg(ContextSwitches),
        reductions_to_msg(Reductions),
        run_queue_to_msg(RunQueue)
    ],
    Unprefixed = ScalarMsgs ++ memory_to_msgs(Memory) ++ ets_to_msgs(ETS),
    AddPrefix = fun(Msg) -> statsd_msg_add_name_prefix(Msg, NodeIDBin) end,
    Prefixed = lists:map(AddPrefix, Unprefixed),
    lists:map(fun statsd_msg_to_bin/1, Prefixed).
177 | ||
deefeb3c SK |
%% @doc Wraps a run-queue reading in a gauge message named "run_queue".
-spec run_queue_to_msg(non_neg_integer()) -> statsd_msg().
run_queue_to_msg(Length) ->
    #statsd_msg{name = <<"run_queue">>, value = Length, type = gauge}.
186 | ||
142c0796 SK |
%% @doc Wraps a reductions reading in a gauge message named "reductions".
-spec reductions_to_msg(non_neg_integer()) -> statsd_msg().
reductions_to_msg(Count) ->
    #statsd_msg{name = <<"reductions">>, value = Count, type = gauge}.
195 | ||
3fe887d7 SK |
%% @doc Wraps a context-switches reading in a gauge message named
%% "context_switches".
-spec context_switches_to_msg(non_neg_integer()) -> statsd_msg().
context_switches_to_msg(Count) ->
    #statsd_msg{name = <<"context_switches">>, value = Count, type = gauge}.
204 | ||
b4e2333f SK |
%% @doc Wraps an input-bytes reading in a gauge message named "io.bytes_in".
-spec io_bytes_in_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_in_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_in">>, value = Bytes, type = gauge}.
213 | ||
%% @doc Wraps an output-bytes reading in a gauge message named "io.bytes_out".
-spec io_bytes_out_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_out_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_out">>, value = Bytes, type = gauge}.
222 | ||
a8d431d1 SK |
%% @doc Produces the messages for every ETS table entry, concatenated in the
%% order of `PerTableStats'.
-spec ets_to_msgs(beam_stats_ets:t()) -> [statsd_msg()].
ets_to_msgs(PerTableStats) ->
    lists:flatmap(fun ets_table_to_msgs/1, PerTableStats).
228 | ||
%% @doc Produces the two gauge messages for a single ETS table: its size and
%% its memory, in that order.
%%
%% Metric names are "ets_table.size.NAME.ID" and "ets_table.memory.NAME.ID",
%% where NAME is the table name atom and ID is rendered by
%% beam_stats_ets_table:id_to_bin/1.
-spec ets_table_to_msgs(beam_stats_ets_table:t()) -> [statsd_msg()].
ets_table_to_msgs(
    #beam_stats_ets_table{
        id = ID,
        name = Name,
        size = Size,
        memory = Memory
    }
) ->
    IDBin = beam_stats_ets_table:id_to_bin(ID),
    % utf8 rather than latin1: latin1 raises badarg for atoms containing code
    % points above 255, which would crash serialization of the whole sample.
    % This also matches the encoding used by node_id_to_bin/1.
    NameBin = atom_to_binary(Name, utf8),
    NameAndID = <<NameBin/binary, ".", IDBin/binary>>,
    SizeMsg = #statsd_msg{
        name = <<"ets_table.size.", NameAndID/binary>>,
        value = Size,
        type = gauge
    },
    MemoryMsg = #statsd_msg{
        name = <<"ets_table.memory.", NameAndID/binary>>,
        value = Memory,
        type = gauge
    },
    [SizeMsg, MemoryMsg].
254 | ||
%% @doc Converts each `{Type, Size}' memory component into its gauge message,
%% preserving input order.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()].
memory_to_msgs(Memory) ->
    lists:map(fun memory_component_to_statsd_msg/1, Memory).
259 | ||
%% @doc Builds the gauge message for one memory component; the metric name is
%% "memory." followed by the component's atom name.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> statsd_msg().
memory_component_to_statsd_msg({Component, Bytes}) when Bytes >= 0 ->
    #statsd_msg{
        name = <<"memory.", (atom_to_binary(Component, latin1))/binary>>,
        value = Bytes,
        type = gauge
    }.
269 | ||
%% @doc Returns `Msg' with its name rewritten to
%% "?PATH_PREFIX.NodeID.OriginalName".
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name = BareName} = Msg, NodeID) when
    is_binary(NodeID)
->
    Msg#statsd_msg{
        name = <<?PATH_PREFIX, ".", NodeID/binary, ".", BareName/binary>>
    }.
276 | ||
%% @doc Renders a gauge message as one newline-terminated line of the form
%% "NAME:VALUE|TYPE\n".
-spec statsd_msg_to_bin(statsd_msg()) -> binary().
statsd_msg_to_bin(#statsd_msg{name = Name, value = Value, type = gauge}) when
    is_binary(Name), Value >= 0
->
    TypeBin = metric_type_to_bin(gauge),
    ValueBin = integer_to_binary(Value),
    iolist_to_binary([Name, ":", ValueBin, "|", TypeBin, "\n"]).
295 | ||
%% @doc Maps a metric type to its type code in the serialized message
%% (`gauge' becomes "g").
-spec metric_type_to_bin(metric_type()) -> binary().
metric_type_to_bin(Type) when Type =:= gauge ->
    <<$g>>.
300 | ||
%% @doc Renders a node name as a binary safe to embed in a dotted metric path:
%% every "@" and "." is replaced with "_".
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).