Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
70939664 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
9 | [ option/0 | |
10 | ]). | |
11 | ||
b0ab6ee2 | 12 | %% Consumer interface |
70939664 SK |
13 | -export( |
14 | [ init/1 | |
15 | , consume/2 | |
16 | , terminate/1 | |
17 | ]). | |
18 | ||
%% Options understood by init/1. Every option may be omitted; init/1 then
%% falls back to a default (see the DEFAULT_* macros below and init/1 itself).
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {dst_host, inet:ip_address() | inet:hostname()}
    | {dst_port, inet:port_number()}
    | {src_port, inet:port_number()}
    | {num_msgs_per_packet, non_neg_integer()}
    | {static_node_name, binary()}.

%% Fallback values init/1 uses when dst_host, dst_port or src_port are not
%% present in the option list.
-define(DEFAULT_DST_HOST, "localhost").
-define(DEFAULT_DST_PORT, 8125).
-define(DEFAULT_SRC_PORT, 8124).
31 | ||
70939664 SK |
%% Consumer state handed back and forth between init/1, consume/2 and
%% terminate/1.
%%
%% sock is `none` until a UDP socket has been opened successfully, and is
%% reset to `none` after a failed send. static_node_name, when present,
%% replaces the node-derived name used in metric paths.
-record(state, {
    sock :: hope_option:t(gen_udp:socket()),
    dst_host :: inet:ip_address() | inet:hostname(),
    dst_port :: inet:port_number(),
    src_port :: inet:port_number(),
    num_msgs_per_packet :: non_neg_integer(),
    static_node_name :: hope_option:t(binary())
}).

-type state() :: #state{}.

%% Prefix prepended to the path of every Graphite message before it is
%% converted to a StatsD gauge.
-define(PATH_PREFIX, <<"beam_stats">>).
70939664 SK |
45 | |
46 | %% ============================================================================ | |
47 | %% Consumer implementation | |
48 | %% ============================================================================ | |
49 | ||
%% Build the initial consumer state from an option list.
%%
%% Options that are absent fall back to: consumption_interval = 60000,
%% dst_host = ?DEFAULT_DST_HOST, dst_port = ?DEFAULT_DST_PORT,
%% src_port = ?DEFAULT_SRC_PORT, num_msgs_per_packet = 10.
%% static_node_name has no default; the result of hope_kv_list:get/2 is
%% stored as-is.
%%
%% No socket is opened here: the state starts out with sock = none.
%% Returns {ConsumptionInterval, State}.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    {Interval, #state{
        sock = none,
        dst_host = Lookup(dst_host, ?DEFAULT_DST_HOST),
        dst_port = Lookup(dst_port, ?DEFAULT_DST_PORT),
        src_port = Lookup(src_port, ?DEFAULT_SRC_PORT),
        num_msgs_per_packet = Lookup(num_msgs_per_packet, 10),
        static_node_name = hope_kv_list:get(Options, static_node_name)
    }}.
68 | ||
%% Turn every queued stats sample into StatsD packets and try to send each
%% packet in order.
%%
%% The state is threaded through the sends, so a socket opened (or dropped
%% after a send error) while handling one packet is what the next packet
%% sees. Returns the state left after the last packet.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Q, #state{} = State) ->
    PacketSize = State#state.num_msgs_per_packet,
    NodeNameOpt = State#state.static_node_name,
    Packets = beam_stats_queue_to_packets(Q, PacketSize, NodeNameOpt),
    lists:foldl(fun try_to_connect_and_send/2, State, Packets).
70939664 SK |
80 | |
%% Release the consumer's socket on shutdown.
%%
%% gen_udp:close/1 is handed to hope_option:iter/2 together with the sock
%% field (presumably it is only invoked when the field is {some, Sock} -
%% confirm against hope_option). Always returns the empty tuple.
-spec terminate(state()) ->
    {}.
