Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
4 | ||
5 | -behaviour(beam_stats_consumer). | |
6 | ||
7 | -export_type( | |
8 | [ option/0 | |
9 | ]). | |
10 | ||
11 | -export( | |
12 | [ init/1 | |
13 | , consume/2 | |
14 | , terminate/1 | |
15 | ]). | |
16 | ||
17 | -type option() :: | |
18 | {consumption_interval , erlang:time()} | |
19 | | {dst_host , inet:ip_address() | inet:hostname()} | |
20 | | {dst_port , inet:port_number()} | |
21 | | {src_port , inet:port_number()} | |
22 | . | |
23 | ||
24 | -define(DEFAULT_DST_HOST, "localhost"). | |
25 | -define(DEFAULT_DST_PORT, 8125). | |
26 | -define(DEFAULT_SRC_PORT, 8124). | |
27 | ||
28 | -type metric_type() :: | |
29 | % TODO: Add other metric types | |
30 | gauge. | |
31 | ||
32 | -record(statsd_msg, | |
33 | { name :: binary() | |
34 | , value :: non_neg_integer() | |
35 | , type :: metric_type() | |
36 | }). | |
37 | ||
38 | -type statsd_msg() :: | |
39 | #statsd_msg{}. | |
40 | ||
41 | -record(state, | |
42 | { sock :: hope_option:t(gen_udp:socket()) | |
43 | , dst_host :: inet:ip_address() | inet:hostname() | |
44 | , dst_port :: inet:port_number() | |
45 | , src_port :: inet:port_number() | |
46 | }). | |
47 | ||
48 | -type state() :: | |
49 | #state{}. | |
50 | ||
51 | -define(PATH_PREFIX, "beam_stats"). | |
52 | ||
53 | %% ============================================================================ | |
54 | %% Consumer implementation | |
55 | %% ============================================================================ | |
56 | ||
57 | -spec init([option()]) -> | |
58 | {erlang:time(), state()}. | |
59 | init(Options) -> | |
60 | ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), | |
61 | DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), | |
62 | DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), | |
63 | SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), | |
64 | State = #state | |
65 | { sock = none | |
66 | , dst_host = DstHost | |
67 | , dst_port = DstPort | |
68 | , src_port = SrcPort | |
69 | }, | |
70 | {ConsumptionInterval, State}. | |
71 | ||
72 | -spec consume(beam_stats_consumer:queue(), state()) -> | |
73 | state(). | |
74 | consume(Q, #state{}=State1) -> | |
75 | Payload = beam_stats_queue_to_binary(Q), | |
76 | State2 = try_to_connect_if_no_socket(State1), | |
77 | try_to_send(State2, Payload). | |
78 | ||
79 | -spec terminate(state()) -> | |
80 | {}. | |
81 | terminate(#state{sock=SockOpt}) -> | |
82 | ok = hope_option:iter(SockOpt, fun gen_udp:close/1), | |
83 | {}. | |
84 | ||
85 | %% ============================================================================ | |
86 | %% Transport | |
87 | %% ============================================================================ | |
88 | ||
89 | -spec try_to_send(state(), binary()) -> | |
90 | state(). | |
91 | try_to_send(#state{sock=none}=State, _) -> | |
92 | io:format("error: socket closed~n"), | |
93 | % TODO: Maybe schedule retry? | |
94 | State; | |
95 | try_to_send( | |
96 | #state | |
97 | { sock = {some, Sock} | |
98 | , dst_host = DstHost | |
99 | , dst_port = DstPort | |
100 | }=State, | |
101 | Payload | |
102 | ) -> | |
103 | case gen_udp:send(Sock, DstHost, DstPort, Payload) | |
104 | of ok -> | |
105 | State | |
106 | ; {error, _}=Error -> | |
107 | io:format("error: gen_udp:send/4 failed: ~p~n", [Error]), | |
108 | % TODO: Do something with unsent messages? | |
109 | ok = gen_udp:close(Sock), | |
110 | State#state{sock=none} | |
111 | end. | |
112 | ||
113 | -spec try_to_connect_if_no_socket(state()) -> | |
114 | state(). | |
115 | try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> | |
116 | State; | |
117 | try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> | |
118 | case gen_udp:open(SrcPort) | |
119 | of {ok, Sock} -> | |
120 | State#state{sock = {some, Sock}} | |
121 | ; {error, _}=Error -> | |
122 | io:format("error: gen_udp:open/1 failed: ~p~n", [Error]), | |
123 | State#state{sock = none} | |
124 | end. | |
125 | ||
126 | %% ============================================================================ | |
127 | %% Serialization | |
128 | %% ============================================================================ | |
129 | ||
130 | -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> | |
131 | binary(). | |
132 | beam_stats_queue_to_binary(Q) -> | |
133 | iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). | |
134 | ||
135 | -spec beam_stats_to_bins(beam_stats:t()) -> | |
136 | [binary()]. | |
137 | beam_stats_to_bins(#beam_stats | |
138 | { node_id = NodeID | |
139 | , memory = Memory | |
140 | } | |
141 | ) -> | |
142 | NodeIDBin = node_id_to_bin(NodeID), | |
143 | Msgs1 = memory_to_msgs(Memory), | |
144 | Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], | |
145 | [statsd_msg_to_bin(M) || M <- Msgs2]. | |
146 | ||
147 | -spec memory_to_msgs(erlang:memory()) -> | |
148 | [statsd_msg()]. | |
149 | memory_to_msgs(Memory) -> | |
150 | [memory_component_to_statsd_msg(MC) || MC <- Memory]. | |
151 | ||
152 | -spec memory_component_to_statsd_msg({erlang:memory_type(), non_neg_integer()}) -> | |
153 | statsd_msg(). | |
154 | memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> | |
155 | #statsd_msg | |
156 | { name = atom_to_binary(MemType, latin1) | |
157 | , value = MemSize | |
158 | , type = gauge | |
159 | }. | |
160 | ||
161 | -spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> | |
162 | statsd_msg(). | |
163 | statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) -> | |
164 | Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>, | |
165 | Name2 = <<Prefix/binary, Name1/binary>>, | |
166 | Msg#statsd_msg{name=Name2}. | |
167 | ||
168 | -spec statsd_msg_to_bin(statsd_msg()) -> | |
169 | binary(). | |
170 | statsd_msg_to_bin( | |
171 | #statsd_msg | |
172 | { name = <<Name/binary>> | |
173 | , value = Value | |
174 | , type = Type = gauge | |
175 | } | |
176 | ) when Value >= 0 -> | |
177 | TypeBin = metric_type_to_bin(Type), | |
178 | ValueBin = integer_to_binary(Value), | |
179 | << Name/binary | |
180 | , ":" | |
181 | , ValueBin/binary | |
182 | , "|" | |
183 | , TypeBin/binary | |
184 | , "\n" | |
185 | >>. | |
186 | ||
187 | -spec metric_type_to_bin(metric_type()) -> | |
188 | binary(). | |
189 | metric_type_to_bin(gauge) -> | |
190 | <<"g">>. | |
191 | ||
192 | -spec node_id_to_bin(node()) -> | |
193 | binary(). | |
194 | node_id_to_bin(NodeID) -> | |
195 | NodeIDBin = atom_to_binary(NodeID, utf8), | |
196 | re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). |