Flatten Graphite consumer options.
[beam_stats.git] / src / beam_stats_consumer_graphite.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer_graphite).
2
3-include("include/beam_stats.hrl").
4
5-behaviour(beam_stats_consumer).
6
7-export_type(
1b0b4721 8 [ option/0
caf75ed8
SK
9 ]).
10
11-export(
12 [ init/1
13 , consume/2
14 , terminate/1
15 ]).
16
%% One configuration entry accepted by init/1. Every entry is optional;
%% init/1 substitutes a default for any key that is missing.
-type option() ::
      {consumption_interval, non_neg_integer()}
    | {host, inet:ip_address() | inet:hostname()}
    | {port, inet:port_number()}
    | {timeout, timeout()}.
23
%% Consumer state: an optional TCP socket (`none` until a connection has
%% been established) together with the settings passed to
%% gen_tcp:connect/4 whenever a connection is attempted.
-record(state, {
    sock = none :: hope_option:t(Socket :: port()),
    host        :: inet:ip_address() | inet:hostname(),
    port        :: inet:port_number(),
    timeout     :: timeout()
}).
30
%% Alias for the state record threaded through init/1, consume/2 and
%% terminate/1.
-type state() :: #state{}.
33
%% Leading component of every metric path produced by make_pair_to_bin/2.
-define(GRAPHITE_PATH_PREFIX, "beam_stats").

%% Connection settings init/1 falls back to when the corresponding option
%% is not supplied.
-define(DEFAULT_HOST, "localhost").
-define(DEFAULT_PORT, 2003).
-define(DEFAULT_TIMEOUT, 5000).
caf75ed8
SK
38
%% Builds the initial consumer state from an option list.
%%
%% Returns {ConsumptionInterval, State}. No connection is opened here:
%% the socket starts out as `none`. Missing options fall back to 60000
%% for consumption_interval and to the ?DEFAULT_* macros for host, port
%% and timeout.
-spec init([option()]) ->
    {non_neg_integer(), state()}.
init(Options) ->
    Lookup = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
    Interval = Lookup(consumption_interval, 60000),
    Host = Lookup(host, ?DEFAULT_HOST),
    Port = Lookup(port, ?DEFAULT_PORT),
    Timeout = Lookup(timeout, ?DEFAULT_TIMEOUT),
    InitialState = #state{
        sock = none,
        host = Host,
        port = Port,
        timeout = Timeout
    },
    {Interval, InitialState}.
51
%% Serialises the queued stats and tries to ship them to Graphite.
%%
%% The payload is built first, then a connection is attempted if there is
%% no socket yet, and finally the payload is sent. The returned state
%% reflects whatever happened to the socket along the way.
-spec consume(beam_stats_consumer:queue(), state()) ->
    state().