terminate(#state{} = State) ->
    ok = hope_option:iter(State#state.sock, fun gen_udp:close/1),
    {}.
86 | ||
87 | %% ============================================================================ | |
88 | %% Transport | |
89 | %% ============================================================================ | |
90 | ||
4d24f3b7 SK |
%% Fold step used by consume/2: make sure a socket exists (opening one if
%% the state has none), then attempt to send a single packet.
%% Returns the possibly updated state.
-spec try_to_connect_and_send(binary(), state()) ->
    state().
try_to_connect_and_send(Payload, #state{} = State) when is_binary(Payload) ->
    try_to_send(try_to_connect_if_no_socket(State), Payload).
96 | ||
70939664 SK |
%% Send one packet over the socket held in the state.
%%
%% - With no socket (sock = none) the packet is dropped, an error is logged
%%   and the state is returned unchanged.
%% - On a successful send the state is returned unchanged.
%% - On a send error the failure is logged, the socket is closed and the
%%   state is returned with sock = none, so the next call to
%%   try_to_connect_if_no_socket/1 attempts to open a fresh socket.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(
    #state{
        sock = {some, Sock},
        dst_host = DstHost,
        dst_port = DstPort
    } = State,
    Payload
) ->
    case gen_udp:send(Sock, DstHost, DstPort, Payload) of
        ok ->
            State;
        {error, _} = Error ->
            % The format string has five ~p directives (the four call
            % arguments plus the result), so Payload must be in the list;
            % without it the error report itself fails to format.
            ?log_error(
                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
                [Sock, DstHost, DstPort, Payload, Error]
            ),
            % TODO: Do something with unsent messages?
            ok = gen_udp:close(Sock),
            State#state{sock = none}
    end.
123 | ||
%% Ensure the state carries a socket where possible.
%%
%% A state that already holds {some, Sock} is returned untouched. Otherwise
%% a UDP socket is opened on src_port; on success it is stored in the state,
%% on failure the error is logged and sock stays none.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State;
try_to_connect_if_no_socket(#state{sock = none, src_port = SrcPort} = State) ->
    SockOpt =
        case gen_udp:open(SrcPort) of
            {ok, Sock} ->
                {some, Sock};
            {error, _} = Error ->
                ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, Error]),
                none
        end,
    State#state{sock = SockOpt}.
136 | ||
137 | %% ============================================================================ | |
138 | %% Serialization | |
139 | %% ============================================================================ | |
140 | ||
5b6519f3 SK |
%% Serialize a queue of stats samples into packet payloads.
%%
%% Each sample is expanded into its list of message binaries, the lists are
%% concatenated in queue order, split by hope_list:divide/2 using
%% NumMsgsPerPacket, and every resulting chunk is joined into one binary.
-spec beam_stats_queue_to_packets(
    beam_stats_consumer:queue(),
    non_neg_integer(),
    hope_option:t(binary())
) ->
    [binary()].
beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) ->
    SampleToBins = fun(Sample) -> beam_stats_to_bins(Sample, StaticNodeNameOpt) end,
    AllMsgs = lists:flatmap(SampleToBins, queue:to_list(Q)),
    Chunks = hope_list:divide(AllMsgs, NumMsgsPerPacket),
    [erlang:iolist_to_binary(Chunk) || Chunk <- Chunks].
70939664 | 151 | |
%% Serialize one stats sample into a list of StatsD gauge binaries.
%%
%% The node name used for the messages is the static node name when one is
%% configured, otherwise the binary form of the sample's node_id. Messages
%% are produced as Graphite messages, given the ?PATH_PREFIX path prefix,
%% converted to StatsD gauges and finally rendered to binaries - one full
%% pass over the list per stage.
-spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) ->
    [binary()].
beam_stats_to_bins(#beam_stats{node_id = NodeID} = BeamStats, StaticNodeNameOpt) ->
    FallbackName = beam_stats_msg_graphite:node_id_to_bin(NodeID),
    NodeName = hope_option:get(StaticNodeNameOpt, FallbackName),
    Graphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeName),
    Prefixed = [
        beam_stats_msg_graphite:add_path_prefix(Msg, ?PATH_PREFIX)
     || Msg <- Graphite
    ],
    Gauges = [beam_stats_msg_statsd_gauge:of_msg_graphite(Msg) || Msg <- Prefixed],
    [beam_stats_msg_statsd_gauge:to_bin(Gauge) || Gauge <- Gauges].