Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
70939664 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
9 | [ option/0 | |
10 | ]). | |
11 | ||
b0ab6ee2 | 12 | %% Consumer interface |
70939664 SK |
13 | -export( |
14 | [ init/1 | |
15 | , consume/2 | |
16 | , terminate/1 | |
17 | ]). | |
18 | ||
b0ab6ee2 SK |
19 | %% Tests (to be run by CT): |
20 | -export( | |
21 | [ ct_test__beam_stats_to_bins/1 | |
22 | , ct_test__memory_component_to_statsd_msg/1 | |
23 | , ct_test__statsd_msg_add_name_prefix/1 | |
24 | , ct_test__statsd_msg_to_bin/1 | |
25 | , ct_test__node_id_to_bin/1 | |
26 | ]). | |
27 | ||
%% Options accepted by init/1. Any option left out falls back to the default
%% used in init/1 (60000 for consumption_interval; the ?DEFAULT_* macros for
%% the rest).
%%   consumption_interval - returned to the caller of init/1 as-is
%%                          (presumably milliseconds; confirm against
%%                          beam_stats_consumer).
%%   dst_host / dst_port  - address the UDP datagrams are sent to.
%%   src_port             - local port the UDP socket is opened on.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}.
34 | ||
%% Defaults applied by init/1 when the matching option is not supplied.
-define(DEFAULT_DST_HOST, "localhost").  % destination host
-define(DEFAULT_DST_PORT, 8125).         % destination UDP port
-define(DEFAULT_SRC_PORT, 8124).         % local UDP port to open
38 | ||
%% Kinds of metric this module can serialize. Only gauges are supported.
% TODO: Add other metric types
-type metric_type() :: gauge.
42 | ||
%% A single metric sample before it is rendered to its wire form.
-record(statsd_msg,
    { name  :: binary()           % metric name (prefixed before sending)
    , value :: non_neg_integer()  % sample value
    , type  :: metric_type()      % metric kind, see metric_type()
    }).

-type statsd_msg() :: #statsd_msg{}.
51 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1.
-record(state,
    { sock     :: hope_option:t(gen_udp:socket())
      % `none` until a socket has been opened, `{some, Socket}` afterwards.
    , dst_host :: inet:ip_address() | inet:hostname()
      % Host the datagrams are sent to.
    , dst_port :: inet:port_number()
      % Port the datagrams are sent to.
    , src_port :: inet:port_number()
      % Local port passed to gen_udp:open/1.
    }).

-type state() :: #state{}.
61 | ||
%% First path segment of every metric name emitted by this module.
-define(PATH_PREFIX, "beam_stats").
63 | ||
64 | %% ============================================================================ | |
65 | %% Consumer implementation | |
66 | %% ============================================================================ | |
67 | ||
%% Build the initial consumer state from an option list.
%%
%% Returns {ConsumptionInterval, State}. No socket is opened here: the state
%% starts with sock = none. Missing options fall back to 60000 for
%% consumption_interval and to the ?DEFAULT_* macros for the rest.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    InitialState = #state
        { sock     = none
        , dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST)
        , dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT)
        , src_port = Lookup(src_port, ?DEFAULT_SRC_PORT)
        },
    {Interval, InitialState}.
82 | ||
%% Serialize every queued stats sample into one payload, make sure a socket
%% exists (opening one if the state has none) and try to send the payload.
%% Returns the possibly updated state.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{}=State0) ->
    % Serialize first, exactly as before, so a serialization crash happens
    % before any socket is opened.
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State0), Payload).
89 | ||
%% Release the consumer's resources: gen_udp:close/1 is handed to
%% hope_option:iter/2 together with the optional socket. Always returns {}.
-spec terminate(state()) ->
    {}.
terminate(#state{sock=MaybeSock}) ->
    Close = fun gen_udp:close/1,
    ok = hope_option:iter(MaybeSock, Close),
    {}.
95 | ||
96 | %% ============================================================================ | |
97 | %% Transport | |
98 | %% ============================================================================ | |
99 | ||
%% Attempt to deliver Payload over the UDP socket held in the state.
%%
%% - With no socket (sock = none) nothing is sent: the failure is logged and
%%   the state is returned unchanged.
%% - With a socket, the payload goes to dst_host:dst_port via gen_udp:send/4.
%%   On {error, _} the failure is logged, the socket is closed and the state
%%   is returned with sock = none, so that the next consume/2 call tries to
%%   open a fresh socket before sending.
%%
%% The payload is dropped on failure; there is no retry.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock=none}=State, _) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state
    { sock     = {some, Sock}
    , dst_host = DstHost
    , dst_port = DstPort
    }=State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload)
    of  ok ->
            State
    ;   {error, _}=Error ->
            % The format string has five ~p directives (the four call
            % arguments plus the result), so Payload has to be part of the
            % argument list; with only four arguments the log call itself
            % would fail or be garbled.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock=none}
    end.
126 | ||
%% Ensure the state carries a UDP socket.
%%
%% A state that already has a socket is returned untouched. Otherwise a
%% socket is opened on src_port with gen_udp:open/1; on failure the error is
%% logged and the state keeps sock = none.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock={some, _}}=Connected) ->
    Connected;
