Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer_graphite). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
4 | ||
5 | -behaviour(beam_stats_consumer). | |
6 | ||
7 | -export_type( | |
1b0b4721 | 8 | [ option/0 |
caf75ed8 SK |
9 | ]). |
10 | ||
11 | -export( | |
12 | [ init/1 | |
13 | , consume/2 | |
14 | , terminate/1 | |
15 | ]). | |
16 | ||
%% A single configuration entry accepted by init/1.
%% Every entry is optional; init/1 substitutes a default for any key that is
%% not present in the option list.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {host, inet:ip_address() | inet:hostname()}
    | {port, inet:port_number()}
    | {timeout, timeout()}.
23 | ||
%% Consumer state threaded through init/1, consume/2 and terminate/1.
%%
%% sock    - `none' while disconnected, `{some, Socket}' once
%%           gen_tcp:connect/4 has succeeded. The socket is typed as
%%           inet:socket() (what gen_tcp returns) rather than port(), because
%%           a gen_tcp socket is not guaranteed to be a port on every OTP
%%           socket backend.
%% host    - address or hostname handed to gen_tcp:connect/4.
%% port    - TCP port handed to gen_tcp:connect/4.
%% timeout - connect timeout handed to gen_tcp:connect/4.
-record(state,
    { sock = none :: hope_option:t(Socket :: inet:socket())
    , host        :: inet:ip_address() | inet:hostname()
    , port        :: inet:port_number()
    , timeout     :: timeout()
    }).

-type state() ::
    #state{}.
33 | ||
%% Leading component of every metric path emitted by this module.
-define(GRAPHITE_PATH_PREFIX, "beam_stats").

%% Fallback values used by init/1 when the corresponding option is absent.
%% The timeout is passed to gen_tcp:connect/4, which takes milliseconds.
-define(DEFAULT_HOST, "localhost").
-define(DEFAULT_PORT, 2003).
-define(DEFAULT_TIMEOUT, 5000).
caf75ed8 SK |
38 | |
%% @doc Build the initial consumer state from an option list.
%%
%% Returns `{ConsumptionInterval, State}'. Missing options fall back to
%% 60000 for the interval and to the ?DEFAULT_* macros for host, port and
%% timeout. No connection is attempted here; the socket starts as `none'.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun(Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    Host = Lookup(host, ?DEFAULT_HOST),
    Port = Lookup(port, ?DEFAULT_PORT),
    Timeout = Lookup(timeout, ?DEFAULT_TIMEOUT),
    InitialState = #state{
        sock = none,
        host = Host,
        port = Port,
        timeout = Timeout
    },
    {Interval, InitialState}.
51 | ||
%% @doc Serialise the queued samples and push them over the socket.
%%
%% The payload is rendered first, then a connection is opened if the state
%% holds no socket, and finally the payload is sent. The returned state
%% reflects whatever happened to the socket along the way.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{} = State) ->
    % Render before touching the network so a bad sample fails before any
    % connection attempt is made.
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State), Payload).
58 | ||
%% @doc Release the consumer's socket, if any, and return the empty tuple.
%%
%% gen_tcp:close/1 is handed to hope_option:iter/2 together with the optional
%% socket; the call is asserted to return `ok'.
-spec terminate(state()) ->
    {}.
terminate(#state{sock = MaybeSocket}) ->
    Close = fun gen_tcp:close/1,
    ok = hope_option:iter(MaybeSocket, Close),
    {}.
64 | ||
65 | %% ============================================================================ | |
66 | ||
%% Send Payload over the socket stored in the state.
%%
%% Without a socket the payload is dropped and a message is printed. If
%% gen_tcp:send/2 fails, the error is printed, the socket is closed and the
%% returned state has `sock = none' so a later call can reconnect.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock = none} = State, _Payload) ->
    io:format("error: socket closed~n"),
    % TODO: Maybe schedule retry?
    State;
