X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=06945e1867dd14c245e9898c27c0436ea7572d5f;hb=ece99ea366e3bc8720528aaf5e1e1563e85b561c;hp=cfd6cbf828ac3a112a6e255482b4ed3b53ce6df4;hpb=7093966438a142f4a65615a3d242482e074c747e;p=beam_stats.git diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index cfd6cbf..06945e1 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,6 +1,7 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). +-include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -8,6 +9,7 @@ [ option/0 ]). +%% Consumer interface -export( [ init/1 , consume/2 @@ -15,66 +17,66 @@ ]). -type option() :: - {consumption_interval , erlang:time()} + {consumption_interval , non_neg_integer()} | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} + | {static_node_name , binary()} . -define(DEFAULT_DST_HOST, "localhost"). -define(DEFAULT_DST_PORT, 8125). -define(DEFAULT_SRC_PORT, 8124). --type metric_type() :: - % TODO: Add other metric types - gauge. - --record(statsd_msg, - { name :: binary() - , value :: non_neg_integer() - , type :: metric_type() - }). - --type statsd_msg() :: - #statsd_msg{}. - -record(state, { sock :: hope_option:t(gen_udp:socket()) , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() + , static_node_name :: hope_option:t(binary()) }). -type state() :: #state{}. --define(PATH_PREFIX, "beam_stats"). +-define(PATH_PREFIX, <<"beam_stats">>). %% ============================================================================ %% Consumer implementation %% ============================================================================ -spec init([option()]) -> - {erlang:time(), state()}. + {non_neg_integer(), state()}. init(Options) -> ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), + StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume( + Q, + #state + { num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt + }=State +) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. @@ -86,10 +88,16 @@ terminate(#state{sock=SockOpt}) -> %% Transport %% ============================================================================ +-spec try_to_connect_and_send(binary(), state()) -> + state(). +try_to_connect_and_send(<>, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> - io:format("error: socket closed~n"), + ?log_error("Sending failed. No socket in state."), % TODO: Maybe schedule retry? State; try_to_send( @@ -104,7 +112,10 @@ try_to_send( of ok -> State ; {error, _}=Error -> - io:format("error: gen_udp:send/4 failed: ~p~n", [Error]), + ?log_error( + "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", + [Sock, DstHost, DstPort, Error] + ), % TODO: Do something with unsent messages? ok = gen_udp:close(Sock), State#state{sock=none} @@ -119,7 +130,7 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> of {ok, Sock} -> State#state{sock = {some, Sock}} ; {error, _}=Error -> - io:format("error: gen_udp:open/1 failed: ~p~n", [Error]), + ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, Error]), State#state{sock = none} end. @@ -127,70 +138,26 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). +-spec beam_stats_queue_to_packets( + beam_stats_consumer:queue(), + non_neg_integer(), + hope_option:t(binary()) +) -> + [binary()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> + MsgBins = lists:append([beam_stats_to_bins(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), + MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), + lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). --spec beam_stats_to_bins(beam_stats:t()) -> +-spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> [binary()]. -beam_stats_to_bins(#beam_stats - { node_id = NodeID - , memory = Memory - } -) -> - NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = memory_to_msgs(Memory), - Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], - [statsd_msg_to_bin(M) || M <- Msgs2]. - --spec memory_to_msgs(erlang:memory()) -> - [statsd_msg()]. -memory_to_msgs(Memory) -> - [memory_component_to_statsd_msg(MC) || MC <- Memory]. - --spec memory_component_to_statsd_msg({erlang:memory_type(), non_neg_integer()}) -> - statsd_msg(). -memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> - #statsd_msg - { name = atom_to_binary(MemType, latin1) - , value = MemSize - , type = gauge - }. - --spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> - statsd_msg(). -statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <>) -> - Prefix = <>, - Name2 = <>, - Msg#statsd_msg{name=Name2}. - --spec statsd_msg_to_bin(statsd_msg()) -> - binary(). -statsd_msg_to_bin( - #statsd_msg - { name = <> - , value = Value - , type = Type = gauge - } -) when Value >= 0 -> - TypeBin = metric_type_to_bin(Type), - ValueBin = integer_to_binary(Value), - << Name/binary - , ":" - , ValueBin/binary - , "|" - , TypeBin/binary - , "\n" - >>. - --spec metric_type_to_bin(metric_type()) -> - binary(). -metric_type_to_bin(gauge) -> - <<"g">>. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). +beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> + NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), + GraphiteMsgAddPrefix = + fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, + MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), + MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1), + MsgsStatsD = + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2), + lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD).