Commit | Line | Data |
---|---|---|
70939664 SK |
1 | -module(beam_stats_consumer_statsd). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
70939664 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
9 | [ option/0 | |
10 | ]). | |
11 | ||
b0ab6ee2 | 12 | %% Consumer interface |
70939664 SK |
13 | -export( |
14 | [ init/1 | |
15 | , consume/2 | |
16 | , terminate/1 | |
17 | ]). | |
18 | ||
19 | -type option() :: | |
76aefffb | 20 | {consumption_interval , non_neg_integer()} |
70939664 SK |
21 | | {dst_host , inet:ip_address() | inet:hostname()} |
22 | | {dst_port , inet:port_number()} | |
23 | | {src_port , inet:port_number()} | |
4d24f3b7 | 24 | | {num_msgs_per_packet , non_neg_integer()} |
5b6519f3 | 25 | | {static_node_name , binary()} |
70939664 SK |
26 | . |
27 | ||
28 | -define(DEFAULT_DST_HOST, "localhost"). | |
29 | -define(DEFAULT_DST_PORT, 8125). | |
30 | -define(DEFAULT_SRC_PORT, 8124). | |
31 | ||
70939664 SK |
32 | -record(state, |
33 | { sock :: hope_option:t(gen_udp:socket()) | |
34 | , dst_host :: inet:ip_address() | inet:hostname() | |
35 | , dst_port :: inet:port_number() | |
36 | , src_port :: inet:port_number() | |
4d24f3b7 | 37 | , num_msgs_per_packet :: non_neg_integer() |
5b6519f3 | 38 | , static_node_name :: hope_option:t(binary()) |
70939664 SK |
39 | }). |
40 | ||
41 | -type state() :: | |
42 | #state{}. | |
43 | ||
70939664 SK |
44 | %% ============================================================================ |
45 | %% Consumer implementation | |
46 | %% ============================================================================ | |
47 | ||
48 | -spec init([option()]) -> | |
76aefffb | 49 | {non_neg_integer(), state()}. |
70939664 SK |
50 | init(Options) -> |
51 | ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), | |
52 | DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), | |
53 | DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), | |
54 | SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), | |
4d24f3b7 | 55 | NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), |
5b6519f3 | 56 | StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), |
70939664 SK |
57 | State = #state |
58 | { sock = none | |
59 | , dst_host = DstHost | |
60 | , dst_port = DstPort | |
61 | , src_port = SrcPort | |
4d24f3b7 | 62 | , num_msgs_per_packet = NumMsgsPerPacket |
5b6519f3 | 63 | , static_node_name = StaticNodeNameOpt |
70939664 SK |
64 | }, |
65 | {ConsumptionInterval, State}. | |
66 | ||
67 | -spec consume(beam_stats_consumer:queue(), state()) -> | |
68 | state(). | |
5b6519f3 SK |
69 | consume( |
70 | Q, | |
71 | #state | |
72 | { num_msgs_per_packet = NumMsgsPerPacket | |
73 | , static_node_name = StaticNodeNameOpt | |
74 | }=State | |
75 | ) -> | |
76 | Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), | |
4d24f3b7 | 77 | lists:foldl(fun try_to_connect_and_send/2, State, Packets). |
70939664 SK |
78 | |
79 | -spec terminate(state()) -> | |
80 | {}. | |
81 | terminate(#state{sock=SockOpt}) -> | |
82 | ok = hope_option:iter(SockOpt, fun gen_udp:close/1), | |
83 | {}. | |
84 | ||
85 | %% ============================================================================ | |
86 | %% Transport | |
87 | %% ============================================================================ | |
88 | ||
4d24f3b7 SK |
89 | -spec try_to_connect_and_send(binary(), state()) -> |
90 | state(). | |
91 | try_to_connect_and_send(<<Payload/binary>>, #state{}=State1) -> | |
92 | State2 = try_to_connect_if_no_socket(State1), | |
93 | try_to_send(State2, Payload). | |
94 | ||
70939664 SK |
95 | -spec try_to_send(state(), binary()) -> |
96 | state(). | |
97 | try_to_send(#state{sock=none}=State, _) -> | |
f079a56c | 98 | ?log_error("Sending failed. No socket in state."), |
70939664 SK |
99 | % TODO: Maybe schedule retry? |
100 | State; | |
101 | try_to_send( | |
102 | #state | |
103 | { sock = {some, Sock} | |
104 | , dst_host = DstHost | |
105 | , dst_port = DstPort | |
106 | }=State, | |
107 | Payload | |
108 | ) -> | |
109 | case gen_udp:send(Sock, DstHost, DstPort, Payload) | |
110 | of ok -> | |
111 | State | |
112 | ; {error, _}=Error -> | |
f079a56c SK |
113 | ?log_error( |
114 | "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", | |
115 | [Sock, DstHost, DstPort, Error] | |
116 | ), | |
70939664 SK |
117 | % TODO: Do something with unsent messages? |
118 | ok = gen_udp:close(Sock), | |
119 | State#state{sock=none} | |
120 | end. | |
121 | ||
122 | -spec try_to_connect_if_no_socket(state()) -> | |
123 | state(). | |
124 | try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> | |
125 | State; | |
126 | try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> | |
127 | case gen_udp:open(SrcPort) | |
128 | of {ok, Sock} -> | |
129 | State#state{sock = {some, Sock}} | |
130 | ; {error, _}=Error -> | |
f079a56c | 131 | ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, Error]), |
70939664 SK |
132 | State#state{sock = none} |
133 | end. | |
134 | ||
135 | %% ============================================================================ | |
136 | %% Serialization | |
137 | %% ============================================================================ | |
138 | ||
5b6519f3 SK |
139 | -spec beam_stats_queue_to_packets( |
140 | beam_stats_consumer:queue(), | |
141 | non_neg_integer(), | |
142 | hope_option:t(binary()) | |
143 | ) -> | |
4d24f3b7 | 144 | [binary()]. |
5b6519f3 SK |
145 | beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> |
146 | MsgBins = lists:append([beam_stats_to_bins(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), | |
4d24f3b7 SK |
147 | MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), |
148 | lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). | |
70939664 | 149 | |
5b6519f3 | 150 | -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> |
70939664 | 151 | [binary()]. |
ece99ea3 SK |
152 | beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> |
153 | NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), | |
154 | NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), | |
8fe744e7 | 155 | MsgsGraphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), |
ece99ea3 | 156 | MsgsStatsD = |
8fe744e7 | 157 | lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite), |
ece99ea3 | 158 | lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD). |