try_to_send(#state{sock = {some, Socket}} = State, Payload) ->
    case gen_tcp:send(Socket, Payload) of
        ok ->
            State;
        {error, _} = SendError ->
            io:format("error: gen_tcp:send/2 failed: ~p~n", [SendError]),
            % TODO: Maybe schedule retry?
            ok = gen_tcp:close(Socket),
            State#state{sock = none}
    end.
83 | ||
%% Ensure the state holds a socket, connecting only when it has none.
%%
%% A state that already carries `{some, Socket}' is returned untouched.
%% Otherwise a passive, binary-mode TCP connection to the configured
%% host/port is attempted; on failure the error is printed and the state is
%% returned still without a socket.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock = {some, _}} = State) ->
    State;
try_to_connect_if_no_socket(#state{sock = none} = State) ->
    #state{host = Host, port = Port, timeout = Timeout} = State,
    ConnectOpts = [binary, {active, false}],
    case gen_tcp:connect(Host, Port, ConnectOpts, Timeout) of
        {ok, Socket} ->
            State#state{sock = {some, Socket}};
        {error, _} = ConnectError ->
            io:format("error: gen_tcp:connect/4 failed: ~p~n", [ConnectError]),
            % sock is already `none' in this clause, so the state is unchanged.
            State
    end.
103 | ||
%% Render every sample in the queue, in queue order, and concatenate the
%% results into one binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Queue) ->
    Samples = queue:to_list(Queue),
    Rendered = lists:map(fun beam_stats_to_bin/1, Samples),
    iolist_to_binary(Rendered).
109 | ||
%% Render one #beam_stats{} sample as a binary of newline-terminated metric
%% lines, one line per entry of the sample's `memory' list. Every line
%% carries the sample's node id and its timestamp in whole seconds.
-spec beam_stats_to_bin(beam_stats:t()) ->
    binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    SecondsBin = integer_to_binary(timestamp_to_integer(Timestamp)),
    % Assert that the sanitised node id really is a binary.
    <<NodeBin/binary>> = node_id_to_bin(NodeID),
    ToLine = make_pair_to_bin(NodeBin, SecondsBin),
    MemoryLines =
        lists:map(
            fun(Entry) -> ToLine(atom_int_to_bin_bin(Entry)) end,
            Memory
        ),
    iolist_to_binary([MemoryLines]).
128 | ||
%% Collapse an erlang:timestamp() triple into whole seconds.
%% The microsecond component is discarded, not rounded.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({Mega, Sec, _Micro}) ->
    SecondsPerMega = 1000000,
    Mega * SecondsPerMega + Sec.
133 | ||
%% Build a formatter that turns a {Key, Value} pair of binaries into one
%% metric line of the form
%%   <prefix>.<NodeID>.<Key> <Value> <Timestamp>\n
%% where <prefix> is ?GRAPHITE_PATH_PREFIX. The parts that do not depend on
%% the pair are assembled once, outside the returned fun.
-spec make_pair_to_bin(binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(NodeID, TimestampBin) when is_binary(NodeID), is_binary(TimestampBin) ->
    PathPrefix = <<?GRAPHITE_PATH_PREFIX, ".", NodeID/binary, ".">>,
    LineSuffix = <<" ", TimestampBin/binary, "\n">>,
    fun({Key, Value}) when is_binary(Key), is_binary(Value) ->
        <<PathPrefix/binary, Key/binary, " ", Value/binary, LineSuffix/binary>>
    end.
150 | ||
%% Convert a node name to a binary usable as a single metric path component:
%% every `@' and `.' in the UTF-8 encoded name is replaced with `_'.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    Separators = [<<"@">>, <<".">>],
    binary:replace(Raw, Separators, <<"_">>, [global]).
156 | ||
%% Convert an {Atom, Integer} pair into a pair of binaries: the atom's text
%% and the integer's decimal representation.
%%
%% The atom is encoded as UTF-8, matching node_id_to_bin/1. Encoding with
%% latin1 raises badarg for atoms containing code points above 255 and
%% yields non-UTF-8 bytes for code points 128..255; plain ASCII atoms encode
%% identically under both.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.