Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
a8d431d1 | 4 | -include("include/beam_stats_ets_table.hrl"). |
f079a56c | 5 | -include("beam_stats_logging.hrl"). |
70939664 SK |
6 | |
7 | -behaviour(beam_stats_consumer). | |
8 | ||
9 | -export_type( | |
10 | [ option/0 | |
11 | ]). | |
12 | ||
b0ab6ee2 | 13 | %% Consumer interface |
70939664 SK |
14 | -export( |
15 | [ init/1 | |
16 | , consume/2 | |
17 | , terminate/1 | |
18 | ]). | |
19 | ||
%% Options understood by init/1. Any option left out of the list falls back
%% to the default that init/1 supplies for it.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}.
26 | ||
%% Fallbacks that init/1 uses when the corresponding option is not supplied.
-define(DEFAULT_DST_HOST , "localhost").
-define(DEFAULT_DST_PORT , 8125).
-define(DEFAULT_SRC_PORT , 8124).
30 | ||
%% Metric kinds this consumer is able to emit.
% TODO: Add other metric types
-type metric_type() :: gauge.

%% A single metric before serialization; statsd_msg_to_bin/1 renders one of
%% these as one newline-terminated line.
-record(statsd_msg, {
    name  :: binary(),
    value :: non_neg_integer(),
    type  :: metric_type()
}).

-type statsd_msg() :: #statsd_msg{}.
43 | ||
%% Consumer state. `sock' is an option value: `none' until a UDP socket has
%% been opened, `{some, Socket}' once one is held. The remaining fields are
%% the destination address and the local port taken from the init/1 options.
-record(state, {
    sock     :: hope_option:t(gen_udp:socket()),
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    src_port :: inet:port_number()
}).

-type state() :: #state{}.
53 | ||
%% Leading path component that statsd_msg_add_name_prefix/2 puts in front of
%% every metric name.
-define(PATH_PREFIX, "beam_stats").
55 | ||
56 | %% ============================================================================ | |
57 | %% Consumer implementation | |
58 | %% ============================================================================ | |
59 | ||
%% @doc Build the initial consumer state from a list of options.
%%
%% Returns the consumption interval paired with the state. Options that are
%% absent fall back to defaults: 60000 for the interval, and the ?DEFAULT_*
%% macros for the destination host, destination port and source port. No
%% socket is opened here; the state starts out with `sock = none'.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    InitialState = #state{
        sock     = none,
        dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST),
        dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT),
        src_port = Lookup(src_port, ?DEFAULT_SRC_PORT)
    },
    {Interval, InitialState}.
74 | ||
%% @doc Serialize every entry of the queue and send the result as one payload.
%%
%% The queue is serialized first; then a socket is opened if the state does
%% not already hold one; finally the send is attempted. The returned state
%% reflects whatever happened to the socket along the way.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{} = State) ->
    Bin = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State), Bin).
