Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer_graphite). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
f079a56c | 4 | -include("beam_stats_logging.hrl"). |
caf75ed8 SK |
5 | |
6 | -behaviour(beam_stats_consumer). | |
7 | ||
8 | -export_type( | |
1b0b4721 | 9 | [ option/0 |
caf75ed8 SK |
10 | ]). |
11 | ||
12 | -export( | |
13 | [ init/1 | |
14 | , consume/2 | |
15 | , terminate/1 | |
16 | ]). | |
17 | ||
%% A single entry of the option list accepted by init/1.
%%
%% consumption_interval is handed back to the caller as the first element of
%% init/1's result; host, port and timeout are stored in the state and later
%% passed to gen_tcp:connect/4.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {host, inet:ip_address() | inet:hostname()}
    | {port, inet:port_number()}
    | {timeout, timeout()}.
24 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1.
%%
%% sock is `none` until a connection succeeds, then `{some, Socket}`; it is
%% reset to `none` when a send fails. host/port/timeout are the arguments used
%% for every gen_tcp:connect/4 attempt.
-record(state, {
    sock = none :: hope_option:t(Socket :: port()),
    host :: inet:ip_address() | inet:hostname(),
    port :: inet:port_number(),
    timeout :: timeout()
}).

-type state() :: #state{}.
34 | ||
%% First path component of every metric line built by make_pair_to_bin/3.
-define(GRAPHITE_PATH_PREFIX, "beam_stats").

%% Fallbacks used by init/1 when the corresponding option is absent.
%% The timeout is passed to gen_tcp:connect/4 and is therefore in milliseconds.
-define(DEFAULT_HOST, "localhost").
-define(DEFAULT_PORT, 2003).
-define(DEFAULT_TIMEOUT, 5000).
caf75ed8 SK |
39 | |
%% Builds the initial consumer state from an option list.
%%
%% Every option is looked up with hope_kv_list:get/3. Missing options fall
%% back to 60000 for consumption_interval and to the ?DEFAULT_* macros for
%% host, port and timeout. No connection is attempted here: the socket starts
%% out as `none`.
%%
%% Returns {ConsumptionInterval, State}.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun (Name, Fallback) -> hope_kv_list:get(Options, Name, Fallback) end,
    Interval = Lookup(consumption_interval, 60000),
    Host = Lookup(host, ?DEFAULT_HOST),
    Port = Lookup(port, ?DEFAULT_PORT),
    Timeout = Lookup(timeout, ?DEFAULT_TIMEOUT),
    InitialState = #state{
        sock = none,
        host = Host,
        port = Port,
        timeout = Timeout
    },
    {Interval, InitialState}.
52 | ||
%% Serialises the queued stats, makes sure a socket exists (connecting if
%% needed) and sends the payload.
%%
%% Returns the updated state; its socket is `none` if connecting or sending
%% failed.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{} = State0) ->
    % The payload is built before any connection attempt.
    Bytes = beam_stats_queue_to_binary(Queue),
    Connected = try_to_connect_if_no_socket(State0),
    try_to_send(Connected, Bytes).
59 | ||
%% Closes the socket held in the state, if there is one, and returns `{}`.
-spec terminate(state()) ->
    {}.
terminate(#state{sock = MaybeSocket}) ->
    CloseSocket = fun gen_tcp:close/1,
    ok = hope_option:iter(MaybeSocket, CloseSocket),
    {}.
65 | ||
66 | %% ============================================================================ | |
67 | ||
%% Sends Payload over the socket stored in the state.
%%
%% With no socket, an error is logged and the state is returned untouched.
%% If gen_tcp:send/2 fails, the error is logged, the socket is closed and the
%% returned state has its socket reset to `none`, so the next consume/2 call
%% will try to reconnect. The payload itself is dropped in both failure cases.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock = none} = State, _Payload) ->
    ?log_error("Sending failed. No socket in state."),
    % TODO: Maybe schedule retry?
    State;
