X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=1ce0afb0ec40e834ab1f1be6da7dbbc5ce03d92a;hb=8c0788b23299b3d09f255cdfb1932f10da3545a2;hp=70ed8dcd8cb5ae557e1235ea869d68d79721b8bc;hpb=5681263ff2c3c3336fbc501c31365f452039c394;p=beam_stats.git diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 70ed8dc..1ce0afb 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,6 +1,7 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). +-include("include/beam_stats_ets_table.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -21,6 +22,7 @@ | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} . -define(DEFAULT_DST_HOST, "localhost"). @@ -45,6 +47,7 @@ , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() }). -type state() :: @@ -63,20 +66,21 @@ init(Options) -> DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. @@ -88,6 +92,12 @@ terminate(#state{sock=SockOpt}) -> %% Transport %% ============================================================================ +-spec try_to_connect_and_send(binary(), state()) -> + state(). +try_to_connect_and_send(<>, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> @@ -132,23 +142,116 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). +-spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> + [binary()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> + MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), + MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), + lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). -spec beam_stats_to_bins(beam_stats:t()) -> [binary()]. beam_stats_to_bins(#beam_stats { node_id = NodeID , memory = Memory + , io_bytes_in = IOBytesIn + , io_bytes_out = IOBytesOut + , context_switches = ContextSwitches + , reductions = Reductions + , run_queue = RunQueue + , ets = ETS } ) -> NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = memory_to_msgs(Memory), + Msgs1 = + [ io_bytes_in_to_msg(IOBytesIn) + , io_bytes_out_to_msg(IOBytesOut) + , context_switches_to_msg(ContextSwitches) + , reductions_to_msg(Reductions) + , run_queue_to_msg(RunQueue) + | memory_to_msgs(Memory) + ] + ++ ets_to_msgs(ETS), Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], [statsd_msg_to_bin(M) || M <- Msgs2]. +-spec run_queue_to_msg(non_neg_integer()) -> + statsd_msg(). +run_queue_to_msg(RunQueue) -> + #statsd_msg + { name = <<"run_queue">> + , value = RunQueue + , type = gauge + }. + +-spec reductions_to_msg(non_neg_integer()) -> + statsd_msg(). +reductions_to_msg(Reductions) -> + #statsd_msg + { name = <<"reductions">> + , value = Reductions + , type = gauge + }. + +-spec context_switches_to_msg(non_neg_integer()) -> + statsd_msg(). +context_switches_to_msg(ContextSwitches) -> + #statsd_msg + { name = <<"context_switches">> + , value = ContextSwitches + , type = gauge + }. + +-spec io_bytes_in_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_in_to_msg(IOBytesIn) -> + #statsd_msg + { name = <<"io.bytes_in">> + , value = IOBytesIn + , type = gauge + }. + +-spec io_bytes_out_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_out_to_msg(IOBytesOut) -> + #statsd_msg + { name = <<"io.bytes_out">> + , value = IOBytesOut + , type = gauge + }. + +-spec ets_to_msgs(beam_stats_ets:t()) -> + [statsd_msg()]. +ets_to_msgs(PerTableStats) -> + NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), + lists:append(NestedMsgs). + +-spec ets_table_to_msgs(beam_stats_ets_table:t()) -> + [statsd_msg()]. +ets_table_to_msgs(#beam_stats_ets_table + { id = ID + , name = Name + , size = Size + , memory = Memory + } +) -> + IDBin = beam_stats_ets_table:id_to_bin(ID), + NameBin = atom_to_binary(Name, latin1), + NameAndID = <>, + SizeMsg = + #statsd_msg + { name = <<"ets_table.size.", NameAndID/binary>> + , value = Size + , type = gauge + }, + MemoryMsg = + #statsd_msg + { name = <<"ets_table.memory.", NameAndID/binary>> + , value = Memory + , type = gauge + }, + [SizeMsg, MemoryMsg]. + -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()]. memory_to_msgs(Memory) -> @@ -157,8 +260,9 @@ memory_to_msgs(Memory) -> -spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> statsd_msg(). memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> + MemTypeBin = atom_to_binary(MemType, latin1), #statsd_msg - { name = atom_to_binary(MemType, latin1) + { name = <<"memory.", MemTypeBin/binary>> , value = MemSize , type = gauge }.