X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_graphite.erl;h=6e0d5dd39ef019b4b148004c235dcbcd1adf9d94;hb=2086842d3832286088d0ca9646b23e3bfd50553c;hp=c2847f0a30b10176e68823d6fdec9ff7dab11ac7;hpb=76aefffb4a92500ad4664d120b5ef2fec80e7988;p=beam_stats.git diff --git a/src/beam_stats_consumer_graphite.erl b/src/beam_stats_consumer_graphite.erl index c2847f0..6e0d5dd 100644 --- a/src/beam_stats_consumer_graphite.erl +++ b/src/beam_stats_consumer_graphite.erl @@ -1,12 +1,12 @@ -module(beam_stats_consumer_graphite). -include("include/beam_stats.hrl"). +-include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). -export_type( - [ option/0 - , connect_option/0 + [ option/0 ]). -export( @@ -15,35 +15,38 @@ , terminate/1 ]). --type connect_option() :: - {host , inet:ip_address() | inet:hostname()} - | {port , inet:port_number()} - | {timeout , timeout()} - . - -type option() :: - {connect_options , [connect_option()]} - | {consumption_interval , non_neg_integer()} + {consumption_interval , non_neg_integer()} + | {host , inet:ip_address() | inet:hostname()} + | {port , inet:port_number()} + | {timeout , timeout()} . -record(state, - { connect_options = [] :: [connect_option()] - , sock = none :: hope_option:t(Socket :: port()) + { sock = none :: hope_option:t(Socket :: port()) + , host :: inet:ip_address() | inet:hostname() + , port :: inet:port_number() + , timeout :: timeout() }). -type state() :: #state{}. -define(GRAPHITE_PATH_PREFIX, "beam_stats"). +-define(DEFAULT_HOST , "localhost"). +-define(DEFAULT_PORT , 2003). +-define(DEFAULT_TIMEOUT , 5000). -spec init([option()]) -> {non_neg_integer(), state()}. init(Options) -> - ConnectOptions = hope_kv_list:get(Options, connect_options , []), - ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), + Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end, + ConsumptionInterval = Get(consumption_interval, 60000), State = #state - { connect_options = ConnectOptions - , sock = none + { sock = none + , host = Get(host , ?DEFAULT_HOST) + , port = Get(port , ?DEFAULT_PORT) + , timeout = Get(timeout , ?DEFAULT_TIMEOUT) }, {ConsumptionInterval, State}. @@ -65,7 +68,7 @@ terminate(#state{sock=SockOpt}) -> -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> - io:format("error: socket closed~n"), + ?log_error("Sending failed. No socket in state."), % TODO: Maybe schedule retry? State; try_to_send(#state{sock={some, Sock}}=State, Payload) -> @@ -73,7 +76,7 @@ try_to_send(#state{sock={some, Sock}}=State, Payload) -> of ok -> State ; {error, _}=Error -> - io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]), + ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Sock, Payload, Error]), % TODO: Maybe schedule retry? ok = gen_tcp:close(Sock), State#state{sock=none} @@ -83,18 +86,23 @@ try_to_send(#state{sock={some, Sock}}=State, Payload) -> state(). try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> State; -try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) -> - DefaultHost = "localhost", - DefaultPort = 2003, - DefaultTimeout = 5000, - Host = hope_kv_list:get(Options, host , DefaultHost), - Port = hope_kv_list:get(Options, port , DefaultPort), - Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout), - case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout) +try_to_connect_if_no_socket( + #state + { sock = none + , host = Host + , port = Port + , timeout = Timeout + }=State +) -> + Options = [binary, {active, false}], + case gen_tcp:connect(Host, Port, Options, Timeout) of {ok, Sock} -> State#state{sock = {some, Sock}} ; {error, _}=Error -> - io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]), + ?log_error( + "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p", + [Host, Port, Options, Timeout, Error] + ), State#state{sock = none} end. @@ -115,9 +123,9 @@ beam_stats_to_bin(#beam_stats TimestampInt = timestamp_to_integer(Timestamp), TimestampBin = integer_to_binary(TimestampInt), <> = node_id_to_bin(NodeID), - PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin), + MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>), MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory), - MemoryBins = lists:map(PairToBin, MemoryBinPairs), + MemoryBins = lists:map(MemoryPairToBin, MemoryBinPairs), AllBins = [ MemoryBins ], @@ -128,14 +136,16 @@ beam_stats_to_bin(#beam_stats timestamp_to_integer({Megaseconds, Seconds, _}) -> Megaseconds * 1000000 + Seconds. --spec make_pair_to_bin(binary(), binary()) -> +-spec make_pair_to_bin(binary(), binary(), binary()) -> fun(({binary(), binary()}) -> binary()). -make_pair_to_bin(<>, <>) -> +make_pair_to_bin(<>, <>, <>) -> fun ({<>, <>}) -> << ?GRAPHITE_PATH_PREFIX , "." , NodeID/binary , "." + , Type/binary + , "." , K/binary , " " , V/binary