try_to_send(#state{sock = {some, Socket}} = State, Payload) ->
    case gen_tcp:send(Socket, Payload) of
        ok ->
            State;
        {error, _} = Failure ->
            ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Socket, Payload, Failure]),
            % TODO: Maybe schedule retry?
            ok = gen_tcp:close(Socket),
            State#state{sock = none}
    end.
84 | ||
%% Ensures the state carries a socket, connecting only when it has none.
%%
%% An existing socket is left alone. Otherwise a passive, binary-mode TCP
%% connection is attempted with the host, port and timeout from the state.
%% On failure the error is logged and the socket stays `none`.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State;
try_to_connect_if_no_socket(#state{sock = none} = State) ->
    #state{host = Host, port = Port, timeout = Timeout} = State,
    ConnectOpts = [binary, {active, false}],
    case gen_tcp:connect(Host, Port, ConnectOpts, Timeout) of
        {ok, Socket} ->
            State#state{sock = {some, Socket}};
        {error, _} = Failure ->
            ?log_error(
                "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
                [Host, Port, ConnectOpts, Timeout, Failure]
            ),
            State#state{sock = none}
    end.
108 | ||
%% Serialises every stats entry in the queue, in queue order, and concatenates
%% the results into one binary. An empty queue yields <<>>.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Entries)).
114 | ||
%% Renders one #beam_stats{} record as a binary of metric lines.
%%
%% Only the `memory` field is emitted: each {Key, Value} pair in it becomes
%% one line tagged with the sanitised node id, the type "memory" and the
%% record's timestamp in whole seconds.
-spec beam_stats_to_bin(beam_stats:t()) ->
    binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    TimestampBin = integer_to_binary(timestamp_to_integer(Timestamp)),
    % Assert that the sanitised node id really is a binary.
    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
    FormatMemory = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
    MemoryLines = lists:map(
        fun (Pair) -> FormatMemory(atom_int_to_bin_bin(Pair)) end,
        Memory
    ),
    iolist_to_binary([MemoryLines]).
133 | ||
%% Collapses an erlang:timestamp() triple into whole seconds, discarding the
%% microsecond component.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({MegaSecs, Secs, _MicroSecs}) ->
    SecsPerMegaSec = 1000000,
    MegaSecs * SecsPerMegaSec + Secs.
138 | ||
%% Returns a formatter that turns a {Key, Value} pair of binaries into one
%% metric line of the form
%%
%%   <prefix>.<NodeID>.<Type>.<Key> <Value> <TimestampBin>\n
%%
%% where <prefix> is ?GRAPHITE_PATH_PREFIX. All three arguments, and both
%% elements of the pair, must be binaries; anything else raises
%% function_clause.
-spec make_pair_to_bin(binary(), binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(NodeID, TimestampBin, Type)
        when is_binary(NodeID), is_binary(TimestampBin), is_binary(Type) ->
    % The parts that do not depend on the pair are assembled once, up front.
    PathPrefix = << ?GRAPHITE_PATH_PREFIX, ".", NodeID/binary, ".", Type/binary, "." >>,
    LineSuffix = <<" ", TimestampBin/binary, "\n">>,
    fun ({Key, Value}) when is_binary(Key), is_binary(Value) ->
        <<PathPrefix/binary, Key/binary, " ", Value/binary, LineSuffix/binary>>
    end.
157 | ||
%% Converts a node name to a binary usable as a single metric path component.
%%
%% Every "@" and "." in the UTF-8 encoded node name is replaced with "_";
%% both would otherwise be read as separators in the dotted metric path.
%% Literal substitution is used instead of a regular expression: the
%% patterns are fixed characters, so no regex needs compiling on each call.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    NodeIDBin = atom_to_binary(NodeID, utf8),
    binary:replace(NodeIDBin, [<<"@">>, <<".">>], <<"_">>, [global]).
163 | ||
%% Converts an {Atom, Integer} pair into a pair of binaries: the atom's name
%% and the integer's decimal representation.
%%
%% The atom is encoded as UTF-8, matching node_id_to_bin/1. Encoding as
%% latin1 would raise badarg for any atom containing a code point above 255.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.