Merge pull request #12 from ibnfirnas/factor-and-abstract-graphite-msg 0.10.0
authorSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 01:57:15 +0000 (21:57 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 01:57:15 +0000 (21:57 -0400)
Factor and abstract graphite msg

include/beam_stats_msg_graphite.hrl [new file with mode: 0644]
include/beam_stats_msg_statsd_gauge.hrl [new file with mode: 0644]
src/beam_stats.app.src
src/beam_stats_consumer_graphite.erl
src/beam_stats_consumer_statsd.erl
src/beam_stats_msg_graphite.erl [new file with mode: 0644]
src/beam_stats_msg_statsd_gauge.erl [new file with mode: 0644]

diff --git a/include/beam_stats_msg_graphite.hrl b/include/beam_stats_msg_graphite.hrl
new file mode 100644 (file)
index 0000000..56df5cc
--- /dev/null
@@ -0,0 +1,5 @@
+-record(beam_stats_msg_graphite,
+    { path      :: [binary()]
+    , value     :: integer()
+    , timestamp :: erlang:timestamp()
+    }).
diff --git a/include/beam_stats_msg_statsd_gauge.hrl b/include/beam_stats_msg_statsd_gauge.hrl
new file mode 100644 (file)
index 0000000..445c8a7
--- /dev/null
@@ -0,0 +1,4 @@
+-record(beam_stats_msg_statsd_gauge,
+    { name  :: binary()
+    , value :: non_neg_integer()
+    }).
index 3dca90d..e9dcfff 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.9.0"},
+  {vsn, "0.10.0"},
   {registered, []},
   {applications,
     [ kernel
index 6e0d5dd..036f6c9 100644 (file)
@@ -32,7 +32,7 @@
 -type state() ::
     #state{}.
 
--define(GRAPHITE_PATH_PREFIX, "beam_stats").
+-define(PATH_PREFIX         , <<"beam_stats">>).
 -define(DEFAULT_HOST        , "localhost").
 -define(DEFAULT_PORT        , 2003).
 -define(DEFAULT_TIMEOUT     , 5000).
@@ -109,59 +109,14 @@ try_to_connect_if_no_socket(
 -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
     binary().
 beam_stats_queue_to_binary(Q) ->
-    Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
+    Bins = [beam_stats_to_bins(B) || B <- queue:to_list(Q)],
     iolist_to_binary(Bins).
 
--spec beam_stats_to_bin(beam_stats:t()) ->
-    binary().
-beam_stats_to_bin(#beam_stats
-    { timestamp = Timestamp
-    , node_id   = NodeID
-    , memory    = Memory
-    }
-) ->
-    TimestampInt = timestamp_to_integer(Timestamp),
-    TimestampBin = integer_to_binary(TimestampInt),
-    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
-    MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>),
-    MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
-    MemoryBins     = lists:map(MemoryPairToBin, MemoryBinPairs),
-    AllBins =
-        [ MemoryBins
-        ],
-    iolist_to_binary(AllBins).
-
--spec timestamp_to_integer(erlang:timestamp()) ->
-    non_neg_integer().
-timestamp_to_integer({Megaseconds, Seconds, _}) ->
-    Megaseconds * 1000000 + Seconds.
-
--spec make_pair_to_bin(binary(), binary(), binary()) ->
-    fun(({binary(), binary()}) -> binary()).
-make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>, <<Type/binary>>) ->
-    fun ({<<K/binary>>, <<V/binary>>}) ->
-        << ?GRAPHITE_PATH_PREFIX
-         , "."
-         , NodeID/binary
-         , "."
-         , Type/binary
-         , "."
-         , K/binary
-         , " "
-         , V/binary
-         , " "
-         , TimestampBin/binary
-         , "\n"
-        >>
-    end.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
-
--spec atom_int_to_bin_bin({atom(), integer()}) ->
-    {binary(), binary()}.
-atom_int_to_bin_bin({K, V}) ->
-    {atom_to_binary(K, latin1), integer_to_binary(V)}.
+-spec beam_stats_to_bins(beam_stats:t()) ->
+    [binary()].
+beam_stats_to_bins(#beam_stats{}=BeamStats) ->
+    MsgAddPrefix =
+        fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end,
+    Msgs1 = beam_stats_msg_graphite:of_beam_stats(BeamStats),
+    Msgs2 = lists:map(MsgAddPrefix, Msgs1),
+    lists:map(fun beam_stats_msg_graphite:to_bin/1, Msgs2).
index cfd1e6d..06945e1 100644 (file)
@@ -1,10 +1,6 @@
 -module(beam_stats_consumer_statsd).
 
 -include("include/beam_stats.hrl").
--include("include/beam_stats_ets_table.hrl").
--include("include/beam_stats_process.hrl").
--include("include/beam_stats_process_ancestry.hrl").
--include("include/beam_stats_processes.hrl").
 -include("beam_stats_logging.hrl").
 
 -behaviour(beam_stats_consumer).
 -define(DEFAULT_DST_PORT, 8125).
 -define(DEFAULT_SRC_PORT, 8124).
 
--type metric_type() ::
-    % TODO: Add other metric types
-    gauge.
-
--record(statsd_msg,
-    { name  :: binary()
-    , value :: non_neg_integer()
-    , type  :: metric_type()
-    }).
-
--type statsd_msg() ::
-    #statsd_msg{}.
-
 -record(state,
     { sock     :: hope_option:t(gen_udp:socket())
     , dst_host :: inet:ip_address() | inet:hostname()
@@ -58,7 +41,7 @@
 -type state() ::
     #state{}.
 
--define(PATH_PREFIX, "beam_stats").
+-define(PATH_PREFIX, <<"beam_stats">>).
 
 %% ============================================================================
 %% Consumer implementation
@@ -168,267 +151,13 @@ beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) ->
 
 -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) ->
     [binary()].
-beam_stats_to_bins(#beam_stats
-    { node_id = NodeID
-    , memory  = Memory
-    , io_bytes_in  = IOBytesIn
-    , io_bytes_out = IOBytesOut
-    , context_switches = ContextSwitches
-    , reductions       = Reductions
-    , run_queue        = RunQueue
-    , ets              = ETS
-    , processes        = Processes
-    },
-    StaticNodeNameOpt
-) ->
-    NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)),
-    Msgs1 =
-        [ io_bytes_in_to_msg(IOBytesIn)
-        , io_bytes_out_to_msg(IOBytesOut)
-        , context_switches_to_msg(ContextSwitches)
-        , reductions_to_msg(Reductions)
-        , run_queue_to_msg(RunQueue)
-        | memory_to_msgs(Memory)
-        ]
-        ++ ets_to_msgs(ETS)
-        ++ procs_to_msgs(Processes),
-    Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
-    [statsd_msg_to_bin(M) || M <- Msgs2].
-
--spec run_queue_to_msg(non_neg_integer()) ->
-    statsd_msg().
-run_queue_to_msg(RunQueue) ->
-    #statsd_msg
-    { name  = <<"run_queue">>
-    , value = RunQueue
-    , type  = gauge
-    }.
-
--spec reductions_to_msg(non_neg_integer()) ->
-    statsd_msg().
-reductions_to_msg(Reductions) ->
-    #statsd_msg
-    { name  = <<"reductions">>
-    , value = Reductions
-    , type  = gauge
-    }.
-
--spec context_switches_to_msg(non_neg_integer()) ->
-    statsd_msg().
-context_switches_to_msg(ContextSwitches) ->
-    #statsd_msg
-    { name  = <<"context_switches">>
-    , value = ContextSwitches
-    , type  = gauge
-    }.
-
--spec io_bytes_in_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_in_to_msg(IOBytesIn) ->
-    #statsd_msg
-    { name  = <<"io.bytes_in">>
-    , value = IOBytesIn
-    , type  = gauge
-    }.
-
--spec io_bytes_out_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_out_to_msg(IOBytesOut) ->
-    #statsd_msg
-    { name  = <<"io.bytes_out">>
-    , value = IOBytesOut
-    , type  = gauge
-    }.
-
--spec procs_to_msgs(beam_stats_processes:t()) ->
-    [statsd_msg()].
-procs_to_msgs(
-    #beam_stats_processes
-    { individual_stats         = Procs
-    , count_all                = CountAll
-    , count_exiting            = CountExiting
-    , count_garbage_collecting = CountGarbageCollecting
-    , count_registered         = CountRegistered
-    , count_runnable           = CountRunnable
-    , count_running            = CountRunning
-    , count_suspended          = CountSuspended
-    , count_waiting            = CountWaiting
-    }
-) ->
-    [ gauge(<<"processes_count_all">>               , CountAll)
-    , gauge(<<"processes_count_exiting">>           , CountExiting)
-    , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting)
-    , gauge(<<"processes_count_registered">>        , CountRegistered)
-    , gauge(<<"processes_count_runnable">>          , CountRunnable)
-    , gauge(<<"processes_count_running">>           , CountRunning)
-    , gauge(<<"processes_count_suspended">>         , CountSuspended)
-    , gauge(<<"processes_count_waiting">>           , CountWaiting)
-    | lists:append([proc_to_msgs(P) || P <- Procs])
-    ].
-
--spec proc_to_msgs(beam_stats_process:t()) ->
-    [statsd_msg()].
-proc_to_msgs(
-    #beam_stats_process
-    { pid               = Pid
-    , memory            = Memory
-    , total_heap_size   = TotalHeapSize
-    , stack_size        = StackSize
-    , message_queue_len = MsgQueueLen
-    }=Process
-) ->
-    Origin = beam_stats_process:get_best_known_origin(Process),
-    OriginBin = proc_origin_to_bin(Origin),
-    PidBin = pid_to_bin(Pid),
-    OriginDotPid = <<OriginBin/binary, ".", PidBin/binary>>,
-    [ gauge(<<"process_memory."            , OriginDotPid/binary>>, Memory)
-    , gauge(<<"process_total_heap_size."   , OriginDotPid/binary>>, TotalHeapSize)
-    , gauge(<<"process_stack_size."        , OriginDotPid/binary>>, StackSize)
-    , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen)
-    ].
-
--spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
-    binary().
-proc_origin_to_bin({registered_name, Name}) ->
-    atom_to_binary(Name, utf8);
-proc_origin_to_bin({ancestry, Ancestry}) ->
-    #beam_stats_process_ancestry
-    { raw_initial_call  = InitCallRaw
-    , otp_initial_call  = InitCallOTPOpt
-    , otp_ancestors     = AncestorsOpt
-    } = Ancestry,
-    Blank             = <<"NONE">>,
-    InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt   , fun mfa_to_bin/1),
-    InitCallOTPBin    = hope_option:get(InitCallOTPBinOpt, Blank),
-    AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
-    AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
-    InitCallRawBin    = mfa_to_bin(InitCallRaw),
-    << InitCallRawBin/binary
-     , "--"
-     , InitCallOTPBin/binary
-     , "--"
-     , AncestorsBin/binary
-    >>.
-
-ancestors_to_bin([]) ->
-    <<>>;
-ancestors_to_bin([A | Ancestors]) ->
-    ABin = ancestor_to_bin(A),
-    case ancestors_to_bin(Ancestors)
-    of  <<>> ->
-            ABin
-    ;   <<AncestorsBin/binary>> ->
-            <<ABin/binary, "-", AncestorsBin/binary>>
-    end.
-
-ancestor_to_bin(A) when is_atom(A) ->
-    atom_to_binary(A, utf8);
-ancestor_to_bin(A) when is_pid(A) ->
-    pid_to_bin(A).
-
-pid_to_bin(Pid) ->
-    PidList = erlang:pid_to_list(Pid),
-    PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
-             re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
-
--spec mfa_to_bin(mfa()) ->
-    binary().
-mfa_to_bin({Module, Function, Arity}) ->
-    ModuleBin   = atom_to_binary(Module  , utf8),
-    FunctionBin = atom_to_binary(Function, utf8),
-    ArityBin    = erlang:integer_to_binary(Arity),
-    <<ModuleBin/binary, "-", FunctionBin/binary, "-", ArityBin/binary>>.
-
-
--spec gauge(binary(), integer()) ->
-    statsd_msg().
-gauge(<<Name/binary>>, Value) when is_integer(Value) ->
-    #statsd_msg
-    { name  = Name
-    , value = Value
-    , type  = gauge
-    }.
-
--spec ets_to_msgs(beam_stats_ets:t()) ->
-    [statsd_msg()].
-ets_to_msgs(PerTableStats) ->
-    NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats),
-    lists:append(NestedMsgs).
-
--spec ets_table_to_msgs(beam_stats_ets_table:t()) ->
-    [statsd_msg()].
-ets_table_to_msgs(#beam_stats_ets_table
-    { id     = ID
-    , name   = Name
-    , size   = Size
-    , memory = Memory
-    }
-) ->
-    IDBin   = beam_stats_ets_table:id_to_bin(ID),
-    NameBin = atom_to_binary(Name, latin1),
-    NameAndID = <<NameBin/binary, ".", IDBin/binary>>,
-    SizeMsg =
-        #statsd_msg
-        { name  = <<"ets_table.size.", NameAndID/binary>>
-        , value = Size
-        , type  = gauge
-        },
-    MemoryMsg =
-        #statsd_msg
-        { name  = <<"ets_table.memory.", NameAndID/binary>>
-        , value = Memory
-        , type  = gauge
-        },
-    [SizeMsg, MemoryMsg].
-
--spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
-    [statsd_msg()].
-memory_to_msgs(Memory) ->
-    [memory_component_to_statsd_msg(MC) || MC <- Memory].
-
--spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
-    statsd_msg().
-memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
-    MemTypeBin = atom_to_binary(MemType, latin1),
-    #statsd_msg
-    { name  = <<"memory.", MemTypeBin/binary>>
-    , value = MemSize
-    , type  = gauge
-    }.
-
--spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
-    statsd_msg().
-statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) ->
-    Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>,
-    Name2 = <<Prefix/binary, Name1/binary>>,
-    Msg#statsd_msg{name=Name2}.
-
--spec statsd_msg_to_bin(statsd_msg()) ->
-    binary().
-statsd_msg_to_bin(
-    #statsd_msg
-    { name  = <<Name/binary>>
-    , value = Value
-    , type  = Type = gauge
-    }
-) when Value >= 0 ->
-    TypeBin = metric_type_to_bin(Type),
-    ValueBin = integer_to_binary(Value),
-    << Name/binary
-     , ":"
-     , ValueBin/binary
-     , "|"
-     , TypeBin/binary
-     , "\n"
-    >>.
-
--spec metric_type_to_bin(metric_type()) ->
-    binary().
-metric_type_to_bin(gauge) ->
-    <<"g">>.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) ->
+    NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID),
+    NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault),
+    GraphiteMsgAddPrefix =
+        fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end,
+    MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin),
+    MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1),
+    MsgsStatsD =
+        lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2),
+    lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD).
diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl
new file mode 100644 (file)
index 0000000..de18c0d
--- /dev/null
@@ -0,0 +1,266 @@
+-module(beam_stats_msg_graphite).
+
+-include("include/beam_stats.hrl").
+-include("include/beam_stats_ets_table.hrl").
+-include("include/beam_stats_msg_graphite.hrl").
+-include("include/beam_stats_process.hrl").
+-include("include/beam_stats_process_ancestry.hrl").
+-include("include/beam_stats_processes.hrl").
+
+-export_type(
+    [ t/0
+    ]).
+
+-export(
+    [ of_beam_stats/1
+    , of_beam_stats/2
+    , to_bin/1
+    , path_to_bin/1
+    , add_path_prefix/2
+    , node_id_to_bin/1
+    ]).
+
+-define(T, #?MODULE).
+
+-type t() ::
+    ?T{}.
+
+%% ============================================================================
+%% API
+%% ============================================================================
+
+-spec of_beam_stats(beam_stats:t()) ->
+    [t()].
+of_beam_stats(#beam_stats{node_id=NodeID}=BeamStats) ->
+    NodeIDBin = node_id_to_bin(NodeID),
+    of_beam_stats(BeamStats, NodeIDBin).
+
+-spec of_beam_stats(beam_stats:t(), binary()) ->
+    [t()].
+of_beam_stats(#beam_stats
+    { timestamp = Timestamp
+    , node_id   = _
+    , memory    = Memory
+    , io_bytes_in      = IOBytesIn
+    , io_bytes_out     = IOBytesOut
+    , context_switches = ContextSwitches
+    , reductions       = Reductions
+    , run_queue        = RunQueue
+    , ets              = ETS
+    , processes        = Processes
+    },
+    <<NodeID/binary>>
+) ->
+    Ts = Timestamp,
+    N = NodeID,
+    [ cons([N, <<"io">>               , <<"bytes_in">> ], IOBytesIn      , Ts)
+    , cons([N, <<"io">>               , <<"bytes_out">>], IOBytesOut     , Ts)
+    , cons([N, <<"context_switches">>                  ], ContextSwitches, Ts)
+    , cons([N, <<"reductions">>                        ], Reductions     , Ts)
+    , cons([N, <<"run_queue">>                         ], RunQueue       , Ts)
+    | of_memory(Memory, NodeID, Ts)
+    ]
+    ++ of_ets(ETS, NodeID, Ts)
+    ++ of_processes(Processes, NodeID, Ts).
+
+-spec to_bin(t()) ->
+    binary().
+to_bin(
+    ?T
+    { path      = Path
+    , value     = Value
+    , timestamp = Timestamp
+    }
+) ->
+    PathBin = path_to_bin(Path),
+    ValueBin = integer_to_binary(Value),
+    TimestampInt = timestamp_to_integer(Timestamp),
+    TimestampBin = integer_to_binary(TimestampInt),
+    <<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
+
+-spec add_path_prefix(t(), binary()) ->
+    t().
+add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
+    T?T{path = [Prefix | Path]}.
+
+-spec path_to_bin([binary()]) ->
+    binary().
+path_to_bin(Path) ->
+    bin_join(Path, <<".">>).
+
+-spec node_id_to_bin(node()) ->
+    binary().
+node_id_to_bin(NodeID) ->
+    NodeIDBin = atom_to_binary(NodeID, utf8),
+    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+
+%% ============================================================================
+%% Helpers
+%% ============================================================================
+
+-spec bin_join([binary()], binary()) ->
+    binary().
+bin_join([]                         , <<_/binary>>  ) -> <<>>;
+bin_join([<<B/binary>> | []]        , <<_/binary>>  ) -> B;
+bin_join([<<B/binary>> | [_|_]=Bins], <<Sep/binary>>) ->
+    BinsBin = bin_join(Bins, Sep),
+    <<B/binary, Sep/binary, BinsBin/binary>>.
+
+-spec timestamp_to_integer(erlang:timestamp()) ->
+    non_neg_integer().
+timestamp_to_integer({Megaseconds, Seconds, _}) ->
+    Megaseconds * 1000000 + Seconds.
+
+-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) ->
+    [t()].
+of_memory(Memory, <<NodeID/binary>>, Timestamp) ->
+    ComponentToMessage =
+        fun ({Key, Value}) ->
+            KeyBin = atom_to_binary(Key, latin1),
+            cons([NodeID, <<"memory">>, KeyBin], Value, Timestamp)
+        end,
+    lists:map(ComponentToMessage, Memory).
+
+-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) ->
+    [t()].
+of_ets(PerTableStats, <<NodeID/binary>>, Timestamp) ->
+    OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end,
+    NestedMsgs = lists:map(OfEtsTable, PerTableStats),
+    lists:append(NestedMsgs).
+
+-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) ->
+    [t()].
+of_ets_table(#beam_stats_ets_table
+    { id     = ID
+    , name   = Name
+    , size   = Size
+    , memory = Memory
+    },
+    <<NodeID/binary>>,
+    Timestamp
+) ->
+    IDBin     = beam_stats_ets_table:id_to_bin(ID),
+    NameBin   = atom_to_binary(Name, latin1),
+    NameAndID = [NameBin, IDBin],
+    [ cons([NodeID, <<"ets_table">>, <<"size">>   | NameAndID], Size  , Timestamp)
+    , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp)
+    ].
+
+-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) ->
+    [t()].
+of_processes(
+    #beam_stats_processes
+    { individual_stats         = Processes
+    , count_all                = CountAll
+    , count_exiting            = CountExiting
+    , count_garbage_collecting = CountGarbageCollecting
+    , count_registered         = CountRegistered
+    , count_runnable           = CountRunnable
+    , count_running            = CountRunning
+    , count_suspended          = CountSuspended
+    , count_waiting            = CountWaiting
+    },
+    <<NodeID/binary>>,
+    Timestamp
+) ->
+    OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end,
+    PerProcessMsgsNested = lists:map(OfProcess, Processes),
+    PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested),
+    Ts = Timestamp,
+    N  = NodeID,
+    [ cons([N, <<"processes_count_all">>               ], CountAll              , Ts)
+    , cons([N, <<"processes_count_exiting">>           ], CountExiting          , Ts)
+    , cons([N, <<"processes_count_garbage_collecting">>], CountGarbageCollecting, Ts)
+    , cons([N, <<"processes_count_registered">>        ], CountRegistered       , Ts)
+    , cons([N, <<"processes_count_runnable">>          ], CountRunnable         , Ts)
+    , cons([N, <<"processes_count_running">>           ], CountRunning          , Ts)
+    , cons([N, <<"processes_count_suspended">>         ], CountSuspended        , Ts)
+    , cons([N, <<"processes_count_waiting">>           ], CountWaiting          , Ts)
+    | PerProcessMsgsFlattened
+    ].
+
+-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) ->
+    [t()].
+of_process(
+    #beam_stats_process
+    { pid               = Pid
+    , memory            = Memory
+    , total_heap_size   = TotalHeapSize
+    , stack_size        = StackSize
+    , message_queue_len = MsgQueueLen
+    }=Process,
+    <<NodeID/binary>>,
+    Timestamp
+) ->
+    Origin = beam_stats_process:get_best_known_origin(Process),
+    OriginBin = proc_origin_to_bin(Origin),
+    PidBin = pid_to_bin(Pid),
+    OriginAndPid = [OriginBin, PidBin],
+    Ts = Timestamp,
+    N  = NodeID,
+    [ cons([N, <<"process_memory">>            | OriginAndPid], Memory        , Ts)
+    , cons([N, <<"process_total_heap_size">>   | OriginAndPid], TotalHeapSize , Ts)
+    , cons([N, <<"process_stack_size">>        | OriginAndPid], StackSize     , Ts)
+    , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen   , Ts)
+    ].
+
+-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
+    binary().
+proc_origin_to_bin({registered_name, Name}) ->
+    atom_to_binary(Name, utf8);
+proc_origin_to_bin({ancestry, Ancestry}) ->
+    #beam_stats_process_ancestry
+    { raw_initial_call  = InitCallRaw
+    , otp_initial_call  = InitCallOTPOpt
+    , otp_ancestors     = AncestorsOpt
+    } = Ancestry,
+    Blank             = <<"NONE">>,
+    InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt   , fun mfa_to_bin/1),
+    InitCallOTPBin    = hope_option:get(InitCallOTPBinOpt, Blank),
+    AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
+    AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
+    InitCallRawBin    = mfa_to_bin(InitCallRaw),
+    << InitCallRawBin/binary
+     , "--"
+     , InitCallOTPBin/binary
+     , "--"
+     , AncestorsBin/binary
+    >>.
+
+ancestors_to_bin([]) ->
+    <<>>;
+ancestors_to_bin([A | Ancestors]) ->
+    ABin = ancestor_to_bin(A),
+    case ancestors_to_bin(Ancestors)
+    of  <<>> ->
+            ABin
+    ;   <<AncestorsBin/binary>> ->
+            <<ABin/binary, "-", AncestorsBin/binary>>
+    end.
+
+ancestor_to_bin(A) when is_atom(A) ->
+    atom_to_binary(A, utf8);
+ancestor_to_bin(A) when is_pid(A) ->
+    pid_to_bin(A).
+
+pid_to_bin(Pid) ->
+    PidList = erlang:pid_to_list(Pid),
+    PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
+             re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
+
+-spec mfa_to_bin(mfa()) ->
+    binary().
+mfa_to_bin({Module, Function, Arity}) ->
+    ModuleBin   = atom_to_binary(Module  , utf8),
+    FunctionBin = atom_to_binary(Function, utf8),
+    ArityBin    = erlang:integer_to_binary(Arity),
+    <<ModuleBin/binary, "-", FunctionBin/binary, "-", ArityBin/binary>>.
+
+-spec cons([binary()], integer(), erlang:timestamp()) ->
+    t().
+cons(Path, Value, Timestamp) ->
+    ?T
+    { path      = Path
+    , value     = Value
+    , timestamp = Timestamp
+    }.
diff --git a/src/beam_stats_msg_statsd_gauge.erl b/src/beam_stats_msg_statsd_gauge.erl
new file mode 100644 (file)
index 0000000..07e5d02
--- /dev/null
@@ -0,0 +1,47 @@
+-module(beam_stats_msg_statsd_gauge).
+
+-include("include/beam_stats_msg_graphite.hrl").
+-include("include/beam_stats_msg_statsd_gauge.hrl").
+
+-export_type(
+    [ t/0
+    ]).
+
+-export(
+    [ of_msg_graphite/1
+    , to_bin/1
+    ]).
+
+-define(T, #?MODULE).
+
+-type t() ::
+    ?T{}.
+
+of_msg_graphite(
+    #beam_stats_msg_graphite
+    { path      = Path
+    , value     = Value
+    , timestamp = _Timestamp
+    }
+) ->
+    PathBin = beam_stats_msg_graphite:path_to_bin(Path),
+    cons(PathBin, Value).
+
+-spec cons(binary(), non_neg_integer()) ->
+    t().
+cons(<<Name/binary>>, Value) ->
+    ?T
+    { name  = Name
+    , value = Value
+    }.
+
+-spec to_bin(t()) ->
+    binary().
+to_bin(
+    ?T
+    { name  = <<Name/binary>>
+    , value = Value
+    }
+) when Value >= 0 ->
+    ValueBin = integer_to_binary(Value),
+    << Name/binary, ":", ValueBin/binary, "|g\n">>.
This page took 0.07366 seconds and 4 git commands to generate.