X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_graphite.erl;h=036f6c9840f28bc21f937640ac22f12c32a0850a;hb=1aa506ead33e356c87e1b0b3cbafaeaf62086e61;hp=85c002d17b4de8a6b736b2ea0bf74e2416658e34;hpb=b2f78fc6fd026053e95fe2bf35278295f1f15609;p=beam_stats.git diff --git a/src/beam_stats_consumer_graphite.erl b/src/beam_stats_consumer_graphite.erl index 85c002d..036f6c9 100644 --- a/src/beam_stats_consumer_graphite.erl +++ b/src/beam_stats_consumer_graphite.erl @@ -1,12 +1,12 @@ -module(beam_stats_consumer_graphite). -include("include/beam_stats.hrl"). +-include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). -export_type( - [ option/0 - , connect_option/0 + [ option/0 ]). -export( @@ -15,35 +15,38 @@ , terminate/1 ]). --type connect_option() :: - {host , inet:ip_address() | inet:hostname()} - | {port , inet:port_number()} - | {timeout , timeout()} - . - -type option() :: {consumption_interval , non_neg_integer()} - | {connect_options , [connect_option()]} + | {host , inet:ip_address() | inet:hostname()} + | {port , inet:port_number()} + | {timeout , timeout()} . -record(state, - { connect_options = [] :: [connect_option()] - , sock = none :: hope_option:t(Socket :: port()) + { sock = none :: hope_option:t(Socket :: port()) + , host :: inet:ip_address() | inet:hostname() + , port :: inet:port_number() + , timeout :: timeout() }). -type state() :: #state{}. --define(GRAPHITE_PATH_PREFIX, "beam_stats"). +-define(PATH_PREFIX , <<"beam_stats">>). +-define(DEFAULT_HOST , "localhost"). +-define(DEFAULT_PORT , 2003). +-define(DEFAULT_TIMEOUT , 5000). -spec init([option()]) -> {non_neg_integer(), state()}. init(Options) -> - ConnectOptions = hope_kv_list:get(Options, connect_options , []), - ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), + Get = fun (Key, Default) -> hope_kv_list:get(Options, Key, Default) end, + ConsumptionInterval = Get(consumption_interval, 60000), State = #state - { connect_options = ConnectOptions - , sock = none + { sock = none + , host = Get(host , ?DEFAULT_HOST) + , port = Get(port , ?DEFAULT_PORT) + , timeout = Get(timeout , ?DEFAULT_TIMEOUT) }, {ConsumptionInterval, State}. @@ -65,7 +68,7 @@ terminate(#state{sock=SockOpt}) -> -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> - io:format("error: socket closed~n"), + ?log_error("Sending failed. No socket in state."), % TODO: Maybe schedule retry? State; try_to_send(#state{sock={some, Sock}}=State, Payload) -> @@ -73,7 +76,7 @@ try_to_send(#state{sock={some, Sock}}=State, Payload) -> of ok -> State ; {error, _}=Error -> - io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]), + ?log_error("gen_tcp:send(~p, ~p) -> ~p", [Sock, Payload, Error]), % TODO: Maybe schedule retry? ok = gen_tcp:close(Sock), State#state{sock=none} @@ -83,75 +86,37 @@ try_to_send(#state{sock={some, Sock}}=State, Payload) -> state(). try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> State; -try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) -> - DefaultHost = "localhost", - DefaultPort = 2003, - DefaultTimeout = 5000, - Host = hope_kv_list:get(Options, host , DefaultHost), - Port = hope_kv_list:get(Options, port , DefaultPort), - Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout), - case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout) +try_to_connect_if_no_socket( + #state + { sock = none + , host = Host + , port = Port + , timeout = Timeout + }=State +) -> + Options = [binary, {active, false}], + case gen_tcp:connect(Host, Port, Options, Timeout) of {ok, Sock} -> State#state{sock = {some, Sock}} ; {error, _}=Error -> - io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]), + ?log_error( + "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p", + [Host, Port, Options, Timeout, Error] + ), State#state{sock = none} end. -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary(). beam_stats_queue_to_binary(Q) -> - Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)], + Bins = [beam_stats_to_bins(B) || B <- queue:to_list(Q)], iolist_to_binary(Bins). --spec beam_stats_to_bin(beam_stats:t()) -> - binary(). -beam_stats_to_bin(#beam_stats - { timestamp = Timestamp - , node_id = NodeID - , memory = Memory - } -) -> - TimestampInt = timestamp_to_integer(Timestamp), - TimestampBin = integer_to_binary(TimestampInt), - <> = node_id_to_bin(NodeID), - PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin), - MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory), - MemoryBins = lists:map(PairToBin, MemoryBinPairs), - AllBins = - [ MemoryBins - ], - iolist_to_binary(AllBins). - --spec timestamp_to_integer(erlang:timestamp()) -> - non_neg_integer(). -timestamp_to_integer({Megaseconds, Seconds, _}) -> - Megaseconds * 1000000 + Seconds. - --spec make_pair_to_bin(binary(), binary()) -> - fun(({binary(), binary()}) -> binary()). -make_pair_to_bin(<>, <>) -> - fun ({<>, <>}) -> - << ?GRAPHITE_PATH_PREFIX - , "." - , NodeID/binary - , "." - , K/binary - , " " - , V/binary - , " " - , TimestampBin/binary - , "\n" - >> - end. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). - --spec atom_int_to_bin_bin({atom(), integer()}) -> - {binary(), binary()}. -atom_int_to_bin_bin({K, V}) -> - {atom_to_binary(K, latin1), integer_to_binary(V)}. +-spec beam_stats_to_bins(beam_stats:t()) -> + [binary()]. +beam_stats_to_bins(#beam_stats{}=BeamStats) -> + MsgAddPrefix = + fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, + Msgs1 = beam_stats_msg_graphite:of_beam_stats(BeamStats), + Msgs2 = lists:map(MsgAddPrefix, Msgs1), + lists:map(fun beam_stats_msg_graphite:to_bin/1, Msgs2).