Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
70939664 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
9 | [ option/0 | |
10 | ]). | |
11 | ||
b0ab6ee2 | 12 | %% Consumer interface |
70939664 SK |
13 | -export( |
14 | [ init/1 | |
15 | , consume/2 | |
16 | , terminate/1 | |
17 | ]). | |
18 | ||
%% Configuration entries understood by init/1. Every entry is optional;
%% init/1 falls back to 60000 for consumption_interval and to the
%% ?DEFAULT_DST_HOST / ?DEFAULT_DST_PORT / ?DEFAULT_SRC_PORT macros for the
%% remaining keys.
%% NOTE(review): the unit of consumption_interval is not visible in this
%% module (presumably milliseconds) - confirm against beam_stats_consumer.
-type option() ::
    {consumption_interval, non_neg_integer()} |
    {dst_host, inet:ip_address() | inet:hostname()} |
    {dst_port, inet:port_number()} |
    {src_port, inet:port_number()}.
25 | ||
%% Fallback values used by init/1 when the matching option is absent.
%% The destination host/port are what gen_udp:send/4 is addressed to; the
%% source port is the local port handed to gen_udp:open/1.
-define(DEFAULT_DST_HOST, "localhost").
-define(DEFAULT_DST_PORT, 8125).
-define(DEFAULT_SRC_PORT, 8124).
29 | ||
%% Kinds of metric this consumer knows how to serialize. Only gauges exist
%% so far (see metric_type_to_bin/1).
% TODO: Add other metric types
-type metric_type() :: gauge.
33 | ||
%% A single metric sample prior to serialization:
%%   name  - metric path (prefixed later by statsd_msg_add_name_prefix/2)
%%   value - the sampled quantity
%%   type  - metric kind, rendered by metric_type_to_bin/1
-record(statsd_msg, {
    name :: binary(),
    value :: non_neg_integer(),
    type :: metric_type()
}).

-type statsd_msg() :: #statsd_msg{}.
42 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1:
%%   sock     - `none` until try_to_connect_if_no_socket/1 opens a UDP
%%              socket, then `{some, Socket}`; reset to `none` after a
%%              failed send
%%   dst_host - host the metrics are sent to
%%   dst_port - port the metrics are sent to
%%   src_port - local port the UDP socket is opened on
-record(state, {
    sock :: hope_option:t(gen_udp:socket()),
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    src_port :: inet:port_number()
}).

-type state() :: #state{}.
52 | ||
%% First path segment of every metric name emitted by this module
%% (see statsd_msg_add_name_prefix/2).
-define(PATH_PREFIX, "beam_stats").
54 | ||
55 | %% ============================================================================ | |
56 | %% Consumer implementation | |
57 | %% ============================================================================ | |
58 | ||
%% Builds the initial consumer state from an option list.
%%
%% Missing options fall back to their defaults (60000 for the consumption
%% interval, the ?DEFAULT_* macros for host and ports). No socket is opened
%% here; the state starts with `sock = none`.
%%
%% Returns `{ConsumptionInterval, State}`.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    Host = Lookup(dst_host, ?DEFAULT_DST_HOST),
    DestinationPort = Lookup(dst_port, ?DEFAULT_DST_PORT),
    SourcePort = Lookup(src_port, ?DEFAULT_SRC_PORT),
    InitialState = #state{
        sock = none,
        dst_host = Host,
        dst_port = DestinationPort,
        src_port = SourcePort
    },
    {Interval, InitialState}.
