feat: use new msg abstractions in StatsD consumer.
authorSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 01:14:39 +0000 (21:14 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Thu, 24 Sep 2015 01:50:32 +0000 (21:50 -0400)
src/beam_stats_consumer_statsd.erl
src/beam_stats_msg_graphite.erl

index cfd1e6d..06945e1 100644 (file)
@@ -1,10 +1,6 @@
 -module(beam_stats_consumer_statsd).
 
 -include("include/beam_stats.hrl").
--include("include/beam_stats_ets_table.hrl").
--include("include/beam_stats_process.hrl").
--include("include/beam_stats_process_ancestry.hrl").
--include("include/beam_stats_processes.hrl").
 -include("beam_stats_logging.hrl").
 
 -behaviour(beam_stats_consumer).
 -define(DEFAULT_DST_PORT, 8125).
 -define(DEFAULT_SRC_PORT, 8124).
 
--type metric_type() ::
-    % TODO: Add other metric types
-    gauge.
-
--record(statsd_msg,
-    { name  :: binary()
-    , value :: non_neg_integer()
-    , type  :: metric_type()
-    }).
-
--type statsd_msg() ::
-    #statsd_msg{}.
-
 -record(state,
     { sock     :: hope_option:t(gen_udp:socket())
     , dst_host :: inet:ip_address() | inet:hostname()
@@ -58,7 +41,7 @@
 -type state() ::
     #state{}.
 
--define(PATH_PREFIX, "beam_stats").
+-define(PATH_PREFIX, <<"beam_stats">>).
 
 %% ============================================================================
 %% Consumer implementation
@@ -168,267 +151,13 @@ beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) ->
 
 -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) ->
     [binary()].
-beam_stats_to_bins(#beam_stats
-    { node_id = NodeID
-    , memory  = Memory
-    , io_bytes_in  = IOBytesIn
-    , io_bytes_out = IOBytesOut
-    , context_switches = ContextSwitches
-    , reductions       = Reductions
-    , run_queue        = RunQueue
-    , ets              = ETS
-    , processes        = Processes
-    },
-    StaticNodeNameOpt
-) ->
-    NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)),
-    Msgs1 =
-        [ io_bytes_in_to_msg(IOBytesIn)
-        , io_bytes_out_to_msg(IOBytesOut)
-        , context_switches_to_msg(ContextSwitches)
-        , reductions_to_msg(Reductions)
-        , run_queue_to_msg(RunQueue)
-        | memory_to_msgs(Memory)
-        ]
-        ++ ets_to_msgs(ETS)
-        ++ procs_to_msgs(Processes),
-    Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
-    [statsd_msg_to_bin(M) || M <- Msgs2].
-
--spec run_queue_to_msg(non_neg_integer()) ->
-    statsd_msg().
-run_queue_to_msg(RunQueue) ->
-    #statsd_msg
-    { name  = <<"run_queue">>
-    , value = RunQueue
-    , type  = gauge
-    }.
-
--spec reductions_to_msg(non_neg_integer()) ->
-    statsd_msg().
-reductions_to_msg(Reductions) ->
-    #statsd_msg
-    { name  = <<"reductions">>
-    , value = Reductions
-    , type  = gauge
-    }.
-
--spec context_switches_to_msg(non_neg_integer()) ->
-    statsd_msg().
-context_switches_to_msg(ContextSwitches) ->
-    #statsd_msg
-    { name  = <<"context_switches">>
-    , value = ContextSwitches
-    , type  = gauge
-    }.
-
--spec io_bytes_in_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_in_to_msg(IOBytesIn) ->
-    #statsd_msg
-    { name  = <<"io.bytes_in">>
-    , value = IOBytesIn
-    , type  = gauge
-    }.
-
--spec io_bytes_out_to_msg(non_neg_integer()) ->
-    statsd_msg().
-io_bytes_out_to_msg(IOBytesOut) ->
-    #statsd_msg
-    { name  = <<"io.bytes_out">>
-    , value = IOBytesOut
-    , type  = gauge
-    }.
-
--spec procs_to_msgs(beam_stats_processes:t()) ->
-    [statsd_msg()].
-procs_to_msgs(
-    #beam_stats_processes
-    { individual_stats         = Procs
-    , count_all                = CountAll
-    , count_exiting            = CountExiting
-    , count_garbage_collecting = CountGarbageCollecting
-    , count_registered         = CountRegistered
-    , count_runnable           = CountRunnable
-    , count_running            = CountRunning
-    , count_suspended          = CountSuspended
-    , count_waiting            = CountWaiting
-    }
-) ->
-    [ gauge(<<"processes_count_all">>               , CountAll)
-    , gauge(<<"processes_count_exiting">>           , CountExiting)
-    , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting)
-    , gauge(<<"processes_count_registered">>        , CountRegistered)
-    , gauge(<<"processes_count_runnable">>          , CountRunnable)
-    , gauge(<<"processes_count_running">>           , CountRunning)
-    , gauge(<<"processes_count_suspended">>         , CountSuspended)
-    , gauge(<<"processes_count_waiting">>           , CountWaiting)
-    | lists:append([proc_to_msgs(P) || P <- Procs])
-    ].
-
--spec proc_to_msgs(beam_stats_process:t()) ->
-    [statsd_msg()].
-proc_to_msgs(
-    #beam_stats_process
-    { pid               = Pid
-    , memory            = Memory
-    , total_heap_size   = TotalHeapSize
-    , stack_size        = StackSize
-    , message_queue_len = MsgQueueLen
-    }=Process
-) ->
-    Origin = beam_stats_process:get_best_known_origin(Process),
-    OriginBin = proc_origin_to_bin(Origin),
-    PidBin = pid_to_bin(Pid),
-    OriginDotPid = <<OriginBin/binary, ".", PidBin/binary>>,
-    [ gauge(<<"process_memory."            , OriginDotPid/binary>>, Memory)
-    , gauge(<<"process_total_heap_size."   , OriginDotPid/binary>>, TotalHeapSize)
-    , gauge(<<"process_stack_size."        , OriginDotPid/binary>>, StackSize)
-    , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen)
-    ].
-
--spec proc_origin_to_bin(beam_stats_process:best_known_origin()) ->
-    binary().
-proc_origin_to_bin({registered_name, Name}) ->
-    atom_to_binary(Name, utf8);
-proc_origin_to_bin({ancestry, Ancestry}) ->
-    #beam_stats_process_ancestry
-    { raw_initial_call  = InitCallRaw
-    , otp_initial_call  = InitCallOTPOpt
-    , otp_ancestors     = AncestorsOpt
-    } = Ancestry,
-    Blank             = <<"NONE">>,
-    InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt   , fun mfa_to_bin/1),
-    InitCallOTPBin    = hope_option:get(InitCallOTPBinOpt, Blank),
-    AncestorsBinOpt   = hope_option:map(AncestorsOpt     , fun ancestors_to_bin/1),
-    AncestorsBin      = hope_option:get(AncestorsBinOpt  , Blank),
-    InitCallRawBin    = mfa_to_bin(InitCallRaw),
-    << InitCallRawBin/binary
-     , "--"
-     , InitCallOTPBin/binary
-     , "--"
-     , AncestorsBin/binary
-    >>.
-
-ancestors_to_bin([]) ->
-    <<>>;
-ancestors_to_bin([A | Ancestors]) ->
-    ABin = ancestor_to_bin(A),
-    case ancestors_to_bin(Ancestors)
-    of  <<>> ->
-            ABin
-    ;   <<AncestorsBin/binary>> ->
-            <<ABin/binary, "-", AncestorsBin/binary>>
-    end.
-
-ancestor_to_bin(A) when is_atom(A) ->
-    atom_to_binary(A, utf8);
-ancestor_to_bin(A) when is_pid(A) ->
-    pid_to_bin(A).
-
-pid_to_bin(Pid) ->
-    PidList = erlang:pid_to_list(Pid),
-    PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]),
-             re:replace(PidBin , "[><]", "" , [global, {return, binary}]).
-
--spec mfa_to_bin(mfa()) ->
-    binary().
-mfa_to_bin({Module, Function, Arity}) ->
-    ModuleBin   = atom_to_binary(Module  , utf8),
-    FunctionBin = atom_to_binary(Function, utf8),
-    ArityBin    = erlang:integer_to_binary(Arity),
-    <<ModuleBin/binary, "-", FunctionBin/binary, "-", ArityBin/binary>>.
-
-
--spec gauge(binary(), integer()) ->
-    statsd_msg().
-gauge(<<Name/binary>>, Value) when is_integer(Value) ->
-    #statsd_msg
-    { name  = Name
-    , value = Value
-    , type  = gauge
-    }.
-
--spec ets_to_msgs(beam_stats_ets:t()) ->
-    [statsd_msg()].
-ets_to_msgs(PerTableStats) ->
-    NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats),
-    lists:append(NestedMsgs).
-
--spec ets_table_to_msgs(beam_stats_ets_table:t()) ->
-    [statsd_msg()].
-ets_table_to_msgs(#beam_stats_ets_table
-    { id     = ID
-    , name   = Name
-    , size   = Size
-    , memory = Memory
-    }
-) ->
-    IDBin   = beam_stats_ets_table:id_to_bin(ID),
-    NameBin = atom_to_binary(Name, latin1),
-    NameAndID = <<NameBin/binary, ".", IDBin/binary>>,
-    SizeMsg =
-        #statsd_msg
-        { name  = <<"ets_table.size.", NameAndID/binary>>
-        , value = Size
-        , type  = gauge
-        },
-    MemoryMsg =
-        #statsd_msg
-        { name  = <<"ets_table.memory.", NameAndID/binary>>
-        , value = Memory
-        , type  = gauge
-        },
-    [SizeMsg, MemoryMsg].
-
--spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
-    [statsd_msg()].
-memory_to_msgs(Memory) ->
-    [memory_component_to_statsd_msg(MC) || MC <- Memory].
-
--spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) ->
-    statsd_msg().
-memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 ->
-    MemTypeBin = atom_to_binary(MemType, latin1),
-    #statsd_msg
-    { name  = <<"memory.", MemTypeBin/binary>>
-    , value = MemSize
-    , type  = gauge
-    }.
-
--spec statsd_msg_add_name_prefix(statsd_msg(), binary()) ->
-    statsd_msg().
-statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <<NodeID/binary>>) ->
-    Prefix = <<?PATH_PREFIX, ".", NodeID/binary, ".">>,
-    Name2 = <<Prefix/binary, Name1/binary>>,
-    Msg#statsd_msg{name=Name2}.
-
--spec statsd_msg_to_bin(statsd_msg()) ->
-    binary().
-statsd_msg_to_bin(
-    #statsd_msg
-    { name  = <<Name/binary>>
-    , value = Value
-    , type  = Type = gauge
-    }
-) when Value >= 0 ->
-    TypeBin = metric_type_to_bin(Type),
-    ValueBin = integer_to_binary(Value),
-    << Name/binary
-     , ":"
-     , ValueBin/binary
-     , "|"
-     , TypeBin/binary
-     , "\n"
-    >>.
-
--spec metric_type_to_bin(metric_type()) ->
-    binary().
-metric_type_to_bin(gauge) ->
-    <<"g">>.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) ->
+    NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID),
+    NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault),
+    GraphiteMsgAddPrefix =
+        fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end,
+    MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin),
+    MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1),
+    MsgsStatsD =
+        lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2),
+    lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD).
index 6883ff1..c88e327 100644 (file)
 
 -export(
     [ of_beam_stats/1
+    , of_beam_stats/2
     , to_bin/1
     , path_to_bin/1
+    , add_path_prefix/2
+    , node_id_to_bin/1
     ]).
 
 -define(T, #?MODULE).
@@ -76,11 +79,22 @@ to_bin(
     TimestampBin = integer_to_binary(TimestampInt),
     <<PathBin/binary, " ", ValueBin/binary, " ", TimestampBin/binary>>.
 
+-spec add_path_prefix(t(), binary()) ->
+    t().
+add_path_prefix(?T{path=Path}=T, <<Prefix/binary>>) ->
+    T?T{path = [Prefix | Path]}.
+
 -spec path_to_bin([binary()]) ->
     binary().
 path_to_bin(Path) ->
     bin_join(Path, <<".">>).
 
+-spec node_id_to_bin(node()) ->
+    binary().
+node_id_to_bin(NodeID) ->
+    NodeIDBin = atom_to_binary(NodeID, utf8),
+    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+
 %% ============================================================================
 %% Helpers
 %% ============================================================================
@@ -251,9 +265,3 @@ cons(Path, Value, Timestamp) ->
     , value     = Value
     , timestamp = Timestamp
     }.
-
--spec node_id_to_bin(node()) ->
-    binary().
-node_id_to_bin(NodeID) ->
-    NodeIDBin = atom_to_binary(NodeID, utf8),
-    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
This page took 0.059416 seconds and 4 git commands to generate.