1 -module(beam_stats_consumer_graphite).
3 -include("include/beam_stats.hrl").
5 -behaviour(beam_stats_consumer).
18 -type connect_option() ::
19 {host , inet:ip_address() | inet:hostname()}
20 | {port , inet:port_number()}
21 | {timeout , timeout()}
25 {connect_options , [connect_option()]}
26 | {consumption_interval , erlang:time()}
30 { connect_options = [] :: [connect_option()]
31 , sock = none :: hope_option:t(gen_tcp:socket())
37 -define(GRAPHITE_PATH_PREFIX, "beam_stats").
39 -spec init([option()]) ->
40 {erlang:time(), state()}.
42 ConnectOptions = hope_kv_list:get(Options, connect_options , []),
43 ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
45 { connect_options = ConnectOptions
48 {ConsumptionInterval, State}.
50 -spec consume(beam_stats_consumer:queue(), state()) ->
%% Serialize the queued entries to a binary payload, attempt to obtain a
%% socket via try_to_connect_if_no_socket/1, and hand the payload to
%% try_to_send/2. The result of try_to_send/2 is returned as-is.
%% The payload is built before any connection attempt is made.
consume(Queue, #state{} = State0) ->
    Bytes = beam_stats_queue_to_binary(Queue),
    try_to_send(try_to_connect_if_no_socket(State0), Bytes).
57 -spec terminate(state()) ->
59 terminate(#state{sock=SockOpt}) ->
60 ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
63 %% ============================================================================
65 -spec try_to_send(state(), binary()) ->
67 try_to_send(#state{sock=none}=State, _) ->
68 io:format("error: socket closed~n"),
69 % TODO: Maybe schedule retry?
71 try_to_send(#state{sock={some, Sock}}=State, Payload) ->
72 case gen_tcp:send(Sock, Payload)
76 io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
77 % TODO: Maybe schedule retry?
78 ok = gen_tcp:close(Sock),
79 State#state{sock=none}
82 -spec try_to_connect_if_no_socket(state()) ->
84 try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
86 try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
87 DefaultHost = "localhost",
89 DefaultTimeout = 5000,
90 Host = hope_kv_list:get(Options, host , DefaultHost),
91 Port = hope_kv_list:get(Options, port , DefaultPort),
92 Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
93 case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
95 State#state{sock = {some, Sock}}
97 io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
98 State#state{sock = none}
101 -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
%% Render every entry of the queue with beam_stats_to_bin/1, in queue order,
%% and join the rendered pieces into a single binary.
beam_stats_queue_to_binary(Queue) ->
    Entries = queue:to_list(Queue),
    iolist_to_binary(lists:map(fun beam_stats_to_bin/1, Entries)).
107 -spec beam_stats_to_bin(beam_stats:t()) ->
109 beam_stats_to_bin(#beam_stats
110 { timestamp = Timestamp
115 TimestampInt = timestamp_to_integer(Timestamp),
116 TimestampBin = integer_to_binary(TimestampInt),
117 <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
118 PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
119 MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
120 MemoryBins = lists:map(PairToBin, MemoryBinPairs),
124 iolist_to_binary(AllBins).
126 -spec timestamp_to_integer(erlang:timestamp()) ->
%% Collapse a {MegaSeconds, Seconds, MicroSeconds} triple into a whole number
%% of seconds. The third (microsecond) element is ignored.
timestamp_to_integer({Mega, Secs, _Micro}) ->
    Secs + (Mega * 1000000).
131 -spec make_pair_to_bin(binary(), binary()) ->
132 fun(({binary(), binary()}) -> binary()).
133 make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
134 fun ({<<K/binary>>, <<V/binary>>}) ->
135 << ?GRAPHITE_PATH_PREFIX
143 , TimestampBin/binary
148 -spec node_id_to_bin(node()) ->
%% Turn a node name atom into a binary in which every "@" and "." has been
%% replaced by "_" (all occurrences, not just the first).
node_id_to_bin(NodeID) ->
    ReplaceOpts = [global, {return, binary}],
    re:replace(atom_to_binary(NodeID, utf8), "[\@\.]", "_", ReplaceOpts).
%% Convert a {Key, Value} pair of atom and integer into a pair of binaries:
%% the atom's text and the integer's decimal representation.
%%
%% The atom is encoded as UTF-8, matching node_id_to_bin/1. Encoding as
%% latin1 raises badarg for atoms containing code points above 255; for
%% plain ASCII atoms the resulting bytes are the same under both encodings.
-spec atom_int_to_bin_bin({atom(), integer()}) ->
    {binary(), binary()}.
atom_int_to_bin_bin({K, V}) ->
    {atom_to_binary(K, utf8), integer_to_binary(V)}.