Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
70939664 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
9 | [ option/0 | |
10 | ]). | |
11 | ||
b0ab6ee2 | 12 | %% Consumer interface |
70939664 SK |
13 | -export( |
14 | [ init/1 | |
15 | , consume/2 | |
16 | , terminate/1 | |
17 | ]). | |
18 | ||
%% Settings understood by init/1. Any setting left out of the option list is
%% replaced by the default that init/1 supplies.
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}.

%% Fallback values used by init/1 for the destination host/port and for the
%% local port the UDP socket is opened on.
-define(DEFAULT_DST_HOST, "localhost").
-define(DEFAULT_DST_PORT, 8125).
-define(DEFAULT_SRC_PORT, 8124).
29 | ||
%% Metric kinds this module can serialize; only gauge is handled so far.
-type metric_type() ::
    % TODO: Add other metric types
    gauge.

%% One metric sample: its name, its value and the kind of metric it is.
-record(statsd_msg, {
    name :: binary(),
    value :: non_neg_integer(),
    type :: metric_type()
}).

-type statsd_msg() ::
    #statsd_msg{}.
42 | ||
%% Consumer state: the optional UDP socket (none until one has been opened),
%% the destination the payloads are sent to, and the local port the socket is
%% opened on.
-record(state, {
    sock :: hope_option:t(gen_udp:socket()),
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    src_port :: inet:port_number()
}).

-type state() ::
    #state{}.
52 | ||
%% First segment of every metric name produced by this module.
-define(PATH_PREFIX, "beam_stats").
54 | ||
55 | %% ============================================================================ | |
56 | %% Consumer implementation | |
57 | %% ============================================================================ | |
58 | ||
%% Builds the initial consumer state from the option list.
%%
%% Every setting is looked up with hope_kv_list:get/3. A missing
%% consumption_interval falls back to 60000 (presumably milliseconds - confirm
%% against beam_stats_consumer); the other settings fall back to the
%% ?DEFAULT_* macros. No socket is opened here: sock starts out as none.
%%
%% Returns {ConsumptionInterval, State}.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    InitialState = #state{
        sock = none,
        dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST),
        dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT),
        src_port = Lookup(src_port, ?DEFAULT_SRC_PORT)
    },
    {Interval, InitialState}.
73 | ||
%% Serializes the queued stats into one payload, makes sure a socket exists
%% (opening one if the state holds none) and attempts to send the payload.
%% Returns the state as updated by the connect and send attempts.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{} = State0) ->
    % Serialize before touching the socket, as the steps were ordered before.
    Bin = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State0), Bin).
80 | ||
%% Hands the optional socket to hope_option:iter/2 together with
%% gen_udp:close/1 (presumably closing it when one is present - hope_option is
%% defined outside this file) and returns the empty tuple.
-spec terminate(state()) ->
    {}.
terminate(#state{sock = MaybeSock}) ->
    Close = fun gen_udp:close/1,
    ok = hope_option:iter(MaybeSock, Close),
    {}.
86 | ||
87 | %% ============================================================================ | |
88 | %% Transport | |
89 | %% ============================================================================ | |
90 | ||
%% Attempts to send Payload to dst_host:dst_port over the socket in the state.
%%
%% - With no socket (sock = none) an error is logged and the state is returned
%%   unchanged; the payload is dropped.
%% - When gen_udp:send/4 succeeds the state is returned unchanged.
%% - When gen_udp:send/4 fails the error is logged, the socket is closed and
%%   the returned state has sock = none; the payload is dropped.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{sock = {some, Sock}, dst_host = DstHost, dst_port = DstPort} = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The format string has five ~p directives (four call arguments
            % plus the result), so Payload must be part of the argument list.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
117 | ||
%% Ensures the state holds a socket when one can be opened.
%%
%% A state that already carries a socket is returned untouched. Otherwise
%% gen_udp:open/1 is called on src_port: on success the new socket is stored,
%% on failure the error is logged and sock stays none.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock = none, src_port = Port} = State) ->
    case gen_udp:open(Port) of
        {error, _} = Failure ->
            ?log_error("gen_udp:open(~p) -> ~p", [Port, Failure]),
            State#state{sock = none};
        {ok, Socket} ->
            State#state{sock = {some, Socket}}
    end;
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State.
130 | ||
131 | %% ============================================================================ | |
132 | %% Serialization | |
133 | %% ============================================================================ | |
134 | ||
%% Serializes every stats entry in the queue, in queue:to_list/1 order, and
%% concatenates the resulting lines into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bins/1, Entries)).
139 | ||
%% Converts one #beam_stats{} record into a list of serialized metric lines.
%%
%% The scalar fields (I/O bytes in/out, context switches, reductions, run
%% queue) each become one message, followed by one message per memory
%% component. Every message name is then prefixed via
%% statsd_msg_add_name_prefix/2 using the sanitized node id, and each message
%% is serialized with statsd_msg_to_bin/1.
-spec beam_stats_to_bins(beam_stats:t()) ->
    [binary()].