81 | ||
%% @doc Shut the consumer down: gen_udp:close/1 is handed to
%% hope_option:iter/2 together with the socket option from the state, and the
%% result is asserted to be `ok'. Always returns the empty tuple.
-spec terminate(state()) ->
    {}.
terminate(#state{sock = MaybeSock}) ->
    Close = fun gen_udp:close/1,
    ok = hope_option:iter(MaybeSock, Close),
    {}.
87 | ||
88 | %% ============================================================================ | |
89 | %% Transport | |
90 | %% ============================================================================ | |
91 | ||
%% @doc Send `Payload' to the configured destination over the socket held in
%% the state.
%%
%% With no socket in the state, the failure is logged and the state is
%% returned unchanged. When gen_udp:send/4 returns an error, the error is
%% logged, the socket is closed and the returned state has `sock = none', so
%% the next consume/2 tries to open a fresh socket. In both failure cases the
%% payload is dropped.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{sock = {some, Sock}, dst_host = DstHost, dst_port = DstPort} = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The format string carries one ~p per gen_udp:send/4 argument
            % plus one for the result, so Payload has to be in the argument
            % list; with only four terms the formatting itself would fail.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
118 | ||
%% @doc Make sure the state holds a UDP socket, if one can be had.
%%
%% A state that already carries `{some, Socket}' is returned as is. Otherwise
%% gen_udp:open/1 is called with the configured source port; on success the
%% new socket is stored in the state, on failure the error is logged and the
%% state keeps `sock = none'.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock = none, src_port = Port} = State) ->
    case gen_udp:open(Port) of
        {error, _} = Failure ->
            ?log_error("gen_udp:open(~p) -> ~p", [Port, Failure]),
            % sock is already none in this clause, so State is returned as is.
            State;
        {ok, Socket} ->
            State#state{sock = {some, Socket}}
    end;
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State.
131 | ||
132 | %% ============================================================================ | |
133 | %% Serialization | |
134 | %% ============================================================================ | |
135 | ||
%% @doc Serialize every stats sample in the queue, in queue order, and
%% concatenate all resulting metric lines into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Q) ->
    Samples = queue:to_list(Q),
    LinesPerSample = lists:map(fun beam_stats_to_bins/1, Samples),
    iolist_to_binary(LinesPerSample).
140 | ||
%% @doc Turn one stats sample into a list of serialized metric lines.
%%
%% The messages are produced in a fixed order: IO bytes in, IO bytes out,
%% context switches, reductions, run queue, then the memory components and
%% finally the per-ETS-table metrics. Every name is prefixed with the
%% sanitized node id before the message is rendered to a binary.
-spec beam_stats_to_bins(beam_stats:t()) ->
    [binary()].
beam_stats_to_bins(#beam_stats{
    node_id          = NodeID,
    memory           = Memory,
    io_bytes_in      = IOBytesIn,
    io_bytes_out     = IOBytesOut,
    context_switches = ContextSwitches,
    reductions       = Reductions,
    run_queue        = RunQueue,
    ets              = ETS
}) ->
    NodePrefix = node_id_to_bin(NodeID),
    ScalarMsgs = [
        io_bytes_in_to_msg(IOBytesIn),
        io_bytes_out_to_msg(IOBytesOut),
        context_switches_to_msg(ContextSwitches),
        reductions_to_msg(Reductions),
        run_queue_to_msg(RunQueue)
    ],
    Unprefixed = lists:append([ScalarMsgs, memory_to_msgs(Memory), ets_to_msgs(ETS)]),
    % Two separate passes: first prefix every name, then render every message.
    Prefixed = lists:map(
        fun(Msg) -> statsd_msg_add_name_prefix(Msg, NodePrefix) end,
        Unprefixed
    ),
    lists:map(fun statsd_msg_to_bin/1, Prefixed).
166 | ||
deefeb3c SK |
%% @doc Wrap the run queue value in a gauge message named `run_queue'.
-spec run_queue_to_msg(non_neg_integer()) ->
    statsd_msg().
run_queue_to_msg(Length) ->
    #statsd_msg{name = <<"run_queue">>, value = Length, type = gauge}.
175 | ||
142c0796 SK |
%% @doc Wrap the reductions value in a gauge message named `reductions'.
-spec reductions_to_msg(non_neg_integer()) ->
    statsd_msg().
reductions_to_msg(Count) ->
    #statsd_msg{name = <<"reductions">>, value = Count, type = gauge}.
184 | ||
3fe887d7 SK |
%% @doc Wrap the context switches value in a gauge message named
%% `context_switches'.
-spec context_switches_to_msg(non_neg_integer()) ->
    statsd_msg().
context_switches_to_msg(Count) ->
    #statsd_msg{name = <<"context_switches">>, value = Count, type = gauge}.
193 | ||
b4e2333f SK |
%% @doc Wrap the IO input byte count in a gauge message named `io.bytes_in'.
-spec io_bytes_in_to_msg(non_neg_integer()) ->
    statsd_msg().
io_bytes_in_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_in">>, value = Bytes, type = gauge}.
202 | ||
%% @doc Wrap the IO output byte count in a gauge message named `io.bytes_out'.
-spec io_bytes_out_to_msg(non_neg_integer()) ->
    statsd_msg().