try_to_connect_if_no_socket(#state{sock=none, src_port=Port}=Disconnected) ->
    case gen_udp:open(Port)
    of  {ok, Socket} ->
            Disconnected#state{sock = {some, Socket}}
    ;   {error, _}=Failure ->
            ?log_error("gen_udp:open(~p) -> ~p", [Port, Failure]),
            % sock is already `none` here, so the state is returned as-is.
            Disconnected
    end.
139 | ||
140 | %% ============================================================================ | |
141 | %% Serialization | |
142 | %% ============================================================================ | |
143 | ||
%% Render every stats sample in the queue, in queue order, and concatenate
%% the resulting metric lines into a single binary payload.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Queue) ->
    Samples = queue:to_list(Queue),
    Lines = lists:map(fun beam_stats_to_bins/1, Samples),
    iolist_to_binary(Lines).
148 | ||
%% Turn one stats sample into a list of serialized metric lines, one per
%% memory component, each named with the path prefix and the sanitized node
%% id.
-spec beam_stats_to_bins(beam_stats:t()) ->
    [binary()].
beam_stats_to_bins(#beam_stats{node_id = Node, memory = MemoryComponents}) ->
    NodeBin = node_id_to_bin(Node),
    AddPrefix = fun (Msg) -> statsd_msg_add_name_prefix(Msg, NodeBin) end,
    Unprefixed = memory_to_msgs(MemoryComponents),
    Prefixed = lists:map(AddPrefix, Unprefixed),
    lists:map(fun statsd_msg_to_bin/1, Prefixed).
160 | ||
%% Convert a list of {MemoryType, Size} pairs into gauge messages, keeping
%% the order of the input list.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
    [statsd_msg()].
memory_to_msgs(Components) ->
    lists:map(fun memory_component_to_statsd_msg/1, Components).
165 | ||
%% Build a gauge message from one {MemoryType, Size} pair. The metric name is
%% the memory type atom rendered as a latin1 binary. A negative size does not
%% match the guard and raises function_clause.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
    statsd_msg().
memory_component_to_statsd_msg({Component, Size}) when Size >= 0 ->
    Name = atom_to_binary(Component, latin1),
    #statsd_msg{name = Name, value = Size, type = gauge}.
174 | ||
%% Qualify a message name with the module-wide path prefix and the node id:
%% the new name is "<PATH_PREFIX>.<NodeID>.<old name>". All other fields are
%% left untouched.
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
    statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name=BaseName}=Msg, <<NodeID/binary>>) ->
    QualifiedName =
        <<?PATH_PREFIX, ".", NodeID/binary, ".", BaseName/binary>>,
    Msg#statsd_msg{name=QualifiedName}.
181 | ||
%% Serialize a gauge message to one newline-terminated line of the form
%% "<name>:<value>|<type code>\n". Only gauge messages with a binary name and
%% a value that passes the `>= 0` guard are accepted.
-spec statsd_msg_to_bin(statsd_msg()) ->
    binary().
statsd_msg_to_bin(#statsd_msg{name = <<MetricName/binary>>, value = N, type = gauge})
    when N >= 0 ->
    TypeCode = metric_type_to_bin(gauge),
    Digits = integer_to_binary(N),
    iolist_to_binary([MetricName, $:, Digits, $|, TypeCode, $\n]).
200 | ||
%% Map a metric type to the type code used in a serialized metric line.
%% gauge is the only supported type and maps to <<"g">>.
-spec metric_type_to_bin(metric_type()) ->
    binary().
metric_type_to_bin(gauge) -> <<$g>>.
205 | ||
%% Render a node name as a binary that is safe to embed in a dotted metric
%% path: every "@" and "." in the name is replaced with "_".
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    % Both separators are single ASCII bytes, so a byte-level replacement
    % cannot touch any multi-byte UTF-8 sequence in the name.
    binary:replace(Raw, [<<"@">>, <<".">>], <<"_">>, [global]).
b0ab6ee2 SK |
211 | |
212 | %% ============================================================================ | |
213 | %% Tests | |
214 | %% ============================================================================ | |
215 | ||
%% A sample with one memory component yields exactly one metric line, named
%% with the path prefix and the sanitized node id.
ct_test__beam_stats_to_bins(_Cfg) ->
    Expected = [<<?PATH_PREFIX, ".node_host_local.foo:1|g\n">>],
    Sample = #beam_stats{node_id = 'node@host.local', memory = [{foo, 1}]},
    Expected = beam_stats_to_bins(Sample).
220 | ||
%% A {Type, Size} pair becomes a gauge message named after the type.
ct_test__memory_component_to_statsd_msg(_Cfg) ->
    Expected = #statsd_msg{name = <<"foo">>, value = 1, type = gauge},
    Expected = memory_component_to_statsd_msg({foo, 1}).
224 | ||
%% Prefixing puts "<PATH_PREFIX>.<node id>." in front of the original name.
ct_test__statsd_msg_add_name_prefix(_Cfg) ->
    Unprefixed = #statsd_msg{name = <<"foo">>, value = 1, type = gauge},
    #statsd_msg{name = PrefixedName} =
        statsd_msg_add_name_prefix(Unprefixed, <<"bar">>),
    <<?PATH_PREFIX, ".bar.foo">> = PrefixedName.
229 | ||
%% A gauge message serializes to "<name>:<value>|g\n".
ct_test__statsd_msg_to_bin(_Cfg) ->
    Input = #statsd_msg{name = <<"foo">>, value = 1, type = gauge},
    Expected = <<"foo:1|g\n">>,
    Expected = statsd_msg_to_bin(Input).
233 | ||
%% Both "@" and "." in a node name are replaced with "_".
ct_test__node_id_to_bin(_Cfg) ->
    Node = 'foo@bar.local',
    <<"foo_bar_local">> = node_id_to_bin(Node).