consume(Q, #state{}=State) ->
    Payload = beam_stats_queue_to_binary(Q),
    try_to_send(try_to_connect_if_no_socket(State), Payload).
58
%% Releases the socket, if one is held, by handing gen_tcp:close/1 to
%% hope_option:iter/2. Always returns the empty tuple.
-spec terminate(state()) ->
    {}.
terminate(#state{sock=MaybeSock}) ->
    Close = fun gen_tcp:close/1,
    ok = hope_option:iter(MaybeSock, Close),
    {}.
64
65%% ============================================================================
66
%% Sends Payload over the socket held in the state, if there is one.
%%
%% Without a socket the payload is dropped and an error line is printed.
%% When gen_tcp:send/2 fails, the error is printed, the socket is closed
%% and the state is returned with `sock = none`, so the next consume/2
%% call attempts a fresh connection.
-spec try_to_send(state(), binary()) ->
    state().
try_to_send(#state{sock=none}=State, _Payload) ->
    io:format("error: socket closed~n"),
    % TODO: Maybe schedule retry?
    State;
try_to_send(#state{sock={some, Socket}}=State, Payload) ->
    case gen_tcp:send(Socket, Payload) of
        ok ->
            State;
        {error, _}=Failure ->
            io:format("error: gen_tcp:send/2 failed: ~p~n", [Failure]),
            % TODO: Maybe schedule retry?
            ok = gen_tcp:close(Socket),
            State#state{sock=none}
    end.
83
%% Ensures the state holds a socket when one can be obtained.
%%
%% A state that already carries a socket is returned untouched. Otherwise
%% a passive, binary-mode TCP connection to the configured host and port
%% is attempted; on failure the error is printed and the state is
%% returned still without a socket.
-spec try_to_connect_if_no_socket(state()) ->
    state().
try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
    State;
try_to_connect_if_no_socket(#state{sock=none}=State) ->
    #state{host=Host, port=Port, timeout=Timeout} = State,
    SocketOptions = [binary, {active, false}],
    case gen_tcp:connect(Host, Port, SocketOptions, Timeout) of
        {ok, Socket} ->
            State#state{sock = {some, Socket}};
        {error, _}=Failure ->
            io:format("error: gen_tcp:connect/4 failed: ~p~n", [Failure]),
            % sock is already `none` in this clause, so State is returned as is.
            State
    end.
103
%% Renders every entry of the queue with beam_stats_to_bin/1, in queue
%% order, and concatenates the results into one binary.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
    binary().
beam_stats_queue_to_binary(Q) ->
    Entries = queue:to_list(Q),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Entries)).
109
%% Renders one #beam_stats{} sample as Graphite plaintext lines, one line
%% per {Key, Value} entry of its memory list. All lines share the sample's
%% node id and its timestamp (converted by timestamp_to_integer/1).
-spec beam_stats_to_bin(beam_stats:t()) ->
    binary().
beam_stats_to_bin(#beam_stats{timestamp = Timestamp, node_id = NodeID, memory = Memory}) ->
    TimestampBin = integer_to_binary(timestamp_to_integer(Timestamp)),
    % Assert that the sanitised node id really is a binary.
    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
    PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
    RenderEntry = fun (Entry) -> PairToBin(atom_int_to_bin_bin(Entry)) end,
    iolist_to_binary(lists:map(RenderEntry, Memory)).
128
%% Collapses an erlang:timestamp() triple into a single integer number of
%% seconds. The third (microseconds) element is discarded.
-spec timestamp_to_integer(erlang:timestamp()) ->
    non_neg_integer().
timestamp_to_integer({Mega, Secs, _Micro}) ->
    WholeSeconds = Mega * 1000000,
    WholeSeconds + Secs.
133
%% Returns a fun that renders a {Key, Value} pair of binaries as one line:
%%
%%   <prefix>.<NodeID>.<Key> <Value> <Timestamp>\n
%%
%% where <prefix> is ?GRAPHITE_PATH_PREFIX. Both this function and the
%% returned fun accept binaries only; anything else is a function_clause
%% error.
-spec make_pair_to_bin(binary(), binary()) ->
    fun(({binary(), binary()}) -> binary()).
make_pair_to_bin(NodeID, TimestampBin)
  when is_binary(NodeID), is_binary(TimestampBin) ->
    PathHead = [?GRAPHITE_PATH_PREFIX, $., NodeID, $.],
    LineTail = [$\s, TimestampBin, $\n],
    fun ({K, V}) when is_binary(K), is_binary(V) ->
        iolist_to_binary([PathHead, K, $\s, V, LineTail])
    end.
150
%% Turns a node name into a binary, replacing every `@` and `.` with `_`.
%% The result is used by make_pair_to_bin/2 as one component of the
%% dot-separated metric path.
-spec node_id_to_bin(node()) ->
    binary().
node_id_to_bin(NodeID) ->
    Raw = atom_to_binary(NodeID, utf8),
    ReplaceOpts = [global, {return, binary}],
    re:replace(Raw, "[\@\.]", "_", ReplaceOpts).
156
%% Converts an {Atom, Integer} pair into a pair of binaries: the atom's
%% name and the integer's decimal representation.
%%
%% The atom is encoded as UTF-8, the same encoding node_id_to_bin/1 uses.
%% Encoding as latin1 would raise badarg for any atom containing a code
%% point above 255; for plain ASCII atoms the two encodings give the same
%% bytes.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.
This page took 0.037605 seconds and 4 git commands to generate.