io_bytes_out_to_msg(Bytes) ->
    #statsd_msg{name = <<"io.bytes_out">>, value = Bytes, type = gauge}.
211 | ||
a8d431d1 SK |
%% @doc Produce the messages for every ETS table entry, concatenated in the
%% order of the input list.
-spec ets_to_msgs(beam_stats_ets:t()) ->
    [statsd_msg()].
ets_to_msgs(Tables) ->
    lists:flatmap(fun ets_table_to_msgs/1, Tables).
217 | ||
%% @doc Produce the two gauge messages for one ETS table: its size, then its
%% memory.
%%
%% Both names end in `TableName.TableId', where the name is the table's atom
%% name converted with latin1 encoding and the id is rendered by
%% beam_stats_ets_table:id_to_bin/1.
-spec ets_table_to_msgs(beam_stats_ets_table:t()) ->
    [statsd_msg()].
ets_table_to_msgs(#beam_stats_ets_table{
    id     = ID,
    name   = Name,
    size   = Size,
    memory = Memory
}) ->
    IDBin = beam_stats_ets_table:id_to_bin(ID),
    NameBin = atom_to_binary(Name, latin1),
    Suffix = <<NameBin/binary, ".", IDBin/binary>>,
    [
        #statsd_msg{
            name  = <<"ets_table.size.", Suffix/binary>>,
            value = Size,
            type  = gauge
        },
        #statsd_msg{
            name  = <<"ets_table.memory.", Suffix/binary>>,
            value = Memory,
            type  = gauge
        }
    ].
243 | ||
%% @doc Convert each `{Type, Size}' memory component into a gauge message,
%% keeping the order of the input list.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
    [statsd_msg()].
memory_to_msgs(Components) ->
    lists:map(fun memory_component_to_statsd_msg/1, Components).
248 | ||
%% @doc Build the gauge message for one memory component. The name is
%% `memory.' followed by the component's atom converted with latin1 encoding.
%% The clause only matches when the size passes the `>= 0' guard.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
    statsd_msg().
memory_component_to_statsd_msg({Kind, Bytes}) when Bytes >= 0 ->
    KindBin = atom_to_binary(Kind, latin1),
    #statsd_msg{
        name  = <<"memory.", KindBin/binary>>,
        value = Bytes,
        type  = gauge
    }.
258 | ||
%% @doc Return the message with its name rewritten to
%% `?PATH_PREFIX.NodeID.OriginalName'. The node id must be a binary.
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
    statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name = BareName} = Msg, NodeID)
  when is_binary(NodeID) ->
    FullName = <<?PATH_PREFIX, ".", NodeID/binary, ".", BareName/binary>>,
    Msg#statsd_msg{name = FullName}.
265 | ||
%% @doc Render a gauge message as one line of the form
%% `Name:Value|TypeCode' followed by a newline.
%%
%% Only matches messages whose name is a binary, whose type is `gauge' and
%% whose value passes the `>= 0' guard.
-spec statsd_msg_to_bin(statsd_msg()) ->
    binary().
statsd_msg_to_bin(#statsd_msg{name = Name, value = Value, type = gauge = Type})
  when is_binary(Name), Value >= 0 ->
    iolist_to_binary([
        Name,
        ":",
        integer_to_binary(Value),
        "|",
        metric_type_to_bin(Type),
        "\n"
    ]).
284 | ||
%% @doc Map a metric type to the type code used in a serialized line:
%% `gauge' becomes `g'.
-spec metric_type_to_bin(metric_type()) ->
    binary().
metric_type_to_bin(gauge) -> <<$g>>.
289 | ||
%% @doc Convert a node name to a UTF-8 binary with every `@' and `.' replaced
%% by `_', so that the node id occupies a single dot-separated path component
%% of the metric name.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).