Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer_graphite). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
caf75ed8 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
1b0b4721 | 9 | [ option/0 |
caf75ed8 SK |
10 | ]). |
11 | ||
12 | -export( | |
13 | [ init/1 | |
14 | , consume/2 | |
15 | , terminate/1 | |
16 | ]). | |
17 | ||
%% Options understood by init/1. Every option is optional; init/1 supplies a
%% default for each key that is absent.
%%   consumption_interval - first element of the tuple returned by init/1.
%%   host, port           - address handed to gen_tcp:connect/4.
%%   timeout              - timeout handed to gen_tcp:connect/4.
-type option() ::
    {consumption_interval, non_neg_integer()}
    | {host, inet:ip_address() | inet:hostname()}
    | {port, inet:port_number()}
    | {timeout, timeout()}.
24 | ||
%% Consumer state.
%%   sock    - `none` until a connection is established, then `{some, Socket}`.
%%   host    - address passed to gen_tcp:connect/4.
%%   port    - port passed to gen_tcp:connect/4.
%%   timeout - timeout passed to gen_tcp:connect/4.
-record(state, {
    sock = none :: hope_option:t(Socket :: port()),
    host :: inet:ip_address() | inet:hostname(),
    port :: inet:port_number(),
    timeout :: timeout()
}).

-type state() :: #state{}.
34 | ||
%% Prefix added to the path of every message before it is serialised.
-define(PATH_PREFIX, <<"beam_stats">>).

%% Fallbacks used by init/1 when the corresponding option is not supplied.
%% NOTE(review): 2003 is presumably Graphite's plaintext listener port - confirm.
-define(DEFAULT_HOST, "localhost").
-define(DEFAULT_PORT, 2003).
%% Passed to gen_tcp:connect/4 as its timeout argument.
-define(DEFAULT_TIMEOUT, 5000).
caf75ed8 SK |
39 | |
%% Builds the initial consumer state from an option list.
%%
%% Returns `{ConsumptionInterval, State}`. The interval falls back to 60000 and
%% host/port/timeout fall back to the ?DEFAULT_* macros when not given. No
%% connection is opened here: the state starts with `sock = none`, and the
%% socket is created by consume/2.
-spec init([option()]) -> {non_neg_integer(), state()}.
init(Options) ->
    %% presumably returns the value stored under Name, or Fallback if absent
    Lookup = fun(Name, Fallback) -> hope_kv_list:get(Options, Name, Fallback) end,
    Interval = Lookup(consumption_interval, 60000),
    InitialState = #state{
        sock = none,
        host = Lookup(host, ?DEFAULT_HOST),
        port = Lookup(port, ?DEFAULT_PORT),
        timeout = Lookup(timeout, ?DEFAULT_TIMEOUT)
    },
    {Interval, InitialState}.
52 | ||
%% Serialises the queued stats, makes sure a socket exists (connecting if the
%% state holds none) and attempts to send the payload. Returns the resulting
%% state, whose `sock` field reflects whether a usable socket remains.
-spec consume(beam_stats_consumer:queue(), state()) -> state().
consume(Queue, #state{} = State0) ->
    %% The payload is built before any connection attempt is made.
    Payload = beam_stats_queue_to_binary(Queue),
    Connected = try_to_connect_if_no_socket(State0),
    try_to_send(Connected, Payload).
59 | ||
%% Releases the socket held in the state and returns the empty tuple.
%% hope_option:iter/2 is expected to invoke gen_tcp:close/1 only when the
%% option actually holds a socket - confirm against the hope library.
-spec terminate(state()) -> {}.
terminate(#state{} = State) ->
    CloseSocket = fun gen_tcp:close/1,
    ok = hope_option:iter(State#state.sock, CloseSocket),
    {}.
65 | ||
66 | %% ============================================================================ | |
67 | ||
%% Sends Payload over the socket stored in the state.
%%
%% With a socket present: on success the state is returned unchanged; on a
%% send error the failure is logged, the socket is closed and the state is
%% returned with `sock = none`.
%% With no socket: an error is logged and the state is returned unchanged.
%% In both failure cases the payload is dropped.
-spec try_to_send(state(), binary()) -> state().
try_to_send(#state{sock = {some, Socket}} = State, Payload) ->
    SendResult = gen_tcp:send(Socket, Payload),
    case SendResult of
        ok ->
            State;
        {error, _} ->
            ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Socket, Payload, SendResult]),
            % TODO: Maybe schedule retry?
            ok = gen_tcp:close(Socket),
            State#state{sock = none}
    end;
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State.
84 | ||
%% Ensures the state holds a socket, connecting when it does not.
%%
%% If the state already contains `{some, Socket}` it is returned untouched.
%% Otherwise a passive, binary-mode TCP connection to the configured host and
%% port is attempted with the configured timeout. On success the new socket is
%% stored in the state; on failure the error is logged and the state is
%% returned with `sock = none`.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
    State;
try_to_connect_if_no_socket(
    #state
    { sock    = none
    , host    = Host
    , port    = Port
    , timeout = Timeout
    }=State
) ->
    % Without send_timeout, gen_tcp:send/2 waits indefinitely when the peer
    % stops reading, which would stall the consumer. Applying the configured
    % timeout to sends as well bounds that wait; try_to_send/2 already closes
    % the socket on any send error, so a timed-out socket is not reused.
    Options = [binary, {active, false}, {send_timeout, Timeout}],
    case gen_tcp:connect(Host, Port, Options, Timeout)
    of  {ok, Sock} ->
            State#state{sock = {some, Sock}}
    ;   {error, _}=Error ->
            ?log_error(
                "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
                [Host, Port, Options, Timeout, Error]
            ),
            State#state{sock = none}
    end.
108 | ||
%% Converts every entry of the queue, in queue order, into its serialised
%% messages and concatenates them all into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary().
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    PerEntryBins = lists:map(fun beam_stats_to_bins/1, Entries),
    iolist_to_binary(PerEntryBins).
114 | ||
1aa506ea SK |
%% Turns one #beam_stats{} sample into a list of binaries, one per message:
%% the sample is expanded into messages, each message gets ?PATH_PREFIX added
%% to its path, and each prefixed message is then serialised.
-spec beam_stats_to_bins(beam_stats:t()) -> [binary()].
beam_stats_to_bins(#beam_stats{} = BeamStats) ->
    RawMsgs = beam_stats_msg_graphite:of_beam_stats(BeamStats),
    PrefixedMsgs =
        [beam_stats_msg_graphite:add_path_prefix(Msg, ?PATH_PREFIX) || Msg <- RawMsgs],
    [beam_stats_msg_graphite:to_bin(Msg) || Msg <- PrefixedMsgs].