Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
4 | ||
5 | -behaviour(beam_stats_consumer). | |
6 | ||
7 | -export_type( | |
8 | [ option/0 | |
9 | ]). | |
10 | ||
b0ab6ee2 | 11 | %% Consumer interface |
70939664 SK |
12 | -export( |
13 | [ init/1 | |
14 | , consume/2 | |
15 | , terminate/1 | |
16 | ]). | |
17 | ||
b0ab6ee2 SK |
18 | %% Tests (to be run by CT): |
19 | -export( | |
20 | [ ct_test__beam_stats_to_bins/1 | |
21 | , ct_test__memory_component_to_statsd_msg/1 | |
22 | , ct_test__statsd_msg_add_name_prefix/1 | |
23 | , ct_test__statsd_msg_to_bin/1 | |
24 | , ct_test__node_id_to_bin/1 | |
25 | ]). | |
26 | ||
70939664 SK |
27 | -type option() :: |
28 | {consumption_interval , erlang:time()} | |
29 | | {dst_host , inet:ip_address() | inet:hostname()} | |
30 | | {dst_port , inet:port_number()} | |
31 | | {src_port , inet:port_number()} | |
32 | . | |
33 | ||
34 | -define(DEFAULT_DST_HOST, "localhost"). | |
35 | -define(DEFAULT_DST_PORT, 8125). | |
36 | -define(DEFAULT_SRC_PORT, 8124). | |
37 | ||
38 | -type metric_type() :: | |
39 | % TODO: Add other metric types | |
40 | gauge. | |
41 | ||
42 | -record(statsd_msg, | |
43 | { name :: binary() | |
44 | , value :: non_neg_integer() | |
45 | , type :: metric_type() | |
46 | }). | |
47 | ||
48 | -type statsd_msg() :: | |
49 | #statsd_msg{}. | |
50 | ||
51 | -record(state, | |
52 | { sock :: hope_option:t(gen_udp:socket()) | |
53 | , dst_host :: inet:ip_address() | inet:hostname() | |
54 | , dst_port :: inet:port_number() | |
55 | , src_port :: inet:port_number() | |
56 | }). | |
57 | ||
58 | -type state() :: | |
59 | #state{}. | |
60 | ||
61 | -define(PATH_PREFIX, "beam_stats"). | |
62 | ||
63 | %% ============================================================================ | |
64 | %% Consumer implementation | |
65 | %% ============================================================================ | |
66 | ||
67 | -spec init([option()]) -> | |
68 | {erlang:time(), state()}. | |
69 | init(Options) -> | |
70 | ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), | |
71 | DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), | |
72 | DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), | |
73 | SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), | |
74 | State = #state | |
75 | { sock = none | |
76 | , dst_host = DstHost | |
77 | , dst_port = DstPort | |
78 | , src_port = SrcPort | |
79 | }, | |
80 | {ConsumptionInterval, State}. | |
81 | ||
82 | -spec consume(beam_stats_consumer:queue(), state()) -> | |
83 | state(). | |
84 | consume(Q, #state{}=State1) -> | |
85 | Payload = beam_stats_queue_to_binary(Q), | |
86 | State2 = try_to_connect_if_no_socket(State1), | |
87 | try_to_send(State2, Payload). | |
88 | ||
89 | -spec terminate(state()) -> | |
90 | {}. | |
91 | terminate(#state{sock=SockOpt}) -> | |
92 | ok = hope_option:iter(SockOpt, fun gen_udp:close/1), | |
93 | {}. | |
94 | ||
95 | %% ============================================================================ | |
96 | %% Transport | |
97 | %% ============================================================================ | |
98 | ||
99 | -spec try_to_send(state(), binary()) -> | |
100 | state(). | |
101 | try_to_send(#state{sock=none}=State, _) -> | |
102 | io:format("error: socket closed~n"), | |
103 | % TODO: Maybe schedule retry? | |
104 | State; | |
105 | try_to_send( | |
106 | #state | |
107 | { sock = {some, Sock} | |
108 | , dst_host = DstHost | |
109 | , dst_port = DstPort | |
110 | }=State, | |
111 | Payload | |
112 | ) -> | |
113 | case gen_udp:send(Sock, DstHost, DstPort, Payload) | |
114 | of ok -> | |
115 | State | |
116 | ; {error, _}=Error -> | |
117 | io:format("error: gen_udp:send/4 failed: ~p~n", [Error]), | |
118 | % TODO: Do something with unsent messages? | |
119 | ok = gen_udp:close(Sock), | |
120 | State#state{sock=none} | |
121 | end. | |
122 | ||
123 | -spec try_to_connect_if_no_socket(state()) -> | |
124 | state(). | |
125 | try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> | |
126 | State; | |
127 | try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> | |
128 | case gen_udp:open(SrcPort) | |
129 | of {ok, Sock} -> | |
130 | State#state{sock = {some, Sock}} | |
131 | ; {error, _}=Error -> | |
132 | io:format("error: gen_udp:open/1 failed: ~p~n", [Error]), | |
133 | State#state{sock = none} | |
134 | end. | |
135 | ||
136 | %% ============================================================================ | |
137 | %% Serialization | |
138 | %% ============================================================================ | |
139 | ||
140 | -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> | |
141 | binary(). | |
142 | beam_stats_queue_to_binary(Q) -> | |
143 | iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). | |
144 | ||
145 | -spec beam_stats_to_bins(beam_stats:t()) -> | |
146 | [binary()]. | |
147 | beam_stats_to_bins(#beam_stats | |
148 | { node_id = NodeID | |
149 | , memory = Memory | |
150 | } | |
151 | ) -> | |
152 | NodeIDBin = node_id_to_bin(NodeID), | |
153 | Msgs1 = memory_to_msgs(Memory), | |
154 | Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], | |
155 | [statsd_msg_to_bin(M) || M <- Msgs2]. | |
156 | ||
e7b39f07 | 157 | -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> |
70939664 SK |
158 | [statsd_msg()]. |
159 | memory_to_msgs(Memory) -> | |
160 | [memory_component_to_statsd_msg(MC) || MC <- Memory]. | |
161 | ||
e7b39f07 | 162 | -spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> |
70939664 SK |
163 | statsd_msg(). |
164 | memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> | |
165 | #statsd_msg | |
166 | { name = atom_to_binary(MemType, latin1) | |
167 | , value = MemSize | |
168 | , type = gauge | |
169 | }. | |
170 | ||
171 | -spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> | |
172 | statsd_msg(). | |
173 | statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) -> | |
174 | Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>, | |
175 | Name2 = <<Prefix/binary, Name1/binary>>, | |
176 | Msg#statsd_msg{name=Name2}. | |
177 | ||
178 | -spec statsd_msg_to_bin(statsd_msg()) -> | |
179 | binary(). | |
180 | statsd_msg_to_bin( | |
181 | #statsd_msg | |
182 | { name = <<Name/binary>> | |
183 | , value = Value | |
184 | , type = Type = gauge | |
185 | } | |
186 | ) when Value >= 0 -> | |
187 | TypeBin = metric_type_to_bin(Type), | |
188 | ValueBin = integer_to_binary(Value), | |
189 | << Name/binary | |
190 | , ":" | |
191 | , ValueBin/binary | |
192 | , "|" | |
193 | , TypeBin/binary | |
194 | , "\n" | |
195 | >>. | |
196 | ||
197 | -spec metric_type_to_bin(metric_type()) -> | |
198 | binary(). | |
199 | metric_type_to_bin(gauge) -> | |
200 | <<"g">>. | |
201 | ||
202 | -spec node_id_to_bin(node()) -> | |
203 | binary(). | |
204 | node_id_to_bin(NodeID) -> | |
205 | NodeIDBin = atom_to_binary(NodeID, utf8), | |
206 | re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). | |
b0ab6ee2 SK |
207 | |
208 | %% ============================================================================ | |
209 | %% Tests | |
210 | %% ============================================================================ | |
211 | ||
212 | ct_test__beam_stats_to_bins(_Cfg) -> | |
213 | BEAMStats = #beam_stats{node_id = 'node@host.local', memory = [{foo,1}]}, | |
214 | [<<?PATH_PREFIX, ".node_host_local.foo:1|g\n">>] = | |
215 | beam_stats_to_bins(BEAMStats). | |
216 | ||
217 | ct_test__memory_component_to_statsd_msg(_Cfg) -> | |
218 | #statsd_msg{name = <<"foo">>, value = 1, type = gauge} = | |
219 | memory_component_to_statsd_msg({foo, 1}). | |
220 | ||
221 | ct_test__statsd_msg_add_name_prefix(_Cfg) -> | |
222 | Msg1 = #statsd_msg{name = <<"foo">>, value = 1, type = gauge}, | |
223 | Msg2 = statsd_msg_add_name_prefix(Msg1, <<"bar">>), | |
224 | <<?PATH_PREFIX,".bar.foo">> = Msg2#statsd_msg.name. | |
225 | ||
226 | ct_test__statsd_msg_to_bin(_Cfg) -> | |
227 | Msg = #statsd_msg{name = <<"foo">>, value = 1, type = gauge}, | |
228 | <<"foo:1|g\n">> = statsd_msg_to_bin(Msg). | |
229 | ||
230 | ct_test__node_id_to_bin(_Cfg) -> | |
231 | <<"foo_bar_local">> = node_id_to_bin('foo@bar.local'). |