X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=edc5e5bcd8093927526b8ca3437c4e2b3ae09931;hp=112bb91ec7004282abb125372807cae99c8d3d02;hb=a3fdbec97580ab23bdb68e61d4321d08b1c599da;hpb=a8d431d1d7c542f66aeb1c272a5a2edf3cd9130d diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 112bb91..edc5e5b 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,7 +1,6 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). --include("include/beam_stats_ets_table.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -22,37 +21,26 @@ | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} + | {static_node_name , binary()} . -define(DEFAULT_DST_HOST, "localhost"). -define(DEFAULT_DST_PORT, 8125). -define(DEFAULT_SRC_PORT, 8124). --type metric_type() :: - % TODO: Add other metric types - gauge. - --record(statsd_msg, - { name :: binary() - , value :: non_neg_integer() - , type :: metric_type() - }). - --type statsd_msg() :: - #statsd_msg{}. - -record(state, { sock :: hope_option:t(gen_udp:socket()) , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() + , static_node_name :: hope_option:t(binary()) }). -type state() :: #state{}. --define(PATH_PREFIX, "beam_stats"). - %% ============================================================================ %% Consumer implementation %% ============================================================================ @@ -64,32 +52,46 @@ init(Options) -> DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), + StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume( + Q, + #state + { num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt + }=State +) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. terminate(#state{sock=SockOpt}) -> - ok = hope_option:iter(SockOpt, fun gen_udp:close/1), - {}. + hope_option:iter(SockOpt, fun gen_udp:close/1). %% ============================================================================ %% Transport %% ============================================================================ --spec try_to_send(state(), binary()) -> +-spec try_to_connect_and_send(iolist(), state()) -> + state(). +try_to_connect_and_send(Payload, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + +-spec try_to_send(state(), iolist()) -> state(). try_to_send(#state{sock=none}=State, _) -> ?log_error("Sending failed. No socket in state."), @@ -108,7 +110,7 @@ try_to_send( State ; {error, _}=Error -> ?log_error( - "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", + "gen_udp:send(~p, ~p, ~p, Payload) -> ~p", [Sock, DstHost, DstPort, Error] ), % TODO: Do something with unsent messages? @@ -133,162 +135,22 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). - --spec beam_stats_to_bins(beam_stats:t()) -> - [binary()]. -beam_stats_to_bins(#beam_stats - { node_id = NodeID - , memory = Memory - , io_bytes_in = IOBytesIn - , io_bytes_out = IOBytesOut - , context_switches = ContextSwitches - , reductions = Reductions - , run_queue = RunQueue - , ets = ETS - } +-spec beam_stats_queue_to_packets( + beam_stats_consumer:queue(), + non_neg_integer(), + hope_option:t(binary()) ) -> - NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = - [ io_bytes_in_to_msg(IOBytesIn) - , io_bytes_out_to_msg(IOBytesOut) - , context_switches_to_msg(ContextSwitches) - , reductions_to_msg(Reductions) - , run_queue_to_msg(RunQueue) - | memory_to_msgs(Memory) - ] - ++ ets_to_msgs(ETS), - Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], - [statsd_msg_to_bin(M) || M <- Msgs2]. - --spec run_queue_to_msg(non_neg_integer()) -> - statsd_msg(). -run_queue_to_msg(RunQueue) -> - #statsd_msg - { name = <<"run_queue">> - , value = RunQueue - , type = gauge - }. - --spec reductions_to_msg(non_neg_integer()) -> - statsd_msg(). -reductions_to_msg(Reductions) -> - #statsd_msg - { name = <<"reductions">> - , value = Reductions - , type = gauge - }. - --spec context_switches_to_msg(non_neg_integer()) -> - statsd_msg(). -context_switches_to_msg(ContextSwitches) -> - #statsd_msg - { name = <<"context_switches">> - , value = ContextSwitches - , type = gauge - }. - --spec io_bytes_in_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_in_to_msg(IOBytesIn) -> - #statsd_msg - { name = <<"io.bytes_in">> - , value = IOBytesIn - , type = gauge - }. - --spec io_bytes_out_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_out_to_msg(IOBytesOut) -> - #statsd_msg - { name = <<"io.bytes_out">> - , value = IOBytesOut - , type = gauge - }. - --spec ets_to_msgs(beam_stats_ets:t()) -> - [statsd_msg()]. -ets_to_msgs(PerTableStats) -> - NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), - lists:append(NestedMsgs). - --spec ets_table_to_msgs(beam_stats_ets_table:t()) -> - [statsd_msg()]. -ets_table_to_msgs(#beam_stats_ets_table - { id = ID - , name = Name - , size = Size - , memory = Memory - } -) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), - NameBin = atom_to_binary(Name, latin1), - NameAndID = <>, - SizeMsg = - #statsd_msg - { name = <<"ets_table.size.", NameAndID/binary>> - , value = Size - , type = gauge - }, - MemoryMsg = - #statsd_msg - { name = <<"ets_table.memory.", NameAndID/binary>> - , value = Memory - , type = gauge - }, - [SizeMsg, MemoryMsg]. - --spec memory_to_msgs([{atom(), non_neg_integer()}]) -> - [statsd_msg()]. -memory_to_msgs(Memory) -> - [memory_component_to_statsd_msg(MC) || MC <- Memory]. - --spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> - statsd_msg(). -memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> - MemTypeBin = atom_to_binary(MemType, latin1), - #statsd_msg - { name = <<"memory.", MemTypeBin/binary>> - , value = MemSize - , type = gauge - }. - --spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> - statsd_msg(). -statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <>) -> - Prefix = <>, - Name2 = <>, - Msg#statsd_msg{name=Name2}. - --spec statsd_msg_to_bin(statsd_msg()) -> - binary(). -statsd_msg_to_bin( - #statsd_msg - { name = <> - , value = Value - , type = Type = gauge - } -) when Value >= 0 -> - TypeBin = metric_type_to_bin(Type), - ValueBin = integer_to_binary(Value), - << Name/binary - , ":" - , ValueBin/binary - , "|" - , TypeBin/binary - , "\n" - >>. - --spec metric_type_to_bin(metric_type()) -> - binary(). -metric_type_to_bin(gauge) -> - <<"g">>. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + [iolist()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> + MsgIOLists = lists:append([beam_stats_to_iolists(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), + hope_list:divide(MsgIOLists, NumMsgsPerPacket). + +-spec beam_stats_to_iolists(beam_stats:t(), hope_option:t(binary())) -> + [iolist()]. +beam_stats_to_iolists(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> + NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), + MsgsGraphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), + MsgsStatsD = + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite), + lists:map(fun beam_stats_msg_statsd_gauge:to_iolist/1, MsgsStatsD).