beam_stats_to_bins(#beam_stats{
    node_id = NodeID,
    memory = Memory,
    io_bytes_in = IOBytesIn,
    io_bytes_out = IOBytesOut,
    context_switches = ContextSwitches,
    reductions = Reductions,
    run_queue = RunQueue
}) ->
    NodePrefix = node_id_to_bin(NodeID),
    ScalarMsgs = [
        io_bytes_in_to_msg(IOBytesIn),
        io_bytes_out_to_msg(IOBytesOut),
        context_switches_to_msg(ContextSwitches),
        reductions_to_msg(Reductions),
        run_queue_to_msg(RunQueue)
    ],
    Unprefixed = ScalarMsgs ++ memory_to_msgs(Memory),
    AddPrefix = fun(Msg) -> statsd_msg_add_name_prefix(Msg, NodePrefix) end,
    Prefixed = lists:map(AddPrefix, Unprefixed),
    lists:map(fun statsd_msg_to_bin/1, Prefixed).
163 | ||
deefeb3c SK |
%% Wraps the given run_queue value in a gauge message named "run_queue".
-spec run_queue_to_msg(non_neg_integer()) ->
    statsd_msg().
run_queue_to_msg(Value) ->
    #statsd_msg{type = gauge, name = <<"run_queue">>, value = Value}.
172 | ||
142c0796 SK |
%% Wraps the given reductions value in a gauge message named "reductions".
-spec reductions_to_msg(non_neg_integer()) ->
    statsd_msg().
reductions_to_msg(Value) ->
    #statsd_msg{type = gauge, name = <<"reductions">>, value = Value}.
181 | ||
3fe887d7 SK |
%% Wraps the given context_switches value in a gauge message named
%% "context_switches".
-spec context_switches_to_msg(non_neg_integer()) ->
    statsd_msg().
context_switches_to_msg(Value) ->
    #statsd_msg{type = gauge, name = <<"context_switches">>, value = Value}.
190 | ||
b4e2333f SK |
%% Wraps the given io_bytes_in value in a gauge message named "io.bytes_in".
-spec io_bytes_in_to_msg(non_neg_integer()) ->
    statsd_msg().
io_bytes_in_to_msg(Value) ->
    #statsd_msg{type = gauge, name = <<"io.bytes_in">>, value = Value}.
199 | ||
%% Wraps the given io_bytes_out value in a gauge message named "io.bytes_out".
-spec io_bytes_out_to_msg(non_neg_integer()) ->
    statsd_msg().
io_bytes_out_to_msg(Value) ->
    #statsd_msg{type = gauge, name = <<"io.bytes_out">>, value = Value}.
208 | ||
%% Turns each {Type, Size} memory entry into its own gauge message, keeping
%% the order of the input list.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
    [statsd_msg()].
memory_to_msgs(Components) ->
    lists:map(fun memory_component_to_statsd_msg/1, Components).
213 | ||
%% Builds a gauge message for one memory component. The message name is
%% "memory." followed by the component atom rendered as a latin1 binary; the
%% value is the component size.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
    statsd_msg().
memory_component_to_statsd_msg({Component, Size}) when Size >= 0 ->
    Suffix = atom_to_binary(Component, latin1),
    FullName = <<"memory.", Suffix/binary>>,
    #statsd_msg{type = gauge, name = FullName, value = Size}.
223 | ||
%% Rewrites the message name to "<PATH_PREFIX>.<NodeID>.<name>", leaving the
%% other fields of the message untouched.
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
    statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name = BareName} = Msg, <<NodeID/binary>>) ->
    Msg#statsd_msg{
        name = <<?PATH_PREFIX, ".", NodeID/binary, ".", BareName/binary>>
    }.
230 | ||
%% Serializes a gauge message as "<name>:<value>|<type>\n", where <type> is
%% the code returned by metric_type_to_bin/1 and <value> is the decimal
%% rendering of the integer value.
-spec statsd_msg_to_bin(statsd_msg()) ->
    binary().
statsd_msg_to_bin(
    #statsd_msg{name = <<Name/binary>>, value = Value, type = Type = gauge}
) when Value >= 0 ->
    TypeCode = metric_type_to_bin(Type),
    Digits = integer_to_binary(Value),
    iolist_to_binary([Name, <<":">>, Digits, <<"|">>, TypeCode, <<"\n">>]).
249 | ||
%% Maps a metric type to the type code used in a serialized line; gauge is
%% rendered as "g".
-spec metric_type_to_bin(metric_type()) ->
    binary().
metric_type_to_bin(gauge) -> <<$g>>.
254 | ||
%% Renders the node name as a UTF-8 binary with every "@" and "." replaced by
%% "_", so the node name does not contribute extra "." separators (or an "@")
%% to the metric name it is embedded in.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    re:replace(
        atom_to_binary(NodeID, utf8),
        "[\@\.]",
        "_",
        [global, {return, binary}]
    ).