Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer_graphite). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
4 | ||
5 | -behaviour(beam_stats_consumer). | |
6 | ||
7 | -export_type( | |
8 | [ option/0 | |
9 | , connect_option/0 | |
10 | ]). | |
11 | ||
12 | -export( | |
13 | [ init/1 | |
14 | , consume/2 | |
15 | , terminate/1 | |
16 | ]). | |
17 | ||
%% Options controlling how the TCP connection to Graphite is opened.
%% Any option that is absent falls back to the default picked in
%% try_to_connect_if_no_socket/1.
-type connect_option() ::
      {host, inet:ip_address() | inet:hostname()}
    | {port, inet:port_number()}
    | {timeout, timeout()}.
23 | ||
%% Options accepted by init/1.
%%
%%   connect_options      - stored in the consumer state and consulted when
%%                          the TCP connection is opened.
%%   consumption_interval - integer interval returned by init/1 as the first
%%                          element of its result (presumably milliseconds -
%%                          confirm against the beam_stats_consumer
%%                          behaviour).
-type option() ::
      {connect_options, [connect_option()]}
    | {consumption_interval, non_neg_integer()}.
28 | ||
%% Consumer state.
%%
%%   connect_options - the connect options handed to init/1.
%%   sock            - `none` while no connection is open, `{some, Socket}`
%%                     once gen_tcp:connect/4 has succeeded.
-record(state, {
    connect_options = [] :: [connect_option()],
    sock = none :: hope_option:t(gen_tcp:socket())
}).

-type state() :: #state{}.
36 | ||
%% First component of every metric path emitted by make_pair_to_bin/2.
-define(GRAPHITE_PATH_PREFIX, "beam_stats").
38 | ||
%% Builds the initial consumer state from the option list.
%%
%% Looks up `connect_options` (fallback `[]`) and `consumption_interval`
%% (fallback 60000) in Options. No connection is opened here: the socket
%% starts out as `none` and consume/2 attempts to connect on demand.
%%
%% Returns `{ConsumptionInterval, State}`. The interval is an integer, so the
%% spec uses non_neg_integer().
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    ConnectOptions = hope_kv_list:get(Options, connect_options, []),
    ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
    State = #state{connect_options = ConnectOptions, sock = none},
    {ConsumptionInterval, State}.
49 | ||
%% Serialises every sample in the queue and pushes the result to Graphite.
%%
%% The payload is built first; then a connection attempt is made if the state
%% holds no socket; finally the payload is sent. Returns the (possibly
%% updated) state.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Queue, #state{} = State) ->
    Payload = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State), Payload).
56 | ||
%% Closes the socket held in the state, if there is one, and returns `{}`.
-spec terminate(state()) ->
    {}.
terminate(State = #state{}) ->
    MaybeSock = State#state.sock,
    ok = hope_option:iter(MaybeSock, fun gen_tcp:close/1),
    {}.
62 | ||
63 | %% ============================================================================ | |
64 | ||
%% Sends Payload over the socket held in the state.
%%
%% Without a socket the payload is dropped and an error line is printed.
%% When gen_tcp:send/2 fails, the error is printed, the socket is closed and
%% the returned state has `sock = none`, so that the next consume/2 call
%% tries to reconnect.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(State = #state{sock = none}, _Payload) ->
    io:format("error: socket closed~n"),
    % TODO: Maybe schedule retry?
    State;
try_to_send(State = #state{sock = {some, Socket}}, Payload) ->
    SendResult = gen_tcp:send(Socket, Payload),
    case SendResult of
        ok ->
            State;
        {error, _} ->
            io:format("error: gen_tcp:send/2 failed: ~p~n", [SendResult]),
            % TODO: Maybe schedule retry?
            ok = gen_tcp:close(Socket),
            State#state{sock = none}
    end.
81 | ||
%% Ensures the state holds a socket when one can be opened.
%%
%% A state that already carries a socket is returned untouched. Otherwise
%% host, port and timeout are read from the stored connect options (with
%% "localhost", 2003 and 5000 passed as fallbacks) and a passive, binary-mode
%% TCP connection is attempted. On failure the error is printed and the
%% state is returned as it was, i.e. still with `sock = none`.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(State = #state{sock = {some, _}}) ->
    State;
try_to_connect_if_no_socket(State = #state{sock = none, connect_options = ConnectOpts}) ->
    Host = hope_kv_list:get(ConnectOpts, host, "localhost"),
    Port = hope_kv_list:get(ConnectOpts, port, 2003),
    Timeout = hope_kv_list:get(ConnectOpts, timeout, 5000),
    SocketOpts = [binary, {active, false}],
    case gen_tcp:connect(Host, Port, SocketOpts, Timeout) of
        {ok, Socket} ->
            State#state{sock = {some, Socket}};
        {error, _} = Failure ->
            io:format("error: gen_tcp:connect/4 failed: ~p~n", [Failure]),
            State
    end.
100 | ||
%% Renders every sample in the queue, in queue order, and concatenates the
%% results into a single binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Queue) ->
    Samples = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Samples)).
106 | ||
%% Renders one beam_stats sample as a binary containing one line per entry of
%% its `memory` list. Each line is produced by the fun returned from
%% make_pair_to_bin/2, which is closed over the sanitised node id and the
%% sample timestamp in whole seconds.
-spec beam_stats_to_bin(beam_stats:t()) ->
    binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    TimestampBin = integer_to_binary(timestamp_to_integer(Timestamp)),
    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
    PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
    RenderEntry = fun (Entry) -> PairToBin(atom_int_to_bin_bin(Entry)) end,
    MemoryBins = lists:map(RenderEntry, Memory),
    iolist_to_binary([MemoryBins]).
125 | ||
%% Converts a `{MegaSecs, Secs, MicroSecs}` timestamp to whole seconds.
%% The microsecond component is ignored.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({Mega, Sec, _Micro}) ->
    Sec + Mega * 1000000.
130 | ||
%% Returns a fun that renders one `{Key, Value}` pair of binaries as a line
%% of the form
%%
%%     <prefix>.<NodeID>.<Key> <Value> <Timestamp>\n
%%
%% where <prefix> is ?GRAPHITE_PATH_PREFIX. The parts that do not depend on
%% the pair are assembled once, outside the fun.
-spec make_pair_to_bin(binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
    PathHead = [?GRAPHITE_PATH_PREFIX, ".", NodeID, "."],
    LineTail = [" ", TimestampBin, "\n"],
    fun ({<<Key/binary>>, <<Value/binary>>}) ->
        iolist_to_binary([PathHead, Key, " ", Value, LineTail])
    end.
147 | ||
%% Converts a node name to a binary in which every `@` and `.` has been
%% replaced by `_`, so the name can be used as a single component of a
%% dot-separated metric path.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Separators = "[\@\.]",
    ReplaceOpts = [global, {return, binary}],
    re:replace(atom_to_binary(NodeID, utf8), Separators, "_", ReplaceOpts).
153 | ||
%% Converts an `{Atom, Integer}` pair to a pair of binaries: the atom's name
%% (latin1-encoded) and the integer's decimal representation.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({Name, Value}) ->
    NameBin = atom_to_binary(Name, latin1),
    ValueBin = integer_to_binary(Value),
    {NameBin, ValueBin}.