73 | ||
%% Serializes every queued stats sample into one binary, makes sure a UDP
%% socket exists (opening one if needed) and attempts to send the binary.
%% Returns the possibly updated state.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Queue, #state{} = State0) ->
    % The payload is built before the connection attempt, as a failed
    % connect must not prevent serialization errors from surfacing first.
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State0), Payload).
80 | ||
%% Releases the consumer's resources: applies gen_udp:close/1 to the socket
%% held in the state, if there is one. Always returns the empty tuple.
-spec terminate(state()) -> {}.
terminate(#state{sock = MaybeSocket}) ->
    CloseSocket = fun gen_udp:close/1,
    ok = hope_option:iter(MaybeSocket, CloseSocket),
    {}.
86 | ||
87 | %% ============================================================================ | |
88 | %% Transport | |
89 | %% ============================================================================ | |
90 | ||
%% Attempts to send Payload to the configured destination over the socket
%% held in the state.
%%
%% - With no socket (`sock = none`) the failure is logged and the state is
%%   returned unchanged; the payload is dropped.
%% - On a successful send the state is returned unchanged.
%% - On a send error the error is logged, the socket is closed and the
%%   state is returned with `sock = none`, so the next consume/2 call
%%   re-opens a socket. The payload is dropped.
-spec try_to_send(state(), binary()) -> state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{sock = {some, Sock}, dst_host = DstHost, dst_port = DstPort} = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The number of ~p directives must equal the number of
            % arguments, otherwise formatting itself fails. The payload is
            % named literally rather than printed, as it can be large.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, Payload) -> ~p",
                [Sock, DstHost, DstPort, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
117 | ||
%% Ensures the state holds a UDP socket when possible.
%%
%% A state that already has a socket is returned as is. Otherwise a socket
%% is opened on the configured source port; on failure the error is logged
%% and the state keeps `sock = none`.
-spec try_to_connect_if_no_socket(state()) -> state().
try_to_connect_if_no_socket(#state{sock = {some, _}} = Connected) ->
    Connected;
try_to_connect_if_no_socket(#state{sock = none, src_port = Port} = Disconnected) ->
    case gen_udp:open(Port) of
        {error, _} = Failure ->
            ?log_error("gen_udp:open(~p) -> ~p", [Port, Failure]),
            Disconnected#state{sock = none};
        {ok, Socket} ->
            Disconnected#state{sock = {some, Socket}}
    end.
130 | ||
131 | %% ============================================================================ | |
132 | %% Serialization | |
133 | %% ============================================================================ | |
134 | ||
%% Serializes every stats sample in the queue, in queue order, and
%% concatenates the resulting lines into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary().
beam_stats_queue_to_binary(Queue) ->
    Samples = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bins/1, Samples)).
139 | ||
%% Converts one stats sample into serialized metric lines: bytes in, bytes
%% out, then one line per memory component. Every metric name is prefixed
%% with the path prefix and the sanitized node id before serialization.
-spec beam_stats_to_bins(beam_stats:t()) -> [binary()].
beam_stats_to_bins(#beam_stats{
    node_id = NodeID,
    memory = Memory,
    io_bytes_in = IOBytesIn,
    io_bytes_out = IOBytesOut
}) ->
    NodePrefix = node_id_to_bin(NodeID),
    Unprefixed = [
        io_bytes_in_to_msg(IOBytesIn),
        io_bytes_out_to_msg(IOBytesOut)
        | memory_to_msgs(Memory)
    ],
    AddPrefix = fun(Msg) -> statsd_msg_add_name_prefix(Msg, NodePrefix) end,
    % Two separate passes: all names are prefixed before any is serialized.
    Prefixed = lists:map(AddPrefix, Unprefixed),
    lists:map(fun statsd_msg_to_bin/1, Prefixed).
157 | ||
b4e2333f SK |
%% Wraps the inbound I/O byte count in a gauge message named
%% "io.bytes_in".
-spec io_bytes_in_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_in_to_msg(IOBytesIn) ->
    #statsd_msg{name = <<"io.bytes_in">>, value = IOBytesIn, type = gauge}.
166 | ||
%% Wraps the outbound I/O byte count in a gauge message named
%% "io.bytes_out".
-spec io_bytes_out_to_msg(non_neg_integer()) -> statsd_msg().
io_bytes_out_to_msg(IOBytesOut) ->
    #statsd_msg{name = <<"io.bytes_out">>, value = IOBytesOut, type = gauge}.
175 | ||
%% Turns a list of `{MemoryType, Size}` pairs into gauge messages, one per
%% pair, preserving the input order.
-spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()].
memory_to_msgs(Memory) ->
    [memory_component_to_statsd_msg(Component) || Component <- Memory].
180 | ||
%% Builds a gauge message for one memory component. The metric name is
%% "memory." followed by the component's atom name (latin1-encoded).
%% Only matches when the size satisfies `MemSize >= 0`.
-spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> statsd_msg().
memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
    MetricName = <<"memory.", (atom_to_binary(MemType, latin1))/binary>>,
    #statsd_msg{name = MetricName, value = MemSize, type = gauge}.
190 | ||
%% Returns the message with its name rewritten to
%% "<PATH_PREFIX>.<NodeID>.<original name>". NodeID must be a binary.
-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> statsd_msg().
statsd_msg_add_name_prefix(#statsd_msg{name = BareName} = Msg, NodeID) when
    is_binary(NodeID)
->
    FullName = <<?PATH_PREFIX, ".", NodeID/binary, ".", BareName/binary>>,
    Msg#statsd_msg{name = FullName}.
197 | ||
%% Serializes a gauge message as one newline-terminated line of the form
%% "<name>:<value>|<type>\n". Only matches messages whose name is a binary,
%% whose type is `gauge` and whose value satisfies `Value >= 0`.
-spec statsd_msg_to_bin(statsd_msg()) -> binary().
statsd_msg_to_bin(#statsd_msg{name = Name, value = Value, type = gauge = Type}) when
    is_binary(Name), Value >= 0
->
    TypeBin = metric_type_to_bin(Type),
    ValueBin = integer_to_binary(Value),
    iolist_to_binary([Name, ":", ValueBin, "|", TypeBin, "\n"]).
216 | ||
%% Maps a metric type to the type code written after the "|" separator in a
%% serialized line: gauge is rendered as "g".
-spec metric_type_to_bin(metric_type()) -> binary().
metric_type_to_bin(gauge) -> <<"g">>.
221 | ||
%% Renders a node name as a binary usable as a single metric path segment:
%% every "@" and "." in the name is replaced with "_".
-spec node_id_to_bin(node()) -> binary().
node_id_to_bin(NodeID) ->
    ReplaceOpts = [global, {return, binary}],
    re:replace(atom_to_binary(NodeID, utf8), "[\@\.]", "_", ReplaceOpts).