X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer_statsd.erl;h=cfd1e6d3c9eae264c007ea45d554cd0c28e5df8d;hb=2cd7cf62e9b1e456c0ccc963d92a9170c3af8251;hp=6baf459397af889171a41cea7545be6e7135fed4;hpb=76aefffb4a92500ad4664d120b5ef2fec80e7988;p=beam_stats.git diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 6baf459..cfd1e6d 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,6 +1,11 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). +-include("include/beam_stats_ets_table.hrl"). +-include("include/beam_stats_process.hrl"). +-include("include/beam_stats_process_ancestry.hrl"). +-include("include/beam_stats_processes.hrl"). +-include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -15,20 +20,13 @@ , terminate/1 ]). -%% Tests (to be run by CT): --export( - [ ct_test__beam_stats_to_bins/1 - , ct_test__memory_component_to_statsd_msg/1 - , ct_test__statsd_msg_add_name_prefix/1 - , ct_test__statsd_msg_to_bin/1 - , ct_test__node_id_to_bin/1 - ]). - -type option() :: {consumption_interval , non_neg_integer()} | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} + | {static_node_name , binary()} . -define(DEFAULT_DST_HOST, "localhost"). @@ -53,6 +51,8 @@ , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() + , static_node_name :: hope_option:t(binary()) }). -type state() :: @@ -71,20 +71,29 @@ init(Options) -> DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), + StaticNodeNameOpt = hope_kv_list:get(Options, static_node_name), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume( + Q, + #state + { num_msgs_per_packet = NumMsgsPerPacket + , static_node_name = StaticNodeNameOpt + }=State +) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. @@ -96,10 +105,16 @@ terminate(#state{sock=SockOpt}) -> %% Transport %% ============================================================================ +-spec try_to_connect_and_send(binary(), state()) -> + state(). +try_to_connect_and_send(<>, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> - io:format("error: socket closed~n"), + ?log_error("Sending failed. No socket in state."), % TODO: Maybe schedule retry? State; try_to_send( @@ -114,7 +129,10 @@ try_to_send( of ok -> State ; {error, _}=Error -> - io:format("error: gen_udp:send/4 failed: ~p~n", [Error]), + ?log_error( + "gen_udp:send(~p, ~p, ~p, ~p) -> ~p", + [Sock, DstHost, DstPort, Error] + ), % TODO: Do something with unsent messages? ok = gen_udp:close(Sock), State#state{sock=none} @@ -129,7 +147,7 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> of {ok, Sock} -> State#state{sock = {some, Sock}} ; {error, _}=Error -> - io:format("error: gen_udp:open/1 failed: ~p~n", [Error]), + ?log_error("gen_udp:open(~p) -> ~p", [SrcPort, Error]), State#state{sock = none} end. @@ -137,23 +155,232 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). +-spec beam_stats_queue_to_packets( + beam_stats_consumer:queue(), + non_neg_integer(), + hope_option:t(binary()) +) -> + [binary()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket, StaticNodeNameOpt) -> + MsgBins = lists:append([beam_stats_to_bins(B, StaticNodeNameOpt) || B <- queue:to_list(Q)]), + MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), + lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). --spec beam_stats_to_bins(beam_stats:t()) -> +-spec beam_stats_to_bins(beam_stats:t(), hope_option:t(binary())) -> [binary()]. beam_stats_to_bins(#beam_stats { node_id = NodeID , memory = Memory - } + , io_bytes_in = IOBytesIn + , io_bytes_out = IOBytesOut + , context_switches = ContextSwitches + , reductions = Reductions + , run_queue = RunQueue + , ets = ETS + , processes = Processes + }, + StaticNodeNameOpt ) -> - NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = memory_to_msgs(Memory), + NodeIDBin = hope_option:get(StaticNodeNameOpt, node_id_to_bin(NodeID)), + Msgs1 = + [ io_bytes_in_to_msg(IOBytesIn) + , io_bytes_out_to_msg(IOBytesOut) + , context_switches_to_msg(ContextSwitches) + , reductions_to_msg(Reductions) + , run_queue_to_msg(RunQueue) + | memory_to_msgs(Memory) + ] + ++ ets_to_msgs(ETS) + ++ procs_to_msgs(Processes), Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], [statsd_msg_to_bin(M) || M <- Msgs2]. +-spec run_queue_to_msg(non_neg_integer()) -> + statsd_msg(). +run_queue_to_msg(RunQueue) -> + #statsd_msg + { name = <<"run_queue">> + , value = RunQueue + , type = gauge + }. + +-spec reductions_to_msg(non_neg_integer()) -> + statsd_msg(). +reductions_to_msg(Reductions) -> + #statsd_msg + { name = <<"reductions">> + , value = Reductions + , type = gauge + }. + +-spec context_switches_to_msg(non_neg_integer()) -> + statsd_msg(). +context_switches_to_msg(ContextSwitches) -> + #statsd_msg + { name = <<"context_switches">> + , value = ContextSwitches + , type = gauge + }. + +-spec io_bytes_in_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_in_to_msg(IOBytesIn) -> + #statsd_msg + { name = <<"io.bytes_in">> + , value = IOBytesIn + , type = gauge + }. + +-spec io_bytes_out_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_out_to_msg(IOBytesOut) -> + #statsd_msg + { name = <<"io.bytes_out">> + , value = IOBytesOut + , type = gauge + }. + +-spec procs_to_msgs(beam_stats_processes:t()) -> + [statsd_msg()]. +procs_to_msgs( + #beam_stats_processes + { individual_stats = Procs + , count_all = CountAll + , count_exiting = CountExiting + , count_garbage_collecting = CountGarbageCollecting + , count_registered = CountRegistered + , count_runnable = CountRunnable + , count_running = CountRunning + , count_suspended = CountSuspended + , count_waiting = CountWaiting + } +) -> + [ gauge(<<"processes_count_all">> , CountAll) + , gauge(<<"processes_count_exiting">> , CountExiting) + , gauge(<<"processes_count_garbage_collecting">>, CountGarbageCollecting) + , gauge(<<"processes_count_registered">> , CountRegistered) + , gauge(<<"processes_count_runnable">> , CountRunnable) + , gauge(<<"processes_count_running">> , CountRunning) + , gauge(<<"processes_count_suspended">> , CountSuspended) + , gauge(<<"processes_count_waiting">> , CountWaiting) + | lists:append([proc_to_msgs(P) || P <- Procs]) + ]. + +-spec proc_to_msgs(beam_stats_process:t()) -> + [statsd_msg()]. +proc_to_msgs( + #beam_stats_process + { pid = Pid + , memory = Memory + , total_heap_size = TotalHeapSize + , stack_size = StackSize + , message_queue_len = MsgQueueLen + }=Process +) -> + Origin = beam_stats_process:get_best_known_origin(Process), + OriginBin = proc_origin_to_bin(Origin), + PidBin = pid_to_bin(Pid), + OriginDotPid = <>, + [ gauge(<<"process_memory." , OriginDotPid/binary>>, Memory) + , gauge(<<"process_total_heap_size." , OriginDotPid/binary>>, TotalHeapSize) + , gauge(<<"process_stack_size." , OriginDotPid/binary>>, StackSize) + , gauge(<<"process_message_queue_len." , OriginDotPid/binary>>, MsgQueueLen) + ]. + +-spec proc_origin_to_bin(beam_stats_process:best_known_origin()) -> + binary(). +proc_origin_to_bin({registered_name, Name}) -> + atom_to_binary(Name, utf8); +proc_origin_to_bin({ancestry, Ancestry}) -> + #beam_stats_process_ancestry + { raw_initial_call = InitCallRaw + , otp_initial_call = InitCallOTPOpt + , otp_ancestors = AncestorsOpt + } = Ancestry, + Blank = <<"NONE">>, + InitCallOTPBinOpt = hope_option:map(InitCallOTPOpt , fun mfa_to_bin/1), + InitCallOTPBin = hope_option:get(InitCallOTPBinOpt, Blank), + AncestorsBinOpt = hope_option:map(AncestorsOpt , fun ancestors_to_bin/1), + AncestorsBin = hope_option:get(AncestorsBinOpt , Blank), + InitCallRawBin = mfa_to_bin(InitCallRaw), + << InitCallRawBin/binary + , "--" + , InitCallOTPBin/binary + , "--" + , AncestorsBin/binary + >>. + +ancestors_to_bin([]) -> + <<>>; +ancestors_to_bin([A | Ancestors]) -> + ABin = ancestor_to_bin(A), + case ancestors_to_bin(Ancestors) + of <<>> -> + ABin + ; <> -> + <> + end. + +ancestor_to_bin(A) when is_atom(A) -> + atom_to_binary(A, utf8); +ancestor_to_bin(A) when is_pid(A) -> + pid_to_bin(A). + +pid_to_bin(Pid) -> + PidList = erlang:pid_to_list(Pid), + PidBin = re:replace(PidList, "[\.]", "_", [global, {return, binary}]), + re:replace(PidBin , "[><]", "" , [global, {return, binary}]). + +-spec mfa_to_bin(mfa()) -> + binary(). +mfa_to_bin({Module, Function, Arity}) -> + ModuleBin = atom_to_binary(Module , utf8), + FunctionBin = atom_to_binary(Function, utf8), + ArityBin = erlang:integer_to_binary(Arity), + <>. + + +-spec gauge(binary(), integer()) -> + statsd_msg(). +gauge(<>, Value) when is_integer(Value) -> + #statsd_msg + { name = Name + , value = Value + , type = gauge + }. + +-spec ets_to_msgs(beam_stats_ets:t()) -> + [statsd_msg()]. +ets_to_msgs(PerTableStats) -> + NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), + lists:append(NestedMsgs). + +-spec ets_table_to_msgs(beam_stats_ets_table:t()) -> + [statsd_msg()]. +ets_table_to_msgs(#beam_stats_ets_table + { id = ID + , name = Name + , size = Size + , memory = Memory + } +) -> + IDBin = beam_stats_ets_table:id_to_bin(ID), + NameBin = atom_to_binary(Name, latin1), + NameAndID = <>, + SizeMsg = + #statsd_msg + { name = <<"ets_table.size.", NameAndID/binary>> + , value = Size + , type = gauge + }, + MemoryMsg = + #statsd_msg + { name = <<"ets_table.memory.", NameAndID/binary>> + , value = Memory + , type = gauge + }, + [SizeMsg, MemoryMsg]. + -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()]. memory_to_msgs(Memory) -> @@ -162,8 +389,9 @@ memory_to_msgs(Memory) -> -spec memory_component_to_statsd_msg({atom(), non_neg_integer()}) -> statsd_msg(). memory_component_to_statsd_msg({MemType, MemSize}) when MemSize >= 0 -> + MemTypeBin = atom_to_binary(MemType, latin1), #statsd_msg - { name = atom_to_binary(MemType, latin1) + { name = <<"memory.", MemTypeBin/binary>> , value = MemSize , type = gauge }. @@ -204,28 +432,3 @@ metric_type_to_bin(gauge) -> node_id_to_bin(NodeID) -> NodeIDBin = atom_to_binary(NodeID, utf8), re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). - -%% ============================================================================ -%% Tests -%% ============================================================================ - -ct_test__beam_stats_to_bins(_Cfg) -> - BEAMStats = #beam_stats{node_id = 'node@host.local', memory = [{foo,1}]}, - [<>] = - beam_stats_to_bins(BEAMStats). - -ct_test__memory_component_to_statsd_msg(_Cfg) -> - #statsd_msg{name = <<"foo">>, value = 1, type = gauge} = - memory_component_to_statsd_msg({foo, 1}). - -ct_test__statsd_msg_add_name_prefix(_Cfg) -> - Msg1 = #statsd_msg{name = <<"foo">>, value = 1, type = gauge}, - Msg2 = statsd_msg_add_name_prefix(Msg1, <<"bar">>), - <> = Msg2#statsd_msg.name. - -ct_test__statsd_msg_to_bin(_Cfg) -> - Msg = #statsd_msg{name = <<"foo">>, value = 1, type = gauge}, - <<"foo:1|g\n">> = statsd_msg_to_bin(Msg). - -ct_test__node_id_to_bin(_Cfg) -> - <<"foo_bar_local">> = node_id_to_bin('foo@bar.local').