From: Siraaj Khandkar Date: Thu, 24 Sep 2015 01:57:15 +0000 (-0400) Subject: Merge pull request #12 from ibnfirnas/factor-and-abstract-graphite-msg X-Git-Tag: 0.10.0 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=79691831e7e9c0f29834bd60a0f7518645d010ba;hp=010bff4874ddc0eee49d65edd03bcfbe0a702198;p=beam_stats.git Merge pull request #12 from ibnfirnas/factor-and-abstract-graphite-msg Factor and abstract graphite msg --- diff --git a/include/beam_stats_msg_graphite.hrl b/include/beam_stats_msg_graphite.hrl new file mode 100644 index 0000000..56df5cc --- /dev/null +++ b/include/beam_stats_msg_graphite.hrl @@ -0,0 +1,5 @@ +-record(beam_stats_msg_graphite, + { path :: [binary()] + , value :: integer() + , timestamp :: erlang:timestamp() + }). diff --git a/include/beam_stats_msg_statsd_gauge.hrl b/include/beam_stats_msg_statsd_gauge.hrl new file mode 100644 index 0000000..445c8a7 --- /dev/null +++ b/include/beam_stats_msg_statsd_gauge.hrl @@ -0,0 +1,4 @@ +-record(beam_stats_msg_statsd_gauge, + { name :: binary() + , value :: non_neg_integer() + }). diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index 3dca90d..e9dcfff 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.9.0"}, + {vsn, "0.10.0"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats_consumer_graphite.erl b/src/beam_stats_consumer_graphite.erl index 6e0d5dd..036f6c9 100644 --- a/src/beam_stats_consumer_graphite.erl +++ b/src/beam_stats_consumer_graphite.erl @@ -32,7 +32,7 @@ -type state() :: #state{}. --define(GRAPHITE_PATH_PREFIX, "beam_stats"). +-define(PATH_PREFIX , <<"beam_stats">>). -define(DEFAULT_HOST , "localhost"). -define(DEFAULT_PORT , 2003). -define(DEFAULT_TIMEOUT , 5000). @@ -109,59 +109,14 @@ try_to_connect_if_no_socket( -spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> binary(). beam_stats_queue_to_binary(Q) -> - Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)], + Bins = [beam_stats_to_bins(B) || B <- queue:to_list(Q)], iolist_to_binary(Bins). --spec beam_stats_to_bin(beam_stats:t()) -> - binary(). -beam_stats_to_bin(#beam_stats - { timestamp = Timestamp - , node_id = NodeID - , memory = Memory - } -) -> - TimestampInt = timestamp_to_integer(Timestamp), - TimestampBin = integer_to_binary(TimestampInt), - <> = node_id_to_bin(NodeID), - MemoryPairToBin = make_pair_to_bin(NodeIDBin, TimestampBin, <<"memory">>), - MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory), - MemoryBins = lists:map(MemoryPairToBin, MemoryBinPairs), - AllBins = - [ MemoryBins - ], - iolist_to_binary(AllBins). - --spec timestamp_to_integer(erlang:timestamp()) -> - non_neg_integer(). -timestamp_to_integer({Megaseconds, Seconds, _}) -> - Megaseconds * 1000000 + Seconds. - --spec make_pair_to_bin(binary(), binary(), binary()) -> - fun(({binary(), binary()}) -> binary()). -make_pair_to_bin(<>, <>, <>) -> - fun ({<>, <>}) -> - << ?GRAPHITE_PATH_PREFIX - , "." - , NodeID/binary - , "." - , Type/binary - , "." - , K/binary - , " " - , V/binary - , " " - , TimestampBin/binary - , "\n" - >> - end. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). - --spec atom_int_to_bin_bin({atom(), integer()}) -> - {binary(), binary()}. -atom_int_to_bin_bin({K, V}) -> - {atom_to_binary(K, latin1), integer_to_binary(V)}. +-spec beam_stats_to_bins(beam_stats:t()) -> + [binary()]. +beam_stats_to_bins(#beam_stats{}=BeamStats) -> + MsgAddPrefix = + fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, + Msgs1 = beam_stats_msg_graphite:of_beam_stats(BeamStats), + Msgs2 = lists:map(MsgAddPrefix, Msgs1), + lists:map(fun beam_stats_msg_graphite:to_bin/1, Msgs2). diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index cfd1e6d..06945e1 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,10 +1,6 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). --include("include/beam_stats_ets_table.hrl"). --include("include/beam_stats_process.hrl"). --include("include/beam_stats_process_ancestry.hrl"). --include("include/beam_stats_processes.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -33,19 +29,6 @@ -define(DEFAULT_DST_PORT, 8125). -define(DEFAULT_SRC_PORT, 8124). --type metric_type() :: - % TODO: Add other metric types - gauge. - --record(statsd_msg, - { name :: binary() - , value :: non_neg_integer() - , type :: metric_type() - }). - --type statsd_msg() :: - #statsd_msg{}. - -record(state, { sock :: hope_option:t(gen_udp:socket()) , dst_host :: inet:ip_address() | inet:hostname() @@ -58,7 +41,7 @@ -type state() :: #state{}. --define(PATH_PREFIX, "beam_stats"). +-define(PATH_PREFIX, <<"beam_stats">>). %% ============================================================================ %% Consumer implementation @@ -168,267 +151,13 @@ beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> -spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> [binary()]. -beam_stats_to_bins(#beam_stats - { node_id = NodeID - , memory = Memory - , io_bytes_in = IOBytesIn - , io_bytes_out = IOBytesOut - , context_switches = ContextSwitches - , reductions = Reductions - , run_queue = RunQueue - , ets = ETS - , processes = Processes - }, - StaticNodeNameOpt -) -> - NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)), - Msgs1 = - [ io_bytes_in_to_msg(IOBytesIn) - , io_bytes_out_to_msg(IOBytesOut) - , context_switches_to_msg(ContextSwitches) - , reductions_to_msg(Reductions) - , run_queue_to_msg(RunQueue) - | memory_to_msgs(Memory) - ] - ++ ets_to_msgs(ETS) - ++ procs_to_msgs(Processes), - Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], - [statsd_msg_to_bin(M) || M <- Msgs2]. - --spec run_queue_to_msg(non_neg_integer()) -> - statsd_msg(). -run_queue_to_msg(RunQueue) -> - #statsd_msg - { name = <<"run_queue">> - , value = RunQueue - , type = gauge - }. - --spec reductions_to_msg(non_neg_integer()) -> - statsd_msg(). -reductions_to_msg(Reductions) -> - #statsd_msg - { name = <<"reductions">> - , value = Reductions - , type = gauge - }. - --spec context_switches_to_msg(non_neg_integer()) -> - statsd_msg(). -context_switches_to_msg(ContextSwitches) -> - #statsd_msg - { name = <<"context_switches">> - , value = ContextSwitches - , type = gauge - }. - --spec io_bytes_in_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_in_to_msg(IOBytesIn) -> - #statsd_msg - { name = <<"io.bytes_in">> - , value = IOBytesIn - , type = gauge - }. - --spec io_bytes_out_to_msg(non_neg_integer()) -> - statsd_msg(). -io_bytes_out_to_msg(IOBytesOut) -> - #statsd_msg - { name = <<"io.bytes_out">> - , value = IOBytesOut - , type = gauge - }. - --spec procs_to_msgs(beam_stats_processes:t()) -> - [statsd_msg()]. -procs_to_msgs( - #beam_stats_processes - { individual_stats = Procs - , count_all = CountAll - , count_exiting = CountExiting - , count_garbage_collecting = CountGarbageCollecting - , count_registered = CountRegistered - , count_runnable = CountRunnable - , count_running = CountRunning - , count_suspended = CountSuspended - , count_waiting = CountWaiting - } -) -> - [ gauge(<<"processes_count_all">> , CountAll) - , gauge(<<"processes_count_exiting">> , CountExiting) - , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting) - , gauge(<<"processes_count_registered">> , CountRegistered) - , gauge(<<"processes_count_runnable">> , CountRunnable) - , gauge(<<"processes_count_running">> , CountRunning) - , gauge(<<"processes_count_suspended">> , CountSuspended) - , gauge(<<"processes_count_waiting">> , CountWaiting) - | lists:append([proc_to_msgs(P) || P <- Procs]) - ]. - --spec proc_to_msgs(beam_stats_process:t()) -> - [statsd_msg()]. -proc_to_msgs( - #beam_stats_process - { pid = Pid - , memory = Memory - , total_heap_size = TotalHeapSize - , stack_size = StackSize - , message_queue_len = MsgQueueLen - }=Process -) -> - Origin = beam_stats_process:get_best_known_origin(Process), - OriginBin = proc_origin_to_bin(Origin), - PidBin = pid_to_bin(Pid), - OriginDotPid = <>, - [ gauge(<<"process_memory." , OriginDotPid/binary>>, Memory) - , gauge(<<"process_total_heap_size." , OriginDotPid/binary>>, TotalHeapSize) - , gauge(<<"process_stack_size." , OriginDotPid/binary>>, StackSize) - , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen) - ]. - --spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> - binary(). -proc_origin_to_bin({registered_name, Name}) -> - atom_to_binary(Name, utf8); -proc_origin_to_bin({ancestry, Ancestry}) -> - #beam_stats_process_ancestry - { raw_initial_call = InitCallRaw - , otp_initial_call = InitCallOTPOpt - , otp_ancestors = AncestorsOpt - } = Ancestry, - Blank = <<"NONE">>, - InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), - InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), - AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), - AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), - InitCallRawBin = mfa_to_bin(InitCallRaw), - << InitCallRawBin/binary - , "--" - , InitCallOTPBin/binary - , "--" - , AncestorsBin/binary - >>. - -ancestors_to_bin([]) -> - <<>>; -ancestors_to_bin([A | Ancestors]) -> - ABin = ancestor_to_bin(A), - case ancestors_to_bin(Ancestors) - of <<>> -> - ABin - ; <> -> - <> - end. - -ancestor_to_bin(A) when is_atom(A) -> - atom_to_binary(A, utf8); -ancestor_to_bin(A) when is_pid(A) -> - pid_to_bin(A). - -pid_to_bin(Pid) -> - PidList = erlang:pid_to_list(Pid), - PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), - re:replace(PidBin , "[><]", "" , [global, {return, binary}]). - --spec mfa_to_bin(mfa()) -> - binary(). -mfa_to_bin({Module, Function, Arity}) -> - ModuleBin = atom_to_binary(Module , utf8), - FunctionBin = atom_to_binary(Function, utf8), - ArityBin = erlang:integer_to_binary(Arity), - <>. - - --spec gauge(binary(), integer()) -> - statsd_msg(). -gauge(<>, Value) when is_integer(Value) -> - #statsd_msg - { name = Name - , value = Value - , type = gauge - }. - --spec ets_to_msgs(beam_stats_ets:t()) -> - [statsd_msg()]. -ets_to_msgs(PerTableStats) -> - NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), - lists:append(NestedMsgs). - --spec ets_table_to_msgs(beam_stats_ets_table:t()) -> - [statsd_msg()]. -ets_table_to_msgs(#beam_stats_ets_table - { id = ID - , name = Name - , size = Size - , memory = Memory - } -) -> - IDBin = beam_stats_ets_table:id_to_bin(ID), - NameBin = atom_to_binary(Name, latin1), - NameAndID = <>, - SizeMsg = - #statsd_msg - { name = <<"ets_table.size.", NameAndID/binary>> - , value = Size - , type = gauge - }, - MemoryMsg = - #statsd_msg - { name = <<"ets_table.memory.", NameAndID/binary>> - , value = Memory - , type = gauge - }, - [SizeMsg, MemoryMsg]. - --spec memory_to_msgs([{atom(), non_neg_integer()}]) -> - [statsd_msg()]. -memory_to_msgs(Memory) -> - [memory_component_to_statsd_msg(MC) || MC <- Memory]. - --spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> - statsd_msg(). -memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> - MemTypeBin = atom_to_binary(MemType, latin1), - #statsd_msg - { name = <<"memory.", MemTypeBin/binary>> - , value = MemSize - , type = gauge - }. - --spec statsd_msg_add_name_prefix(statsd_msg(), binary()) -> - statsd_msg(). -statsd_msg_add_name_prefix(#statsd_msg{name=Name1}=Msg, <>) -> - Prefix = <>, - Name2 = <>, - Msg#statsd_msg{name=Name2}. - --spec statsd_msg_to_bin(statsd_msg()) -> - binary(). -statsd_msg_to_bin( - #statsd_msg - { name = <> - , value = Value - , type = Type = gauge - } -) when Value >= 0 -> - TypeBin = metric_type_to_bin(Type), - ValueBin = integer_to_binary(Value), - << Name/binary - , ":" - , ValueBin/binary - , "|" - , TypeBin/binary - , "\n" - >>. - --spec metric_type_to_bin(metric_type()) -> - binary(). -metric_type_to_bin(gauge) -> - <<"g">>. - --spec node_id_to_bin(node()) -> - binary(). -node_id_to_bin(NodeID) -> - NodeIDBin = atom_to_binary(NodeID, utf8), - re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). +beam_stats_to_bins(#beam_stats{node_id=NodeID}=BeamStats, StaticNodeNameOpt) -> + NodeIDBinDefault = beam_stats_msg_graphite:node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, NodeIDBinDefault), + GraphiteMsgAddPrefix = + fun (M) -> beam_stats_msg_graphite:add_path_prefix(M, ?PATH_PREFIX) end, + MsgsGraphite1 = beam_stats_msg_graphite:of_beam_stats(BeamStats, NodeIDBin), + MsgsGraphite2 = lists:map(GraphiteMsgAddPrefix, MsgsGraphite1), + MsgsStatsD = + lists:map(fun beam_stats_msg_statsd_gauge:of_msg_graphite/1, MsgsGraphite2), + lists:map(fun beam_stats_msg_statsd_gauge:to_bin/1, MsgsStatsD). diff --git a/src/beam_stats_msg_graphite.erl b/src/beam_stats_msg_graphite.erl new file mode 100644 index 0000000..de18c0d --- /dev/null +++ b/src/beam_stats_msg_graphite.erl @@ -0,0 +1,266 @@ +-module(beam_stats_msg_graphite). + +-include("include/beam_stats.hrl"). +-include("include/beam_stats_ets_table.hrl"). +-include("include/beam_stats_msg_graphite.hrl"). +-include("include/beam_stats_process.hrl"). +-include("include/beam_stats_process_ancestry.hrl"). +-include("include/beam_stats_processes.hrl"). + +-export_type( + [ t/0 + ]). + +-export( + [ of_beam_stats/1 + , of_beam_stats/2 + , to_bin/1 + , path_to_bin/1 + , add_path_prefix/2 + , node_id_to_bin/1 + ]). + +-define(T, #?MODULE). + +-type t() :: + ?T{}. + +%% ============================================================================ +%% API +%% ============================================================================ + +-spec of_beam_stats(beam_stats:t()) -> + [t()]. +of_beam_stats(#beam_stats{node_id=NodeID}=BeamStats) -> + NodeIDBin = node_id_to_bin(NodeID), + of_beam_stats(BeamStats, NodeIDBin). + +-spec of_beam_stats(beam_stats:t(), binary()) -> + [t()]. +of_beam_stats(#beam_stats + { timestamp = Timestamp + , node_id = _ + , memory = Memory + , io_bytes_in = IOBytesIn + , io_bytes_out = IOBytesOut + , context_switches = ContextSwitches + , reductions = Reductions + , run_queue = RunQueue + , ets = ETS + , processes = Processes + }, + <> +) -> + Ts = Timestamp, + N = NodeID, + [ cons([N, <<"io">> , <<"bytes_in">> ], IOBytesIn , Ts) + , cons([N, <<"io">> , <<"bytes_out">>], IOBytesOut , Ts) + , cons([N, <<"context_switches">> ], ContextSwitches, Ts) + , cons([N, <<"reductions">> ], Reductions , Ts) + , cons([N, <<"run_queue">> ], RunQueue , Ts) + | of_memory(Memory, NodeID, Ts) + ] + ++ of_ets(ETS, NodeID, Ts) + ++ of_processes(Processes, NodeID, Ts). + +-spec to_bin(t()) -> + binary(). +to_bin( + ?T + { path = Path + , value = Value + , timestamp = Timestamp + } +) -> + PathBin = path_to_bin(Path), + ValueBin = integer_to_binary(Value), + TimestampInt = timestamp_to_integer(Timestamp), + TimestampBin = integer_to_binary(TimestampInt), + <>. + +-spec add_path_prefix(t(), binary()) -> + t(). +add_path_prefix(?T{path=Path}=T, <>) -> + T?T{path = [Prefix | Path]}. + +-spec path_to_bin([binary()]) -> + binary(). +path_to_bin(Path) -> + bin_join(Path, <<".">>). + +-spec node_id_to_bin(node()) -> + binary(). +node_id_to_bin(NodeID) -> + NodeIDBin = atom_to_binary(NodeID, utf8), + re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + +%% ============================================================================ +%% Helpers +%% ============================================================================ + +-spec bin_join([binary()], binary()) -> + binary(). +bin_join([] , <<_/binary>> ) -> <<>>; +bin_join([<> | []] , <<_/binary>> ) -> B; +bin_join([<> | [_|_]=Bins], <>) -> + BinsBin = bin_join(Bins, Sep), + <>. + +-spec timestamp_to_integer(erlang:timestamp()) -> + non_neg_integer(). +timestamp_to_integer({Megaseconds, Seconds, _}) -> + Megaseconds * 1000000 + Seconds. + +-spec of_memory([{atom(), non_neg_integer()}], binary(), erlang:timestamp()) -> + [t()]. +of_memory(Memory, <>, Timestamp) -> + ComponentToMessage = + fun ({Key, Value}) -> + KeyBin = atom_to_binary(Key, latin1), + cons([NodeID, <<"memory">>, KeyBin], Value, Timestamp) + end, + lists:map(ComponentToMessage, Memory). + +-spec of_ets(beam_stats_ets:t(), binary(), erlang:timestamp()) -> + [t()]. +of_ets(PerTableStats, <>, Timestamp) -> + OfEtsTable = fun (Table) -> of_ets_table(Table, NodeID, Timestamp) end, + NestedMsgs = lists:map(OfEtsTable, PerTableStats), + lists:append(NestedMsgs). + +-spec of_ets_table(beam_stats_ets_table:t(), binary(), erlang:timestamp()) -> + [t()]. +of_ets_table(#beam_stats_ets_table + { id = ID + , name = Name + , size = Size + , memory = Memory + }, + <>, + Timestamp +) -> + IDBin = beam_stats_ets_table:id_to_bin(ID), + NameBin = atom_to_binary(Name, latin1), + NameAndID = [NameBin, IDBin], + [ cons([NodeID, <<"ets_table">>, <<"size">> | NameAndID], Size , Timestamp) + , cons([NodeID, <<"ets_table">>, <<"memory">> | NameAndID], Memory, Timestamp) + ]. + +-spec of_processes(beam_stats_processes:t(), binary(), erlang:timestamp()) -> + [t()]. +of_processes( + #beam_stats_processes + { individual_stats = Processes + , count_all = CountAll + , count_exiting = CountExiting + , count_garbage_collecting = CountGarbageCollecting + , count_registered = CountRegistered + , count_runnable = CountRunnable + , count_running = CountRunning + , count_suspended = CountSuspended + , count_waiting = CountWaiting + }, + <>, + Timestamp +) -> + OfProcess = fun (P) -> of_process(P, NodeID, Timestamp) end, + PerProcessMsgsNested = lists:map(OfProcess, Processes), + PerProcessMsgsFlattened = lists:append(PerProcessMsgsNested), + Ts = Timestamp, + N = NodeID, + [ cons([N, <<"processes_count_all">> ], CountAll , Ts) + , cons([N, <<"processes_count_exiting">> ], CountExiting , Ts) + , cons([N, <<"processes_count_garbage_collecting">>], CountGarbageCollecting, Ts) + , cons([N, <<"processes_count_registered">> ], CountRegistered , Ts) + , cons([N, <<"processes_count_runnable">> ], CountRunnable , Ts) + , cons([N, <<"processes_count_running">> ], CountRunning , Ts) + , cons([N, <<"processes_count_suspended">> ], CountSuspended , Ts) + , cons([N, <<"processes_count_waiting">> ], CountWaiting , Ts) + | PerProcessMsgsFlattened + ]. + +-spec of_process(beam_stats_process:t(), binary(), erlang:timestamp()) -> + [t()]. +of_process( + #beam_stats_process + { pid = Pid + , memory = Memory + , total_heap_size = TotalHeapSize + , stack_size = StackSize + , message_queue_len = MsgQueueLen + }=Process, + <>, + Timestamp +) -> + Origin = beam_stats_process:get_best_known_origin(Process), + OriginBin = proc_origin_to_bin(Origin), + PidBin = pid_to_bin(Pid), + OriginAndPid = [OriginBin, PidBin], + Ts = Timestamp, + N = NodeID, + [ cons([N, <<"process_memory">> | OriginAndPid], Memory , Ts) + , cons([N, <<"process_total_heap_size">> | OriginAndPid], TotalHeapSize , Ts) + , cons([N, <<"process_stack_size">> | OriginAndPid], StackSize , Ts) + , cons([N, <<"process_message_queue_len">> | OriginAndPid], MsgQueueLen , Ts) + ]. + +-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> + binary(). +proc_origin_to_bin({registered_name, Name}) -> + atom_to_binary(Name, utf8); +proc_origin_to_bin({ancestry, Ancestry}) -> + #beam_stats_process_ancestry + { raw_initial_call = InitCallRaw + , otp_initial_call = InitCallOTPOpt + , otp_ancestors = AncestorsOpt + } = Ancestry, + Blank = <<"NONE">>, + InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), + InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), + AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), + AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), + InitCallRawBin = mfa_to_bin(InitCallRaw), + << InitCallRawBin/binary + , "--" + , InitCallOTPBin/binary + , "--" + , AncestorsBin/binary + >>. + +ancestors_to_bin([]) -> + <<>>; +ancestors_to_bin([A | Ancestors]) -> + ABin = ancestor_to_bin(A), + case ancestors_to_bin(Ancestors) + of <<>> -> + ABin + ; <> -> + <> + end. + +ancestor_to_bin(A) when is_atom(A) -> + atom_to_binary(A, utf8); +ancestor_to_bin(A) when is_pid(A) -> + pid_to_bin(A). + +pid_to_bin(Pid) -> + PidList = erlang:pid_to_list(Pid), + PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), + re:replace(PidBin , "[><]", "" , [global, {return, binary}]). + +-spec mfa_to_bin(mfa()) -> + binary(). +mfa_to_bin({Module, Function, Arity}) -> + ModuleBin = atom_to_binary(Module , utf8), + FunctionBin = atom_to_binary(Function, utf8), + ArityBin = erlang:integer_to_binary(Arity), + <>. + +-spec cons([binary()], integer(), erlang:timestamp()) -> + t(). +cons(Path, Value, Timestamp) -> + ?T + { path = Path + , value = Value + , timestamp = Timestamp + }. diff --git a/src/beam_stats_msg_statsd_gauge.erl b/src/beam_stats_msg_statsd_gauge.erl new file mode 100644 index 0000000..07e5d02 --- /dev/null +++ b/src/beam_stats_msg_statsd_gauge.erl @@ -0,0 +1,47 @@ +-module(beam_stats_msg_statsd_gauge). + +-include("include/beam_stats_msg_graphite.hrl"). +-include("include/beam_stats_msg_statsd_gauge.hrl"). + +-export_type( + [ t/0 + ]). + +-export( + [ of_msg_graphite/1 + , to_bin/1 + ]). + +-define(T, #?MODULE). + +-type t() :: + ?T{}. + +of_msg_graphite( + #beam_stats_msg_graphite + { path = Path + , value = Value + , timestamp = _Timestamp + } +) -> + PathBin = beam_stats_msg_graphite:path_to_bin(Path), + cons(PathBin, Value). + +-spec cons(binary(), non_neg_integer()) -> + t(). +cons(<>, Value) -> + ?T + { name = Name + , value = Value + }. + +-spec to_bin(t()) -> + binary(). +to_bin( + ?T + { name = <> + , value = Value + } +) when Value >= 0 -> + ValueBin = integer_to_binary(Value), + << Name/binary, ":", ValueBin/binary, "|g\n">>.