-module(beam_stats_consumer_graphite).
-include("include/beam_stats.hrl").
+-include("beam_stats_logging.hrl").
-behaviour(beam_stats_consumer).
-export_type(
- [ option/0
- , connect_option/0
+ [ option/0
]).
-export(
, terminate/1
]).
--type connect_option() ::
- {host , inet:ip_address() | inet:hostname()}
- | {port , inet:port_number()}
- | {timeout , timeout()}
- .
-
-type option() ::
- {connect_options , [connect_option()]}
- | {consumption_interval , erlang:time()}
+ {consumption_interval , non_neg_integer()}
+ | {host , inet:ip_address() | inet:hostname()}
+ | {port , inet:port_number()}
+ | {timeout , timeout()}
.
-record(state,
- { connect_options = [] :: [connect_option()]
- , sock = none :: hope_option:t(Socket :: port())
+ { sock = none :: hope_option:t(Socket :: port())
+ , host :: inet:ip_address() | inet:hostname()
+ , port :: inet:port_number()
+ , timeout :: timeout()
}).
-type state() ::
#state{}.
--define(GRAPHITE_PATH_PREFIX, "beam_stats").
+-define(DEFAULT_HOST , "localhost").
+-define(DEFAULT_PORT , 2003).
+-define(DEFAULT_TIMEOUT , 5000).
-spec init([option()]) ->
- {erlang:time(), state()}.
+ {non_neg_integer(), state()}.
init(Options) ->
- ConnectOptions = hope_kv_list:get(Options, connect_options , []),
- ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+ Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end,
+ ConsumptionInterval = Get(consumption_interval, 60000),
State = #state
- { connect_options = ConnectOptions
- , sock = none
+ { sock = none
+ , host = Get(host , ?DEFAULT_HOST)
+ , port = Get(port , ?DEFAULT_PORT)
+ , timeout = Get(timeout , ?DEFAULT_TIMEOUT)
},
{ConsumptionInterval, State}.
-spec try_to_send(state(), binary()) ->
state().
try_to_send(#state{sock=none}=State, _) ->
- io:format("error: socket closed~n"),
+ ?log_error("Sending failed. No socket in state."),
% TODO: Maybe schedule retry?
State;
try_to_send(#state{sock={some, Sock}}=State, Payload) ->
of ok ->
State
; {error, _}=Error ->
- io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
+ ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Sock, Payload, Error]),
% TODO: Maybe schedule retry?
ok = gen_tcp:close(Sock),
State#state{sock=none}
state().
try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
State;
-try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
- DefaultHost = "localhost",
- DefaultPort = 2003,
- DefaultTimeout = 5000,
- Host = hope_kv_list:get(Options, host , DefaultHost),
- Port = hope_kv_list:get(Options, port , DefaultPort),
- Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
- case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
+try_to_connect_if_no_socket(
+ #state
+ { sock = none
+ , host = Host
+ , port = Port
+ , timeout = Timeout
+ }=State
+) ->
+ Options = [binary, {active, false}],
+ case gen_tcp:connect(Host, Port, Options, Timeout)
of {ok, Sock} ->
State#state{sock = {some, Sock}}
; {error, _}=Error ->
- io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
+ ?log_error(
+ "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
+ [Host, Port, Options, Timeout, Error]
+ ),
State#state{sock = none}
end.
-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
binary().
beam_stats_queue_to_binary(Q) ->
- Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
+ Bins = [beam_stats_to_bins(B) || B <- queue:to_list(Q)],
iolist_to_binary(Bins).
--spec beam_stats_to_bin(beam_stats:t()) ->
- binary().
-beam_stats_to_bin(#beam_stats
- { timestamp = Timestamp
- , node_id = NodeID
- , memory = Memory
- }
-) ->
- TimestampInt = timestamp_to_integer(Timestamp),
- TimestampBin = integer_to_binary(TimestampInt),
- <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
- PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
- MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
- MemoryBins = lists:map(PairToBin, MemoryBinPairs),
- AllBins =
- [ MemoryBins
- ],
- iolist_to_binary(AllBins).
-
--spec timestamp_to_integer(erlang:timestamp()) ->
- non_neg_integer().
-timestamp_to_integer({Megaseconds, Seconds, _}) ->
- Megaseconds * 1000000 + Seconds.
-
--spec make_pair_to_bin(binary(), binary()) ->
- fun(({binary(), binary()}) -> binary()).
-make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
- fun ({<<K/binary>>, <<V/binary>>}) ->
- << ?GRAPHITE_PATH_PREFIX
- , "."
- , NodeID/binary
- , "."
- , K/binary
- , " "
- , V/binary
- , " "
- , TimestampBin/binary
- , "\n"
- >>
- end.
-
--spec node_id_to_bin(node()) ->
- binary().
-node_id_to_bin(NodeID) ->
- NodeIDBin = atom_to_binary(NodeID, utf8),
- re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
-
--spec atom_int_to_bin_bin({atom(), integer()}) ->
- {binary(), binary()}.
-atom_int_to_bin_bin({K, V}) ->
- {atom_to_binary(K, latin1), integer_to_binary(V)}.
+-spec beam_stats_to_bins(beam_stats:t()) ->
+ [binary()].
+beam_stats_to_bins(#beam_stats{}=BeamStats) ->
+ Msgs = beam_stats_msg_graphite:of_beam_stats(BeamStats),
+ lists:map(fun beam_stats_msg_graphite:to_bin/1, Msgs).