Upgrade hope to 4.0.0, which uses empty tuple as unit
[beam_stats.git] / src / beam_stats_consumer_graphite.erl
index be4f1f9..251f92e 100644 (file)
@@ -1,6 +1,7 @@
 -module(beam_stats_consumer_graphite).
 
 -include("include/beam_stats.hrl").
+-include("beam_stats_logging.hrl").
 
 -behaviour(beam_stats_consumer).
 
 -type state() ::
     #state{}.
 
--define(GRAPHITE_PATH_PREFIX, "beam_stats").
--define(DEFAULT_HOST        , "localhost").
--define(DEFAULT_PORT        , 2003).
--define(DEFAULT_TIMEOUT     , 5000).
+-define(DEFAULT_HOST    , "localhost").
+-define(DEFAULT_PORT    , 2003).
+-define(DEFAULT_TIMEOUT , 5000).
 
 -spec init([option()]) ->
     {non_neg_integer(), state()}.
@@ -52,22 +52,21 @@ init(Options) ->
 -spec consume(beam_stats_consumer:queue(), state()) ->
     state().
 consume(Q, #state{}=State1) ->
-    Payload = beam_stats_queue_to_binary(Q),
+    Payload = beam_stats_queue_to_iolists(Q),
     State2 = try_to_connect_if_no_socket(State1),
     try_to_send(State2, Payload).
 
 -spec terminate(state()) ->
     {}.
 terminate(#state{sock=SockOpt}) ->
-    ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
-    {}.
+    hope_option:iter(SockOpt, fun gen_tcp:close/1).
 
 %% ============================================================================
 
--spec try_to_send(state(), binary()) ->
+-spec try_to_send(state(), iolist()) ->
     state().
 try_to_send(#state{sock=none}=State, _) ->
-    io:format("error: socket closed~n"),
+    ?log_error("Sending failed. No socket in state."),
     % TODO: Maybe schedule retry?
     State;
 try_to_send(#state{sock={some, Sock}}=State, Payload) ->
@@ -75,7 +74,7 @@ try_to_send(#state{sock={some, Sock}}=State, Payload) ->
     of  ok ->
             State
     ;   {error, _}=Error ->
-            io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
+            ?log_error("gen_tcp:send(~p, Payload) -> ~p", [Sock, Error]),
             % TODO: Maybe schedule retry?
             ok = gen_tcp:close(Sock),
             State#state{sock=none}
@@ -93,68 +92,25 @@ try_to_connect_if_no_socket(
     , timeout = Timeout
     }=State
 ) ->
-    case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
+    Options = [binary, {active, false}],
+    case gen_tcp:connect(Host, Port, Options, Timeout)
     of  {ok, Sock} ->
             State#state{sock = {some, Sock}}
     ;   {error, _}=Error ->
-            io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
+            ?log_error(
+                "gen_tcp:connect(~p, ~p, ~p, ~p) -> ~p",
+                [Host, Port, Options, Timeout, Error]
+            ),
             State#state{sock = none}
     end.
 
--spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
-    binary().
-beam_stats_queue_to_binary(Q) ->
-    Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
-    iolist_to_binary(Bins).
-
--spec beam_stats_to_bin(beam_stats:t()) ->
-    binary().
-beam_stats_to_bin(#beam_stats
-    { timestamp = Timestamp
-    , node_id   = NodeID
-    , memory    = Memory
-    }
-) ->
-    TimestampInt = timestamp_to_integer(Timestamp),
-    TimestampBin = integer_to_binary(TimestampInt),
-    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
-    PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
-    MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
-    MemoryBins     = lists:map(PairToBin, MemoryBinPairs),
-    AllBins =
-        [ MemoryBins
-        ],
-    iolist_to_binary(AllBins).
-
--spec timestamp_to_integer(erlang:timestamp()) ->
-    non_neg_integer().
-timestamp_to_integer({Megaseconds, Seconds, _}) ->
-    Megaseconds * 1000000 + Seconds.
-
--spec make_pair_to_bin(binary(), binary()) ->
-    fun(({binary(), binary()}) -> binary()).
-make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
-    fun ({<<K/binary>>, <<V/binary>>}) ->
-        << ?GRAPHITE_PATH_PREFIX
-         , "."
-         , NodeID/binary
-         , "."
-         , K/binary
-         , " "
-         , V/binary
-         , " "
-         , TimestampBin/binary
-         , "\n"
-        >>
-    end.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+-spec beam_stats_queue_to_iolists(beam_stats_consumer:queue()) ->
+    [iolist()].
+beam_stats_queue_to_iolists(Q) ->
+    [beam_stats_to_iolist(B) || B <- queue:to_list(Q)].
 
--spec atom_int_to_bin_bin({atom(), integer()}) ->
-    {binary(), binary()}.
-atom_int_to_bin_bin({K, V}) ->
-    {atom_to_binary(K, latin1), integer_to_binary(V)}.
+-spec beam_stats_to_iolist(beam_stats:t()) ->
+    [iolist()].
+beam_stats_to_iolist(#beam_stats{}=BeamStats) ->
+    Msgs = beam_stats_msg_graphite:of_beam_stats(BeamStats),
+    lists:map(fun beam_stats_msg_graphite:to_iolist/1, Msgs).
This page took 0.031029 seconds and 4 git commands to generate.