X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=edc5e5bcd8093927526b8ca3437c4e2b3ae09931;hp=1ce0afb0ec40e834ab1f1be6da7dbbc5ce03d92a;hb=a3fdbec97580ab23bdb68e61d4321d08b1c599da;hpb=8c0788b23299b3d09f255cdfb1932f10da3545a2 diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 1ce0afb..edc5e5b 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,7 +1,6 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). --include("include/beam_stats_ets_table.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -23,38 +22,25 @@ | {dst_port , inet:port_number()} | {src_port , inet:port_number()} | {num_msgs_per_packet , non_neg_integer()} + | {static_node_name , binary()} . -define(DEFAULT_DST_HOST, "localhost"). -define(DEFAULT_DST_PORT, 8125). -define(DEFAULT_SRC_PORT, 8124). --type metric_type() :: - % TODO: Add other metric types - gauge. - --record(statsd_msg, - { name :: binary() - , value :: non_neg_integer() - , type :: metric_type() - }). - --type statsd_msg() :: - #statsd_msg{}. - -record(state, { sock :: hope_option:t(gen_udp:socket()) , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() , num_msgs_per_packet :: non_neg_integer() + , static_node_name :: hope_option:t(binary()) }). -type state() :: #state{}. --define(PATH_PREFIX, "beam_stats"). - %% ============================================================================ %% Consumer implementation %% ============================================================================ @@ -67,38 +53,45 @@ init(Options) -> DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), + StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort , num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> - Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), +consume( + Q, + #state + { num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt + }=State +) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. terminate(#state{sock=SockOpt}) -> - ok = hope_option:iter(SockOpt, fun gen_udp:close/1), - {}. + hope_option:iter(SockOpt, fun gen_udp:close/1). %% ============================================================================ %% Transport %% ============================================================================ --spec try_to_connect_and_send(binary(), state()) -> +-spec try_to_connect_and_send(iolist(), state()) -> state(). -try_to_connect_and_send(<>, #state{}=State1) -> +try_to_connect_and_send(Payload, #state{}=State1) -> State2 = try_to_connect_if_no_socket(State1), try_to_send(State2, Payload). --spec try_to_send(state(), binary()) -> +-spec try_to_send(state(), iolist()) -> state(). try_to_send(#state{sock=none}=State, _) -> ?log_error("Sending failed. No socket in state."), @@ -117,7 +110,7 @@ try_to_send( State ; {error, _}=Error -> ?log_error( - "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", + "gen_udp:send(~p, ~p, ~p, Payload) -> ~p", [Sock, DstHost, DstPort, Error] ), % TODO: Do something with unsent messages? @@ -142,164 +135,22 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> - [binary()]. -beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> - MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), - MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), - lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). - --spec beam_stats_to_bins(beam_stats:t()) -> - [binary()]. -beam_stats_to_bins(#beam_stats - { node_id = NodeID - , memory = Memory - , io_bytes_in = IOBytesIn - , io_bytes_out = IOBytesOut - , context_switches = ContextSwitches - , reductions = Reductions - , run_queue = RunQueue - , ets = ETS - } +-spec beam_stats_queue_to_packets( + beam_stats_consumer:queue(), + non_neg_integer(), + hope_option:t(binary()) ) -> - NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = - [ io_bytes_in_to_msg(IOBytesIn) - , io_bytes_out_to_msg(IOBytesOut) - , context_switches_to_msg(ContextSwitches) - , reductions_to_msg(Reductions) - , run_queue_to_msg(RunQueue) - | memory_to_msgs(Memory) - ] - ++ ets_to_msgs(ETS), - Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], - [statsd_msg_to_bin(M) || M <- Msgs2]. - --spec run_queue_to_msg(non_neg_integer()) -> - statsd_msg(). -run_queue_to_msg(RunQueue) -> - #statsd_msg - { name = <<"run_queue">> - , value = RunQueue - , type = gauge - }. - --spec reductions_to_msg(non_neg_integer()) -> - statsd_msg(). -reductions_to_msg(Reductions) -> - #statsd_msg - { name = <<"reductions">> - , value = Reductions - , type = gauge - }. - --spec context_switches_to_msg(non_neg_integer()) -> - statsd_msg(). -context_switches_to_msg(ContextSwitches) -> - #statsd_msg - { name = <<"context_switches">> - , value = ContextSwitches - , type = gauge - }. - --spec io_bytes_in_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_in_to_msg(IOBytesIn) -> - #statsd_msg - { name = <<"io.bytes_in">> - , value = IOBytesIn - , type = gauge - }. - --spec io_bytes_out_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_out_to_msg(IOBytesOut) -> - #statsd_msg - { name = <<"io.bytes_out">> - , value = IOBytesOut - , type = gauge - }. - --spec ets_to_msgs(beam_stats_ets:t()) -> - [statsd_msg()]. -ets_to_msgs(PerTableStats) -> - NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), - lists:append(NestedMsgs). - --spec ets_table_to_msgs(beam_stats_ets_table:t()) -> - [statsd_msg()]. -ets_table_to_msgs(#beam_stats_ets_table - { id = ID - , name = Name - , size = Size - , memory = Memory - } -) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), - NameBin = atom_to_binary(Name, latin1), - NameAndID = <>, - SizeMsg = - #statsd_msg - { name = <<"ets_table.size.", NameAndID/binary>> - , value = Size - , type = gauge - }, - MemoryMsg = - #statsd_msg - { name = <<"ets_table.memory.", NameAndID/binary>> - , value = Memory - , type = gauge - }, - [SizeMsg, MemoryMsg]. - --spec memory_to_msgs([{atom(), non_neg_integer()}]) -> - [statsd_msg()]. -memory_to_msgs(Memory) -> - [memory_component_to_statsd_msg(MC) || MC <- Memory]. - --spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> - statsd_msg(). -memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> - MemTypeBin = atom_to_binary(MemType, latin1), - #statsd_msg - { name = <<"memory.", MemTypeBin/binary>> - , value = MemSize - , type = gauge - }. - --spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> - statsd_msg(). -statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <>) -> - Prefix = <>, - Name2 = <>, - Msg#statsd_msg{name=Name2}. - --spec statsd_msg_to_bin(statsd_msg()) -> - binary(). -statsd_msg_to_bin( - #statsd_msg - { name = <> - , value = Value - , type = Type = gauge - } -) when Value >= 0 -> - TypeBin = metric_type_to_bin(Type), - ValueBin = integer_to_binary(Value), - << Name/binary - , ":" - , ValueBin/binary - , "|" - , TypeBin/binary - , "\n" - >>. - --spec metric_type_to_bin(metric_type()) -> - binary(). -metric_type_to_bin(gauge) -> - <<"g">>. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + [iolist()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> + MsgIOLists = lists:append([beam_stats_to_iolists(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), + hope_list:divide(MsgIOLists, NumMsgsPerPacket). + +-spec beam_stats_to_iolists(beam_stats:t(), hope_option:t(binary())) -> + [iolist()]. +beam_stats_to_iolists(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> + NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), + MsgsGraphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), + MsgsStatsD = + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite), + lists:map(fun beam_stats_msg_statsd_gauge:to_iolist/1, MsgsStatsD).