Include the metric type in it's name.
[beam_stats.git] / src / beam_stats_consumer_graphite.erl
1 -module(beam_stats_consumer_graphite).
2
3 -include("include/beam_stats.hrl").
4 -include("beam_stats_logging.hrl").
5
6 -behaviour(beam_stats_consumer).
7
8 -export_type(
9 [ option/0
10 ]).
11
12 -export(
13 [ init/1
14 , consume/2
15 , terminate/1
16 ]).
17
18 -type option() ::
19 {consumption_interval , non_neg_integer()}
20 | {host , inet:ip_address() | inet:hostname()}
21 | {port , inet:port_number()}
22 | {timeout , timeout()}
23 .
24
25 -record(state,
26 { sock = none :: hope_option:t(Socket :: port())
27 , host :: inet:ip_address() | inet:hostname()
28 , port :: inet:port_number()
29 , timeout :: timeout()
30 }).
31
32 -type state() ::
33 #state{}.
34
35 -define(GRAPHITE_PATH_PREFIX, "beam_stats").
36 -define(DEFAULT_HOST , "localhost").
37 -define(DEFAULT_PORT , 2003).
38 -define(DEFAULT_TIMEOUT , 5000).
39
40 -spec init([option()]) ->
41 {non_neg_integer(), state()}.
42 init(Options) ->
43 Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
44 ConsumptionInterval = Get(consumption_interval, 60000),
45 State = #state
46 { sock = none
47 , host = Get(host , ?DEFAULT_HOST)
48 , port = Get(port , ?DEFAULT_PORT)
49 , timeout = Get(timeout , ?DEFAULT_TIMEOUT)
50 },
51 {ConsumptionInterval, State}.
52
53 -spec consume(beam_stats_consumer:queue(), state()) ->
54 state().
55 consume(Q, #state{}=State1) ->
56 Payload = beam_stats_queue_to_binary(Q),
57 State2 = try_to_connect_if_no_socket(State1),
58 try_to_send(State2, Payload).
59
60 -spec terminate(state()) ->
61 {}.
62 terminate(#state{sock=SockOpt}) ->
63 ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
64 {}.
65
66 %% ============================================================================
67
68 -spec try_to_send(state(), binary()) ->
69 state().
70 try_to_send(#state{sock=none}=State, _) ->
71 ?log_error("Sending failed. No socket in state."),
72 % TODO: Maybe schedule retry?
73 State;
74 try_to_send(#state{sock={some, Sock}}=State, Payload) ->
75 case gen_tcp:send(Sock, Payload)
76 of ok ->
77 State
78 ; {error, _}=Error ->
79 ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Sock, Payload, Error]),
80 % TODO: Maybe schedule retry?
81 ok = gen_tcp:close(Sock),
82 State#state{sock=none}
83 end.
84
85 -spec try_to_connect_if_no_socket(state()) ->
86 state().
87 try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
88 State;
89 try_to_connect_if_no_socket(
90 #state
91 { sock = none
92 , host = Host
93 , port = Port
94 , timeout = Timeout
95 }=State
96 ) ->
97 Options = [binary, {active, false}],
98 case gen_tcp:connect(Host, Port, Options, Timeout)
99 of {ok, Sock} ->
100 State#state{sock = {some, Sock}}
101 ; {error, _}=Error ->
102 ?log_error(
103 "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
104 [Host, Port, Options, Timeout, Error]
105 ),
106 State#state{sock = none}
107 end.
108
109 -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
110 binary().
111 beam_stats_queue_to_binary(Q) ->
112 Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
113 iolist_to_binary(Bins).
114
115 -spec beam_stats_to_bin(beam_stats:t()) ->
116 binary().
117 beam_stats_to_bin(#beam_stats
118 { timestamp = Timestamp
119 , node_id = NodeID
120 , memory = Memory
121 }
122 ) ->
123 TimestampInt = timestamp_to_integer(Timestamp),
124 TimestampBin = integer_to_binary(TimestampInt),
125 <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
126 MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
127 MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
128 MemoryBins = lists:map(MemoryPairToBin, MemoryBinPairs),
129 AllBins =
130 [ MemoryBins
131 ],
132 iolist_to_binary(AllBins).
133
134 -spec timestamp_to_integer(erlang:timestamp()) ->
135 non_neg_integer().
136 timestamp_to_integer({Megaseconds, Seconds, _}) ->
137 Megaseconds * 1000000 + Seconds.
138
139 -spec make_pair_to_bin(binary(), binary(), binary()) ->
140 fun(({binary(), binary()}) -> binary()).
141 make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>, <<Type/binary>>) ->
142 fun ({<<K/binary>>, <<V/binary>>}) ->
143 << ?GRAPHITE_PATH_PREFIX
144 , "."
145 , NodeID/binary
146 , "."
147 , Type/binary
148 , "."
149 , K/binary
150 , " "
151 , V/binary
152 , " "
153 , TimestampBin/binary
154 , "\n"
155 >>
156 end.
157
158 -spec node_id_to_bin(node()) ->
159 binary().
160 node_id_to_bin(NodeID) ->
161 NodeIDBin = atom_to_binary(NodeID, utf8),
162 re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
163
164 -spec atom_int_to_bin_bin({atom(), integer()}) ->
165 {binary(), binary()}.
166 atom_int_to_bin_bin({K, V}) ->
167 {atom_to_binary(K, latin1), integer_to_binary(V)}.
This page took 0.082892 seconds and 4 git commands to generate.