Upgrade hope to 4.0.0, which uses empty tuple as unit
[beam_stats.git] / src / beam_stats_consumer_statsd.erl
index 1ce0afb..edc5e5b 100644 (file)
@@ -1,7 +1,6 @@
 -module(beam_stats_consumer_statsd).
 
 -include("include/beam_stats.hrl").
--include("include/beam_stats_ets_table.hrl").
 -include("beam_stats_logging.hrl").
 
 -behaviour(beam_stats_consumer).
     | {dst_port             , inet:port_number()}
     | {src_port             , inet:port_number()}
     | {num_msgs_per_packet  , non_neg_integer()}
+    | {static_node_name     , binary()}
     .
 
 -define(DEFAULT_DST_HOST, "localhost").
 -define(DEFAULT_DST_PORT, 8125).
 -define(DEFAULT_SRC_PORT, 8124).
 
--type metric_type() ::
-    % TODO: Add other metric types
-    gauge.
-
--record(statsd_msg,
-    { name  :: binary()
-    , value :: non_neg_integer()
-    , type  :: metric_type()
-    }).
-
--type statsd_msg() ::
-    #statsd_msg{}.
-
 -record(state,
     { sock     :: hope_option:t(gen_udp:socket())
     , dst_host :: inet:ip_address() | inet:hostname()
     , dst_port :: inet:port_number()
     , src_port :: inet:port_number()
     , num_msgs_per_packet :: non_neg_integer()
+    , static_node_name    :: hope_option:t(binary())
     }).
 
 -type state() ::
     #state{}.
 
--define(PATH_PREFIX, "beam_stats").
-
 %% ============================================================================
 %% Consumer implementation
 %% ============================================================================
@@ -67,38 +53,45 @@ init(Options) ->
     DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT),
     SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT),
     NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10),
+    StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name),
     State = #state
         { sock     = none
         , dst_host = DstHost
         , dst_port = DstPort
         , src_port = SrcPort
         , num_msgs_per_packet = NumMsgsPerPacket
+        , static_node_name    = StaticNodeNameOpt
         },
     {ConsumptionInterval, State}.
 
 -spec consume(beam_stats_consumer:queue(), state()) ->
     state().
-consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) ->
-    Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket),
+consume(
+    Q,
+    #state
+    { num_msgs_per_packet = NumMsgsPerPacket
+    , static_node_name    = StaticNodeNameOpt
+    }=State
+) ->
+    Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt),
     lists:foldl(fun try_to_connect_and_send/2, State, Packets).
 
 -spec terminate(state()) ->
     {}.
 terminate(#state{sock=SockOpt}) ->
-    ok = hope_option:iter(SockOpt, fun gen_udp:close/1),
-    {}.
+    hope_option:iter(SockOpt, fun gen_udp:close/1).
 
 %% ============================================================================
 %% Transport
 %% ============================================================================
 
--spec try_to_connect_and_send(binary(), state()) ->
+-spec try_to_connect_and_send(iolist(), state()) ->
     state().
-try_to_connect_and_send(<<Payload/binary>>, #state{}=State1) ->
+try_to_connect_and_send(Payload, #state{}=State1) ->
     State2 = try_to_connect_if_no_socket(State1),
     try_to_send(State2, Payload).
 
--spec try_to_send(state(), binary()) ->
+-spec try_to_send(state(), iolist()) ->
     state().
 try_to_send(#state{sock=none}=State, _) ->
     ?log_error("Sending failed. No socket in state."),
@@ -117,7 +110,7 @@ try_to_send(
             State
     ;   {error, _}=Error ->
             ?log_error(
-                "gen_udp:send(~p, ~p, ~p, ~p) -> ~p",
+                "gen_udp:send(~p, ~p, ~p, Payload) -> ~p",
                 [Sock, DstHost, DstPort, Error]
             ),
             % TODO: Do something with unsent messages?
@@ -142,164 +135,22 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) ->
 %% Serialization
 %% ============================================================================
 
--spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) ->
-    [binary()].
-beam_stats_queue_to_packets(Q, NumMsgsPerPacket) ->
-    MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]),
-    MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket),
-    lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks).
-
--spec beam_stats_to_bins(beam_stats:t()) ->
-    [binary()].
-beam_stats_to_bins(#beam_stats
-    { node_id = NodeID
-    , memory  = Memory
-    , io_bytes_in  = IOBytesIn
-    , io_bytes_out = IOBytesOut
-    , context_switches = ContextSwitches
-    , reductions       = Reductions
-    , run_queue        = RunQueue
-    , ets              = ETS
-    }
+-spec beam_stats_queue_to_packets(
+    beam_stats_consumer:queue(),
+    non_neg_integer(),
+    hope_option:t(binary())
 ) ->
-    NodeIDBin = node_id_to_bin(NodeID),
-    Msgs1 =
-        [ io_bytes_in_to_msg(IOBytesIn)
-        , io_bytes_out_to_msg(IOBytesOut)
-        , context_switches_to_msg(ContextSwitches)
-        , reductions_to_msg(Reductions)
-        , run_queue_to_msg(RunQueue)
-        | memory_to_msgs(Memory)
-        ]
-        ++ ets_to_msgs(ETS),
-    Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
-    [statsd_msg_to_bin(M) || M <- Msgs2].
-
--spec run_queue_to_msg(non_neg_integer()) ->
-    statsd_msg().
-run_queue_to_msg(RunQueue) ->
-    #statsd_msg
-    { name  = <<"run_queue">>
-    , value = RunQueue
-    , type  = gauge
-    }.
-
--spec reductions_to_msg(non_neg_integer()) ->
-    statsd_msg().
-reductions_to_msg(Reductions) ->
-    #statsd_msg
-    { name  = <<"reductions">>
-    , value = Reductions
-    , type  = gauge
-    }.
-
--spec context_switches_to_msg(non_neg_integer()) ->
-    statsd_msg().
-context_switches_to_msg(ContextSwitches) ->
-    #statsd_msg
-    { name  = <<"context_switches">>
-    , value = ContextSwitches
-    , type  = gauge
-    }.
-
--spec io_bytes_in_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_in_to_msg(IOBytesIn) ->
-    #statsd_msg
-    { name  = <<"io.bytes_in">>
-    , value = IOBytesIn
-    , type  = gauge
-    }.
-
--spec io_bytes_out_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_out_to_msg(IOBytesOut) ->
-    #statsd_msg
-    { name  = <<"io.bytes_out">>
-    , value = IOBytesOut
-    , type  = gauge
-    }.
-
--spec ets_to_msgs(beam_stats_ets:t()) ->
-    [statsd_msg()].
-ets_to_msgs(PerTableStats) ->
-    NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats),
-    lists:append(NestedMsgs).
-
--spec ets_table_to_msgs(beam_stats_ets_table:t()) ->
-    [statsd_msg()].
-ets_table_to_msgs(#beam_stats_ets_table
-    { id     = ID
-    , name   = Name
-    , size   = Size
-    , memory = Memory
-    }
-) ->
-    IDBin   = beam_stats_ets_table:id_to_bin(ID),
-    NameBin = atom_to_binary(Name, latin1),
-    NameAndID = <<NameBin/binary, ".", IDBin/binary>>,
-    SizeMsg =
-        #statsd_msg
-        { name  = <<"ets_table.size.", NameAndID/binary>>
-        , value = Size
-        , type  = gauge
-        },
-    MemoryMsg =
-        #statsd_msg
-        { name  = <<"ets_table.memory.", NameAndID/binary>>
-        , value = Memory
-        , type  = gauge
-        },
-    [SizeMsg, MemoryMsg].
-
--spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
-    [statsd_msg()].
-memory_to_msgs(Memory) ->
-    [memory_component_to_statsd_msg(MC) || MC <- Memory].
-
--spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
-    statsd_msg().
-memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
-    MemTypeBin = atom_to_binary(MemType, latin1),
-    #statsd_msg
-    { name  = <<"memory.", MemTypeBin/binary>>
-    , value = MemSize
-    , type  = gauge
-    }.
-
--spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
-    statsd_msg().
-statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) ->
-    Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>,
-    Name2 = <<Prefix/binary, Name1/binary>>,
-    Msg#statsd_msg{name=Name2}.
-
--spec statsd_msg_to_bin(statsd_msg()) ->
-    binary().
-statsd_msg_to_bin(
-    #statsd_msg
-    { name  = <<Name/binary>>
-    , value = Value
-    , type  = Type = gauge
-    }
-) when Value >= 0 ->
-    TypeBin = metric_type_to_bin(Type),
-    ValueBin = integer_to_binary(Value),
-    << Name/binary
-     , ":"
-     , ValueBin/binary
-     , "|"
-     , TypeBin/binary
-     , "\n"
-    >>.
-
--spec metric_type_to_bin(metric_type()) ->
-    binary().
-metric_type_to_bin(gauge) ->
-    <<"g">>.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+    [iolist()].
+beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) ->
+    MsgIOLists = lists:append([beam_stats_to_iolists(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]),
+    hope_list:divide(MsgIOLists, NumMsgsPerPacket).
+
+-spec beam_stats_to_iolists(beam_stats:t(), hope_option:t(binary())) ->
+    [iolist()].
+beam_stats_to_iolists(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) ->
+    NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID),
+    NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault),
+    MsgsGraphite = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin),
+    MsgsStatsD =
+        lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite),
+    lists:map(fun beam_stats_msg_statsd_gauge:to_iolist/1, MsgsStatsD).
This page took 0.030574 seconds and 4 git commands to generate.