-module(beam_stats_consumer_graphite).
-include("include/beam_stats.hrl").
+-include("beam_stats_logging.hrl").
-behaviour(beam_stats_consumer).
-export_type(
- [ option/0
- , connect_option/0
+ [ option/0
]).
-export(
, terminate/1
]).
--type connect_option() ::
- {host , inet:ip_address() | inet:hostname()}
- | {port , inet:port_number()}
- | {timeout , timeout()}
- .
-
-type option() ::
{consumption_interval , non_neg_integer()}
- | {connect_options , [connect_option()]}
+ | {host , inet:ip_address() | inet:hostname()}
+ | {port , inet:port_number()}
+ | {timeout , timeout()}
.
-record(state,
- { connect_options = [] :: [connect_option()]
- , sock = none :: hope_option:t(Socket :: port())
+ { sock = none :: hope_option:t(Socket :: port())
+ , host :: inet:ip_address() | inet:hostname()
+ , port :: inet:port_number()
+ , timeout :: timeout()
}).
-type state() ::
#state{}.
-define(GRAPHITE_PATH_PREFIX, "beam_stats").
+-define(DEFAULT_HOST , "localhost").
+-define(DEFAULT_PORT , 2003).
+-define(DEFAULT_TIMEOUT , 5000).
-spec init([option()]) ->
{non_neg_integer(), state()}.
init(Options) ->
- ConnectOptions = hope_kv_list:get(Options, connect_options , []),
- ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+ Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
+ ConsumptionInterval = Get(consumption_interval, 60000),
State = #state
- { connect_options = ConnectOptions
- , sock = none
+ { sock = none
+ , host = Get(host , ?DEFAULT_HOST)
+ , port = Get(port , ?DEFAULT_PORT)
+ , timeout = Get(timeout , ?DEFAULT_TIMEOUT)
},
{ConsumptionInterval, State}.
-spec try_to_send(state(), binary()) ->
state().
try_to_send(#state{sock=none}=State, _) ->
- io:format("error: socket closed~n"),
+ ?log_error("Sending failed. No socket in state."),
% TODO: Maybe schedule retry?
State;
try_to_send(#state{sock={some, Sock}}=State, Payload) ->
of ok ->
State
; {error, _}=Error ->
- io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
+ ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Sock, Payload, Error]),
% TODO: Maybe schedule retry?
ok = gen_tcp:close(Sock),
State#state{sock=none}
state().
try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
State;
-try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
- DefaultHost = "localhost",
- DefaultPort = 2003,
- DefaultTimeout = 5000,
- Host = hope_kv_list:get(Options, host , DefaultHost),
- Port = hope_kv_list:get(Options, port , DefaultPort),
- Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
- case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
+try_to_connect_if_no_socket(
+ #state
+ { sock = none
+ , host = Host
+ , port = Port
+ , timeout = Timeout
+ }=State
+) ->
+ Options = [binary, {active, false}],
+ case gen_tcp:connect(Host, Port, Options, Timeout)
of {ok, Sock} ->
State#state{sock = {some, Sock}}
; {error, _}=Error ->
- io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
+ ?log_error(
+ "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
+ [Host, Port, Options, Timeout, Error]
+ ),
State#state{sock = none}
end.
TimestampInt = timestamp_to_integer(Timestamp),
TimestampBin = integer_to_binary(TimestampInt),
<<NodeIDBin/binary>> = node_id_to_bin(NodeID),
- PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
+ MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
- MemoryBins = lists:map(PairToBin, MemoryBinPairs),
+ MemoryBins = lists:map(MemoryPairToBin, MemoryBinPairs),
AllBins =
[ MemoryBins
],
timestamp_to_integer({Megaseconds, Seconds, _}) ->
Megaseconds * 1000000 + Seconds.
--spec make_pair_to_bin(binary(), binary()) ->
+-spec make_pair_to_bin(binary(), binary(), binary()) ->
fun(({binary(), binary()}) -> binary()).
-make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
+make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>, <<Type/binary>>) ->
fun ({<<K/binary>>, <<V/binary>>}) ->
<< ?GRAPHITE_PATH_PREFIX
, "."
, NodeID/binary
, "."
+ , Type/binary
+ , "."
, K/binary
, " "
, V/binary