Limit number of StatsD messages per packet.
[beam_stats.git] / src / beam_stats_consumer_statsd.erl
index 112bb91..1ce0afb 100644 (file)
@@ -22,6 +22,7 @@
     | {dst_host             , inet:ip_address() | inet:hostname()}
     | {dst_port             , inet:port_number()}
     | {src_port             , inet:port_number()}
+    | {num_msgs_per_packet  , non_neg_integer()}
     .
 
 -define(DEFAULT_DST_HOST, "localhost").
@@ -46,6 +47,7 @@
     , dst_host :: inet:ip_address() | inet:hostname()
     , dst_port :: inet:port_number()
     , src_port :: inet:port_number()
+    , num_msgs_per_packet :: non_neg_integer()
     }).
 
 -type state() ::
@@ -64,20 +66,21 @@ init(Options) ->
     DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST),
     DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT),
     SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT),
+    NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10),
     State = #state
         { sock     = none
         , dst_host = DstHost
         , dst_port = DstPort
         , src_port = SrcPort
+        , num_msgs_per_packet = NumMsgsPerPacket
         },
     {ConsumptionInterval, State}.
 
 -spec consume(beam_stats_consumer:queue(), state()) ->
     state().
-consume(Q, #state{}=State1) ->
-    Payload = beam_stats_queue_to_binary(Q),
-    State2 = try_to_connect_if_no_socket(State1),
-    try_to_send(State2, Payload).
+consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) ->
+    Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket),
+    lists:foldl(fun try_to_connect_and_send/2, State, Packets).
 
 -spec terminate(state()) ->
     {}.
@@ -89,6 +92,12 @@ terminate(#state{sock=SockOpt}) ->
 %% Transport
 %% ============================================================================
 
+-spec try_to_connect_and_send(binary(), state()) ->
+    state().
+try_to_connect_and_send(<<Payload/binary>>, #state{}=State1) ->
+    State2 = try_to_connect_if_no_socket(State1),
+    try_to_send(State2, Payload).
+
 -spec try_to_send(state(), binary()) ->
     state().
 try_to_send(#state{sock=none}=State, _) ->
@@ -133,10 +142,12 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) ->
 %% Serialization
 %% ============================================================================
 
--spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
-    binary().
-beam_stats_queue_to_binary(Q) ->
-    iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]).
+-spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) ->
+    [binary()].
+beam_stats_queue_to_packets(Q, NumMsgsPerPacket) ->
+    MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]),
+    MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket),
+    lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks).
 
 -spec beam_stats_to_bins(beam_stats:t()) ->
     [binary()].
This page took 0.028427 seconds and 4 git commands to generate.