From: Siraaj Khandkar Date: Thu, 24 Sep 2015 01:14:39 +0000 (-0400) Subject: feat: use new msg abstractions in StatsD consumer. X-Git-Tag: 0.10.0^2~3 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=ece99ea366e3bc8720528aaf5e1e1563e85b561c;p=beam_stats.git feat: use new msg abstractions in StatsD consumer. --- diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index cfd1e6d..06945e1 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,10 +1,6 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). --include("include/beam_stats_ets_table.hrl"). --include("include/beam_stats_process.hrl"). --include("include/beam_stats_process_ancestry.hrl"). --include("include/beam_stats_processes.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -33,19 +29,6 @@ -define(DEFAULT_DST_PORT, 8125). -define(DEFAULT_SRC_PORT, 8124). --type metric_type() :: - % TODO: Add other metric types - gauge. - --record(statsd_msg, - { name :: binary() - , value :: non_neg_integer() - , type :: metric_type() - }). - --type statsd_msg() :: - #statsd_msg{}. - -record(state, { sock :: hope_option:t(gen_udp:socket()) , dst_host :: inet:ip_address() | inet:hostname() @@ -58,7 +41,7 @@ -type state() :: #state{}. --define(PATH_PREFIX, "beam_stats"). +-define(PATH_PREFIX, <<"beam_stats">>). %% ============================================================================ %% Consumer implementation @@ -168,267 +151,13 @@ beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> [binary()]. -beam_stats_to_bins(#beam_stats - { node_id = NodeID - , memory = Memory - , io_bytes_in = IOBytesIn - , io_bytes_out = IOBytesOut - , context_switches = ContextSwitches - , reductions = Reductions - , run_queue = RunQueue - , ets = ETS - , processes = Processes - }, - StaticNodeNameOpt -) -> - NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)), - Msgs1 = - [ io_bytes_in_to_msg(IOBytesIn) - , io_bytes_out_to_msg(IOBytesOut) - , context_switches_to_msg(ContextSwitches) - , reductions_to_msg(Reductions) - , run_queue_to_msg(RunQueue) - | memory_to_msgs(Memory) - ] - ++ ets_to_msgs(ETS) - ++ procs_to_msgs(Processes), - Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], - [statsd_msg_to_bin(M) || M <- Msgs2]. - --spec run_queue_to_msg(non_neg_integer()) -> - statsd_msg(). -run_queue_to_msg(RunQueue) -> - #statsd_msg - { name = <<"run_queue">> - , value = RunQueue - , type = gauge - }. - --spec reductions_to_msg(non_neg_integer()) -> - statsd_msg(). -reductions_to_msg(Reductions) -> - #statsd_msg - { name = <<"reductions">> - , value = Reductions - , type = gauge - }. - --spec context_switches_to_msg(non_neg_integer()) -> - statsd_msg(). -context_switches_to_msg(ContextSwitches) -> - #statsd_msg - { name = <<"context_switches">> - , value = ContextSwitches - , type = gauge - }. - --spec io_bytes_in_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_in_to_msg(IOBytesIn) -> - #statsd_msg - { name = <<"io.bytes_in">> - , value = IOBytesIn - , type = gauge - }. - --spec io_bytes_out_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_out_to_msg(IOBytesOut) -> - #statsd_msg - { name = <<"io.bytes_out">> - , value = IOBytesOut - , type = gauge - }. - --spec procs_to_msgs(beam_stats_processes:t()) -> - [statsd_msg()]. -procs_to_msgs( - #beam_stats_processes - { individual_stats = Procs - , count_all = CountAll - , count_exiting = CountExiting - , count_garbage_collecting = CountGarbageCollecting - , count_registered = CountRegistered - , count_runnable = CountRunnable - , count_running = CountRunning - , count_suspended = CountSuspended - , count_waiting = CountWaiting - } -) -> - [ gauge(<<"processes_count_all">> , CountAll) - , gauge(<<"processes_count_exiting">> , CountExiting) - , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting) - , gauge(<<"processes_count_registered">> , CountRegistered) - , gauge(<<"processes_count_runnable">> , CountRunnable) - , gauge(<<"processes_count_running">> , CountRunning) - , gauge(<<"processes_count_suspended">> , CountSuspended) - , gauge(<<"processes_count_waiting">> , CountWaiting) - | lists:append([proc_to_msgs(P) || P <- Procs]) - ]. - --spec proc_to_msgs(beam_stats_process:t()) -> - [statsd_msg()]. -proc_to_msgs( - #beam_stats_process - { pid = Pid - , memory = Memory - , total_heap_size = TotalHeapSize - , stack_size = StackSize - , message_queue_len = MsgQueueLen - }=Process -) -> - Origin = beam_stats_process:get_best_known_origin(Process), - OriginBin = proc_origin_to_bin(Origin), - PidBin = pid_to_bin(Pid), - OriginDotPid = <>, - [ gauge(<<"process_memory." , OriginDotPid/binary>>, Memory) - , gauge(<<"process_total_heap_size." , OriginDotPid/binary>>, TotalHeapSize) - , gauge(<<"process_stack_size." , OriginDotPid/binary>>, StackSize) - , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen) - ]. - --spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> - binary(). -proc_origin_to_bin({registered_name, Name}) -> - atom_to_binary(Name, utf8); -proc_origin_to_bin({ancestry, Ancestry}) -> - #beam_stats_process_ancestry - { raw_initial_call = InitCallRaw - , otp_initial_call = InitCallOTPOpt - , otp_ancestors = AncestorsOpt - } = Ancestry, - Blank = <<"NONE">>, - InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), - InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), - AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), - AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), - InitCallRawBin = mfa_to_bin(InitCallRaw), - << InitCallRawBin/binary - , "--" - , InitCallOTPBin/binary - , "--" - , AncestorsBin/binary - >>. - -ancestors_to_bin([]) -> - <<>>; -ancestors_to_bin([A | Ancestors]) -> - ABin = ancestor_to_bin(A), - case ancestors_to_bin(Ancestors) - of <<>> -> - ABin - ; <> -> - <> - end. - -ancestor_to_bin(A) when is_atom(A) -> - atom_to_binary(A, utf8); -ancestor_to_bin(A) when is_pid(A) -> - pid_to_bin(A). - -pid_to_bin(Pid) -> - PidList = erlang:pid_to_list(Pid), - PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), - re:replace(PidBin , "[><]", "" , [global, {return, binary}]). - --spec mfa_to_bin(mfa()) -> - binary(). -mfa_to_bin({Module, Function, Arity}) -> - ModuleBin = atom_to_binary(Module , utf8), - FunctionBin = atom_to_binary(Function, utf8), - ArityBin = erlang:integer_to_binary(Arity), - <>. - - --spec gauge(binary(), integer()) -> - statsd_msg(). -gauge(<>, Value) when is_integer(Value) -> - #statsd_msg - { name = Name - , value = Value - , type = gauge - }. - --spec ets_to_msgs(beam_stats_ets:t()) -> - [statsd_msg()]. -ets_to_msgs(PerTableStats) -> - NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), - lists:append(NestedMsgs). - --spec ets_table_to_msgs(beam_stats_ets_table:t()) -> - [statsd_msg()]. -ets_table_to_msgs(#beam_stats_ets_table - { id = ID - , name = Name - , size = Size - , memory = Memory - } -) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), - NameBin = atom_to_binary(Name, latin1), - NameAndID = <>, - SizeMsg = - #statsd_msg - { name = <<"ets_table.size.", NameAndID/binary>> - , value = Size - , type = gauge - }, - MemoryMsg = - #statsd_msg - { name = <<"ets_table.memory.", NameAndID/binary>> - , value = Memory - , type = gauge - }, - [SizeMsg, MemoryMsg]. - --spec memory_to_msgs([{atom(), non_neg_integer()}]) -> - [statsd_msg()]. -memory_to_msgs(Memory) -> - [memory_component_to_statsd_msg(MC) || MC <- Memory]. - --spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> - statsd_msg(). -memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> - MemTypeBin = atom_to_binary(MemType, latin1), - #statsd_msg - { name = <<"memory.", MemTypeBin/binary>> - , value = MemSize - , type = gauge - }. - --spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> - statsd_msg(). -statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <>) -> - Prefix = <>, - Name2 = <>, - Msg#statsd_msg{name=Name2}. - --spec statsd_msg_to_bin(statsd_msg()) -> - binary(). -statsd_msg_to_bin( - #statsd_msg - { name = <> - , value = Value - , type = Type = gauge - } -) when Value >= 0 -> - TypeBin = metric_type_to_bin(Type), - ValueBin = integer_to_binary(Value), - << Name/binary - , ":" - , ValueBin/binary - , "|" - , TypeBin/binary - , "\n" - >>. - --spec metric_type_to_bin(metric_type()) -> - binary(). -metric_type_to_bin(gauge) -> - <<"g">>. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). +beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> + NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), + GraphiteMsgAddPrefix = + fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, + MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), + MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1), + MsgsStatsD = + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2), + lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD). diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl index 6883ff1..c88e327 100644 --- a/src/beam_stats_msg_graphite.erl +++ b/src/beam_stats_msg_graphite.erl @@ -13,8 +13,11 @@ -export( [ of_beam_stats/1 + , of_beam_stats/2 , to_bin/1 , path_to_bin/1 + , add_path_prefix/2 + , node_id_to_bin/1 ]). -define(T, #?MODULE). @@ -76,11 +79,22 @@ to_bin( TimestampBin = integer_to_binary(TimestampInt), <>. +-spec add_path_prefix(t(), binary()) -> + t(). +add_path_prefix(?T{path=Path}=T, <>) -> + T?T{path = [Prefix | Path]}. + -spec path_to_bin([binary()]) -> binary(). path_to_bin(Path) -> bin_join(Path, <<".">>). +-spec node_id_to_bin(node()) -> + binary(). +node_id_to_bin(NodeID) -> + NodeIDBin = atom_to_binary(NodeID, utf8), + re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + %% ============================================================================ %% Helpers %% ============================================================================ @@ -251,9 +265,3 @@ cons(Path, Value, Timestamp) -> , value = Value , timestamp = Timestamp }. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).