Implement StatsD consumer.
authorSiraaj Khandkar <siraaj@khandkar.net>
Tue, 11 Aug 2015 21:13:49 +0000 (17:13 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Tue, 11 Aug 2015 21:13:49 +0000 (17:13 -0400)
src/beam_stats_consumer_statsd.erl [new file with mode: 0644]

diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl
new file mode 100644 (file)
index 0000000..cfd6cbf
--- /dev/null
@@ -0,0 +1,196 @@
+-module(beam_stats_consumer_statsd).
+
+-include("include/beam_stats.hrl").
+
+-behaviour(beam_stats_consumer).
+
+-export_type(
+    [ option/0
+    ]).
+
+-export(
+    [ init/1
+    , consume/2
+    , terminate/1
+    ]).
+
+-type option() ::
+      {consumption_interval , erlang:time()}
+    | {dst_host             , inet:ip_address() | inet:hostname()}
+    | {dst_port             , inet:port_number()}
+    | {src_port             , inet:port_number()}
+    .
+
+-define(DEFAULT_DST_HOST, "localhost").
+-define(DEFAULT_DST_PORT, 8125).
+-define(DEFAULT_SRC_PORT, 8124).
+
+-type metric_type() ::
+    % TODO: Add other metric types
+    gauge.
+
+-record(statsd_msg,
+    { name  :: binary()
+    , value :: non_neg_integer()
+    , type  :: metric_type()
+    }).
+
+-type statsd_msg() ::
+    #statsd_msg{}.
+
+-record(state,
+    { sock     :: hope_option:t(gen_udp:socket())
+    , dst_host :: inet:ip_address() | inet:hostname()
+    , dst_port :: inet:port_number()
+    , src_port :: inet:port_number()
+    }).
+
+-type state() ::
+    #state{}.
+
+-define(PATH_PREFIX, "beam_stats").
+
+%% ============================================================================
+%% Consumer implementation
+%% ============================================================================
+
+-spec init([option()]) ->
+    {erlang:time(), state()}.
+init(Options) ->
+    ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+    DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST),
+    DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT),
+    SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT),
+    State = #state
+        { sock     = none
+        , dst_host = DstHost
+        , dst_port = DstPort
+        , src_port = SrcPort
+        },
+    {ConsumptionInterval, State}.
+
+-spec consume(beam_stats_consumer:queue(), state()) ->
+    state().
+consume(Q, #state{}=State1) ->
+    Payload = beam_stats_queue_to_binary(Q),
+    State2 = try_to_connect_if_no_socket(State1),
+    try_to_send(State2, Payload).
+
+-spec terminate(state()) ->
+    {}.
+terminate(#state{sock=SockOpt}) ->
+    ok = hope_option:iter(SockOpt, fun gen_udp:close/1),
+    {}.
+
+%% ============================================================================
+%% Transport
+%% ============================================================================
+
+-spec try_to_send(state(), binary()) ->
+    state().
+try_to_send(#state{sock=none}=State, _) ->
+    io:format("error: socket closed~n"),
+    % TODO: Maybe schedule retry?
+    State;
+try_to_send(
+    #state
+    { sock     = {some, Sock}
+    , dst_host = DstHost
+    , dst_port = DstPort
+    }=State,
+    Payload
+) ->
+    case gen_udp:send(Sock, DstHost, DstPort, Payload)
+    of  ok ->
+            State
+    ;   {error, _}=Error ->
+            io:format("error: gen_udp:send/4 failed: ~p~n", [Error]),
+            % TODO: Do something with unsent messages?
+            ok = gen_udp:close(Sock),
+            State#state{sock=none}
+    end.
+
+-spec try_to_connect_if_no_socket(state()) ->
+    state().
+try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
+    State;
+try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) ->
+    case gen_udp:open(SrcPort)
+    of  {ok, Sock} ->
+            State#state{sock = {some, Sock}}
+    ;   {error, _}=Error ->
+            io:format("error: gen_udp:open/1 failed: ~p~n", [Error]),
+            State#state{sock = none}
+    end.
+
+%% ============================================================================
+%% Serialization
+%% ============================================================================
+
+-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
+    binary().
+beam_stats_queue_to_binary(Q) ->
+    iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]).
+
+-spec beam_stats_to_bins(beam_stats:t()) ->
+    [binary()].
+beam_stats_to_bins(#beam_stats
+    { node_id = NodeID
+    , memory  = Memory
+    }
+) ->
+    NodeIDBin = node_id_to_bin(NodeID),
+    Msgs1 = memory_to_msgs(Memory),
+    Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
+    [statsd_msg_to_bin(M) || M <- Msgs2].
+
+-spec memory_to_msgs(erlang:memory()) ->
+    [statsd_msg()].
+memory_to_msgs(Memory) ->
+    [memory_component_to_statsd_msg(MC) || MC <- Memory].
+
+-spec memory_component_to_statsd_msg({erlang:memory_type(), non_neg_integer()}) ->
+    statsd_msg().
+memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
+    #statsd_msg
+    { name  = atom_to_binary(MemType, latin1)
+    , value = MemSize
+    , type  = gauge
+    }.
+
+-spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
+    statsd_msg().
+statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) ->
+    Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>,
+    Name2 = <<Prefix/binary, Name1/binary>>,
+    Msg#statsd_msg{name=Name2}.
+
+-spec statsd_msg_to_bin(statsd_msg()) ->
+    binary().
+statsd_msg_to_bin(
+    #statsd_msg
+    { name  = <<Name/binary>>
+    , value = Value
+    , type  = Type = gauge
+    }
+) when Value >= 0 ->
+    TypeBin = metric_type_to_bin(Type),
+    ValueBin = integer_to_binary(Value),
+    << Name/binary
+     , ":"
+     , ValueBin/binary
+     , "|"
+     , TypeBin/binary
+     , "\n"
+    >>.
+
+-spec metric_type_to_bin(metric_type()) ->
+    binary().
+metric_type_to_bin(gauge) ->
+    <<"g">>.
+
+-spec node_id_to_bin(node()) ->
+    binary().
+node_id_to_bin(NodeID) ->
+    NodeIDBin = atom_to_binary(NodeID, utf8),
+    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
This page took 0.031816 seconds and 4 git commands to generate.