Fix unknown types.
[beam_stats.git] / src / beam_stats_consumer_graphite.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer_graphite).
2
3-include("include/beam_stats.hrl").
4
5-behaviour(beam_stats_consumer).
6
7-export_type(
8 [ option/0
9 , connect_option/0
10 ]).
11
12-export(
13 [ init/1
14 , consume/2
15 , terminate/1
16 ]).
17
18-type connect_option() ::
19 {host , inet:ip_address() | inet:hostname()}
20 | {port , inet:port_number()}
21 | {timeout , timeout()}
22 .
23
24-type option() ::
25 {connect_options , [connect_option()]}
26 | {consumption_interval , erlang:time()}
27 .
28
29-record(state,
30 { connect_options = [] :: [connect_option()]
e7b39f07 31 , sock = none :: hope_option:t(Socket :: port())
caf75ed8
SK
32 }).
33
34-type state() ::
35 #state{}.
36
37-define(GRAPHITE_PATH_PREFIX, "beam_stats").
38
39-spec init([option()]) ->
40 {erlang:time(), state()}.
41init(Options) ->
42 ConnectOptions = hope_kv_list:get(Options, connect_options , []),
43 ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
44 State = #state
45 { connect_options = ConnectOptions
46 , sock = none
47 },
48 {ConsumptionInterval, State}.
49
50-spec consume(beam_stats_consumer:queue(), state()) ->
51 state().
52consume(Q, #state{}=State1) ->
53 Payload = beam_stats_queue_to_binary(Q),
54 State2 = try_to_connect_if_no_socket(State1),
55 try_to_send(State2, Payload).
56
57-spec terminate(state()) ->
58 {}.
59terminate(#state{sock=SockOpt}) ->
60 ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
61 {}.
62
63%% ============================================================================
64
65-spec try_to_send(state(), binary()) ->
66 state().
67try_to_send(#state{sock=none}=State, _) ->
68 io:format("error: socket closed~n"),
69 % TODO: Maybe schedule retry?
70 State;
71try_to_send(#state{sock={some, Sock}}=State, Payload) ->
72 case gen_tcp:send(Sock, Payload)
73 of ok ->
74 State
75 ; {error, _}=Error ->
76 io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
77 % TODO: Maybe schedule retry?
78 ok = gen_tcp:close(Sock),
79 State#state{sock=none}
80 end.
81
82-spec try_to_connect_if_no_socket(state()) ->
83 state().
84try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
85 State;
86try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
87 DefaultHost = "localhost",
88 DefaultPort = 2003,
89 DefaultTimeout = 5000,
90 Host = hope_kv_list:get(Options, host , DefaultHost),
91 Port = hope_kv_list:get(Options, port , DefaultPort),
92 Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
93 case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
94 of {ok, Sock} ->
95 State#state{sock = {some, Sock}}
96 ; {error, _}=Error ->
97 io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
98 State#state{sock = none}
99 end.
100
101-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
102 binary().
103beam_stats_queue_to_binary(Q) ->
104 Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
105 iolist_to_binary(Bins).
106
107-spec beam_stats_to_bin(beam_stats:t()) ->
108 binary().
109beam_stats_to_bin(#beam_stats
110 { timestamp = Timestamp
111 , node_id = NodeID
112 , memory = Memory
113 }
114) ->
115 TimestampInt = timestamp_to_integer(Timestamp),
116 TimestampBin = integer_to_binary(TimestampInt),
117 <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
118 PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
119 MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
120 MemoryBins = lists:map(PairToBin, MemoryBinPairs),
121 AllBins =
122 [ MemoryBins
123 ],
124 iolist_to_binary(AllBins).
125
126-spec timestamp_to_integer(erlang:timestamp()) ->
127 non_neg_integer().
128timestamp_to_integer({Megaseconds, Seconds, _}) ->
129 Megaseconds * 1000000 + Seconds.
130
131-spec make_pair_to_bin(binary(), binary()) ->
132 fun(({binary(), binary()}) -> binary()).
133make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
134 fun ({<<K/binary>>, <<V/binary>>}) ->
135 << ?GRAPHITE_PATH_PREFIX
136 , "."
137 , NodeID/binary
138 , "."
139 , K/binary
140 , " "
141 , V/binary
142 , " "
143 , TimestampBin/binary
144 , "\n"
145 >>
146 end.
147
148-spec node_id_to_bin(node()) ->
149 binary().
150node_id_to_bin(NodeID) ->
151 NodeIDBin = atom_to_binary(NodeID, utf8),
152 re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
153
154-spec atom_int_to_bin_bin({atom(), integer()}) ->
155 {binary(), binary()}.
156atom_int_to_bin_bin({K, V}) ->
157 {atom_to_binary(K, latin1), integer_to_binary(V)}.
This page took 0.026005 seconds and 4 git commands to generate.