X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=cfd1e6d3c9eae264c007ea45d554cd0c28e5df8d;hb=4bb8ddfe1a69842312c68d25aa3f69db53280921;hp=1ce0afb0ec40e834ab1f1be6da7dbbc5ce03d92a;hpb=8c0788b23299b3d09f255cdfb1932f10da3545a2;p=beam_stats.git diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 1ce0afb..cfd1e6d 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -2,6 +2,9 @@ -include("include/beam_stats.hrl"). -include("include/beam_stats_ets_table.hrl"). +-include("include/beam_stats_process.hrl"). +-include("include/beam_stats_process_ancestry.hrl"). +-include("include/beam_stats_processes.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -23,6 +26,7 @@ | {dst_port , inet:port_number()} | {src_port , inet:port_number()} | {num_msgs_per_packet , non_neg_integer()} + | {static_node_name , binary()} . -define(DEFAULT_DST_HOST, "localhost"). @@ -48,6 +52,7 @@ , dst_port :: inet:port_number() , src_port :: inet:port_number() , num_msgs_per_packet :: non_neg_integer() + , static_node_name :: hope_option:t(binary()) }). -type state() :: @@ -67,19 +72,27 @@ init(Options) -> DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), + StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort , num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> - Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), +consume( + Q, + #state + { num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt + }=State +) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> @@ -142,14 +155,18 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> +-spec beam_stats_queue_to_packets( + beam_stats_consumer:queue(), + non_neg_integer(), + hope_option:t(binary()) +) -> [binary()]. -beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> - MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), +beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> + MsgBins = lists:append([beam_stats_to_bins(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). --spec beam_stats_to_bins(beam_stats:t()) -> +-spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> [binary()]. beam_stats_to_bins(#beam_stats { node_id = NodeID @@ -160,9 +177,11 @@ beam_stats_to_bins(#beam_stats , reductions = Reductions , run_queue = RunQueue , ets = ETS - } + , processes = Processes + }, + StaticNodeNameOpt ) -> - NodeIDBin = node_id_to_bin(NodeID), + NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)), Msgs1 = [ io_bytes_in_to_msg(IOBytesIn) , io_bytes_out_to_msg(IOBytesOut) @@ -171,7 +190,8 @@ beam_stats_to_bins(#beam_stats , run_queue_to_msg(RunQueue) | memory_to_msgs(Memory) ] - ++ ets_to_msgs(ETS), + ++ ets_to_msgs(ETS) + ++ procs_to_msgs(Processes), Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], [statsd_msg_to_bin(M) || M <- Msgs2]. @@ -220,6 +240,115 @@ io_bytes_out_to_msg(IOBytesOut) -> , type = gauge }. +-spec procs_to_msgs(beam_stats_processes:t()) -> + [statsd_msg()]. +procs_to_msgs( + #beam_stats_processes + { individual_stats = Procs + , count_all = CountAll + , count_exiting = CountExiting + , count_garbage_collecting = CountGarbageCollecting + , count_registered = CountRegistered + , count_runnable = CountRunnable + , count_running = CountRunning + , count_suspended = CountSuspended + , count_waiting = CountWaiting + } +) -> + [ gauge(<<"processes_count_all">> , CountAll) + , gauge(<<"processes_count_exiting">> , CountExiting) + , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting) + , gauge(<<"processes_count_registered">> , CountRegistered) + , gauge(<<"processes_count_runnable">> , CountRunnable) + , gauge(<<"processes_count_running">> , CountRunning) + , gauge(<<"processes_count_suspended">> , CountSuspended) + , gauge(<<"processes_count_waiting">> , CountWaiting) + | lists:append([proc_to_msgs(P) || P <- Procs]) + ]. + +-spec proc_to_msgs(beam_stats_process:t()) -> + [statsd_msg()]. +proc_to_msgs( + #beam_stats_process + { pid = Pid + , memory = Memory + , total_heap_size = TotalHeapSize + , stack_size = StackSize + , message_queue_len = MsgQueueLen + }=Process +) -> + Origin = beam_stats_process:get_best_known_origin(Process), + OriginBin = proc_origin_to_bin(Origin), + PidBin = pid_to_bin(Pid), + OriginDotPid = <>, + [ gauge(<<"process_memory." , OriginDotPid/binary>>, Memory) + , gauge(<<"process_total_heap_size." , OriginDotPid/binary>>, TotalHeapSize) + , gauge(<<"process_stack_size." , OriginDotPid/binary>>, StackSize) + , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen) + ]. + +-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> + binary(). +proc_origin_to_bin({registered_name, Name}) -> + atom_to_binary(Name, utf8); +proc_origin_to_bin({ancestry, Ancestry}) -> + #beam_stats_process_ancestry + { raw_initial_call = InitCallRaw + , otp_initial_call = InitCallOTPOpt + , otp_ancestors = AncestorsOpt + } = Ancestry, + Blank = <<"NONE">>, + InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), + InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), + AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), + AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), + InitCallRawBin = mfa_to_bin(InitCallRaw), + << InitCallRawBin/binary + , "--" + , InitCallOTPBin/binary + , "--" + , AncestorsBin/binary + >>. + +ancestors_to_bin([]) -> + <<>>; +ancestors_to_bin([A | Ancestors]) -> + ABin = ancestor_to_bin(A), + case ancestors_to_bin(Ancestors) + of <<>> -> + ABin + ; <> -> + <> + end. + +ancestor_to_bin(A) when is_atom(A) -> + atom_to_binary(A, utf8); +ancestor_to_bin(A) when is_pid(A) -> + pid_to_bin(A). + +pid_to_bin(Pid) -> + PidList = erlang:pid_to_list(Pid), + PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), + re:replace(PidBin , "[><]", "" , [global, {return, binary}]). + +-spec mfa_to_bin(mfa()) -> + binary(). +mfa_to_bin({Module, Function, Arity}) -> + ModuleBin = atom_to_binary(Module , utf8), + FunctionBin = atom_to_binary(Function, utf8), + ArityBin = erlang:integer_to_binary(Arity), + <>. + + +-spec gauge(binary(), integer()) -> + statsd_msg(). +gauge(<>, Value) when is_integer(Value) -> + #statsd_msg + { name = Name + , value = Value + , type = gauge + }. + -spec ets_to_msgs(beam_stats_ets:t()) -> [statsd_msg()]. ets_to_msgs(PerTableStats) ->