Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer_graphite). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
caf75ed8 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
1b0b4721 | 9 | [ option/0 |
caf75ed8 SK |
10 | ]). |
11 | ||
12 | -export( | |
13 | [ init/1 | |
14 | , consume/2 | |
15 | , terminate/1 | |
16 | ]). | |
17 | ||
caf75ed8 | 18 | -type option() :: |
b2f78fc6 | 19 | {consumption_interval , non_neg_integer()} |
1b0b4721 SK |
20 | | {host , inet:ip_address() | inet:hostname()} |
21 | | {port , inet:port_number()} | |
22 | | {timeout , timeout()} | |
caf75ed8 SK |
23 | . |
24 | ||
25 | -record(state, | |
1b0b4721 SK |
26 | { sock = none :: hope_option:t(Socket :: port()) |
27 | , host :: inet:ip_address() | inet:hostname() | |
28 | , port :: inet:port_number() | |
29 | , timeout :: timeout() | |
caf75ed8 SK |
30 | }). |
31 | ||
32 | -type state() :: | |
33 | #state{}. | |
34 | ||
1b0b4721 SK |
35 | -define(DEFAULT_HOST , "localhost"). |
36 | -define(DEFAULT_PORT , 2003). | |
37 | -define(DEFAULT_TIMEOUT , 5000). | |
caf75ed8 SK |
38 | |
39 | -spec init([option()]) -> | |
76aefffb | 40 | {non_neg_integer(), state()}. |
caf75ed8 | 41 | init(Options) -> |
1b0b4721 SK |
42 | Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end, |
43 | ConsumptionInterval = Get(consumption_interval, 60000), | |
caf75ed8 | 44 | State = #state |
1b0b4721 SK |
45 | { sock = none |
46 | , host = Get(host , ?DEFAULT_HOST) | |
47 | , port = Get(port , ?DEFAULT_PORT) | |
48 | , timeout = Get(timeout , ?DEFAULT_TIMEOUT) | |
caf75ed8 SK |
49 | }, |
50 | {ConsumptionInterval, State}. | |
51 | ||
52 | -spec consume(beam_stats_consumer:queue(), state()) -> | |
53 | state(). | |
54 | consume(Q, #state{}=State1) -> | |
cdcb989e | 55 | Payload = beam_stats_queue_to_iolists(Q), |
caf75ed8 SK |
56 | State2 = try_to_connect_if_no_socket(State1), |
57 | try_to_send(State2, Payload). | |
58 | ||
59 | -spec terminate(state()) -> | |
60 | {}. | |
61 | terminate(#state{sock=SockOpt}) -> | |
62 | ok = hope_option:iter(SockOpt, fun gen_tcp:close/1), | |
63 | {}. | |
64 | ||
65 | %% ============================================================================ | |
66 | ||
cdcb989e | 67 | -spec try_to_send(state(), iolist()) -> |
caf75ed8 SK |
68 | state(). |
69 | try_to_send(#state{sock=none}=State, _) -> | |
f079a56c | 70 | ?log_error("Sending failed. No socket in state."), |
caf75ed8 SK |
71 | % TODO: Maybe schedule retry? |
72 | State; | |
73 | try_to_send(#state{sock={some, Sock}}=State, Payload) -> | |
74 | case gen_tcp:send(Sock, Payload) | |
75 | of ok -> | |
76 | State | |
77 | ; {error, _}=Error -> | |
7ef3dd01 | 78 | ?log_error("gen_tcp:send(~p, Payload) -> ~p", [Sock, Error]), |
caf75ed8 SK |
79 | % TODO: Maybe schedule retry? |
80 | ok = gen_tcp:close(Sock), | |
81 | State#state{sock=none} | |
82 | end. | |
83 | ||
84 | -spec try_to_connect_if_no_socket(state()) -> | |
85 | state(). | |
86 | try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> | |
87 | State; | |
1b0b4721 SK |
88 | try_to_connect_if_no_socket( |
89 | #state | |
90 | { sock = none | |
91 | , host = Host | |
92 | , port = Port | |
93 | , timeout = Timeout | |
94 | }=State | |
95 | ) -> | |
f079a56c SK |
96 | Options = [binary, {active, false}], |
97 | case gen_tcp:connect(Host, Port, Options, Timeout) | |
caf75ed8 SK |
98 | of {ok, Sock} -> |
99 | State#state{sock = {some, Sock}} | |
100 | ; {error, _}=Error -> | |
f079a56c SK |
101 | ?log_error( |
102 | "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p", | |
103 | [Host, Port, Options, Timeout, Error] | |
104 | ), | |
caf75ed8 SK |
105 | State#state{sock = none} |
106 | end. | |
107 | ||
cdcb989e PO |
108 | -spec beam_stats_queue_to_iolists(beam_stats_consumer:queue()) -> |
109 | [iolist()]. | |
110 | beam_stats_queue_to_iolists(Q) -> | |
111 | [beam_stats_to_iolist(B) || B <- queue:to_list(Q)]. | |
caf75ed8 | 112 | |
cdcb989e PO |
113 | -spec beam_stats_to_iolist(beam_stats:t()) -> |
114 | [iolist()]. | |
115 | beam_stats_to_iolist(#beam_stats{}=BeamStats) -> | |
8fe744e7 | 116 | Msgs = beam_stats_msg_graphite:of_beam_stats(BeamStats), |
cdcb989e | 117 | lists:map(fun beam_stats_msg_graphite:to_iolist/1, Msgs). |