From 4d24f3b714a452a69f5567747241ad75d044422c Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 15 Sep 2015 17:07:02 -0400 Subject: [PATCH] Limit number of StatsD messages per packet. To a configurable amount. --- README.md | 1 + rebar.config | 2 +- src/beam_stats_consumer_statsd.erl | 27 +++++++++---- test/beam_stats_consumer_statsd_SUITE.erl | 49 ++++++++++++++++------- 4 files changed, 55 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 593d6a6..3a55536 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Essentially like `folsomite`, but different. Different in the following ways: , {dst_host , "localhost"} , {dst_port , 8125} , {src_port , 8124} + , {num_msgs_per_packet , 10} ]} , {beam_stats_consumer_graphite, [ {consumption_interval , 60000} diff --git a/rebar.config b/rebar.config index 8a4f3f1..f1816dc 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ { deps -, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.7.0"}}} +, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.8.0"}}} ] }. diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 112bb91..1ce0afb 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -22,6 +22,7 @@ | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} . -define(DEFAULT_DST_HOST, "localhost"). @@ -46,6 +47,7 @@ , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() }). -type state() :: @@ -64,20 +66,21 @@ init(Options) -> DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. @@ -89,6 +92,12 @@ terminate(#state{sock=SockOpt}) -> %% Transport %% ============================================================================ +-spec try_to_connect_and_send(binary(), state()) -> + state(). +try_to_connect_and_send(<>, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> @@ -133,10 +142,12 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). +-spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> + [binary()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> + MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), + MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), + lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). -spec beam_stats_to_bins(beam_stats:t()) -> [binary()]. diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index dd01455..20a2de7 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -52,7 +52,7 @@ t_send(_Cfg) -> BEAMStats = #beam_stats { timestamp = {1, 2, 3} , node_id = 'node_foo@host_bar' - , memory = [{mem_type_foo, 1}] + , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] , io_bytes_in = 3 , io_bytes_out = 7 , context_switches = 5 @@ -67,18 +67,37 @@ t_send(_Cfg) -> {_, State1} = beam_stats_consumer_statsd:init(Options), State2 = beam_stats_consumer_statsd:consume(BEAMStatsQ, State1), {} = beam_stats_consumer_statsd:terminate(State2), - ResultOfReceive = gen_udp:recv(ServerSocket, 0), + ResultOfReceive1 = gen_udp:recv(ServerSocket, 0), + {ok, {_, _, PacketReceived1}} = ResultOfReceive1, + ResultOfReceive2 = gen_udp:recv(ServerSocket, 0), + {ok, {_, _, PacketReceived2}} = ResultOfReceive2, ok = gen_udp:close(ServerSocket), - {ok, {_, _, Data}} = ResultOfReceive, - ct:log("Packet: ~n~s~n", [Data]), - << "beam_stats.node_foo_host_bar.io.bytes_in:3|g\n" - , "beam_stats.node_foo_host_bar.io.bytes_out:7|g\n" - , "beam_stats.node_foo_host_bar.context_switches:5|g\n" - , "beam_stats.node_foo_host_bar.reductions:9|g\n" - , "beam_stats.node_foo_host_bar.run_queue:17|g\n" - , "beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n" - , "beam_stats.node_foo_host_bar.ets_table.size.foo.foo:5|g\n" - , "beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:25|g\n" - , "beam_stats.node_foo_host_bar.ets_table.size.bar.37:8|g\n" - , "beam_stats.node_foo_host_bar.ets_table.memory.bar.37:38|g\n" - >> = Data. + ct:log("PacketReceived1: ~n~s~n", [PacketReceived1]), + ct:log("PacketReceived2: ~n~s~n", [PacketReceived2]), + PacketsCombined = <>, + ct:log("PacketsCombined: ~n~s~n", [PacketsCombined]), + MsgsExpected = + [ <<"beam_stats.node_foo_host_bar.io.bytes_in:3|g">> + , <<"beam_stats.node_foo_host_bar.io.bytes_out:7|g">> + , <<"beam_stats.node_foo_host_bar.context_switches:5|g">> + , <<"beam_stats.node_foo_host_bar.reductions:9|g">> + , <<"beam_stats.node_foo_host_bar.run_queue:17|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_bar:2|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_baz:3|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.size.foo.foo:5|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:25|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.size.bar.37:8|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:38|g">> + ], + MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), + RemoveExpectedFromReceived = + fun (Expected, Received) -> + ct:log( + "Looking for expected msg ~p in remaining received ~p~n", + [Expected, Received] + ), + true = lists:member(Expected, Received), + Received -- [Expected] + end, + [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected). -- 2.20.1