From: Siraaj Khandkar Date: Tue, 15 Sep 2015 21:13:06 +0000 (-0400) Subject: Merge pull request #2 from ibnfirnas/per-ets-table-stats X-Git-Tag: 0.6.0 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=refs%2Ftags%2F0.6.0;hp=28c1d03aafd6e29afd911f6e291cd46d9591575d;p=beam_stats.git Merge pull request #2 from ibnfirnas/per-ets-table-stats Per ets table stats --- diff --git a/README.md b/README.md index 593d6a6..3a55536 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Essentially like `folsomite`, but different. Different in the following ways: , {dst_host , "localhost"} , {dst_port , 8125} , {src_port , 8124} + , {num_msgs_per_packet , 10} ]} , {beam_stats_consumer_graphite, [ {consumption_interval , 60000} diff --git a/include/beam_stats.hrl b/include/beam_stats.hrl index 8698310..5624270 100644 --- a/include/beam_stats.hrl +++ b/include/beam_stats.hrl @@ -7,10 +7,11 @@ , context_switches :: non_neg_integer() , reductions :: non_neg_integer() , run_queue :: non_neg_integer() + , ets :: beam_stats_ets:t() + %, statistics :: [{atom() , term()}] %, system :: [{atom() , term()}] %, process :: [{atom() , term()}] %, port :: [{atom() , term()}] - %, ets :: [{atom() , term()}] %, dets :: [{atom() , term()}] }). diff --git a/include/beam_stats_ets_table.hrl b/include/beam_stats_ets_table.hrl new file mode 100644 index 0000000..e69e58e --- /dev/null +++ b/include/beam_stats_ets_table.hrl @@ -0,0 +1,6 @@ +-record(beam_stats_ets_table, + { id :: beam_stats_ets_table:id() + , name :: atom() + , size :: non_neg_integer() + , memory :: non_neg_integer() + }). diff --git a/rebar.config b/rebar.config index 8a4f3f1..f1816dc 100644 --- a/rebar.config +++ b/rebar.config @@ -1,5 +1,5 @@ { deps -, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.7.0"}}} +, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.8.0"}}} ] }. diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index 9eb72ab..05fb345 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.5.2"}, + {vsn, "0.6.0"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index 7a06d68..1ce0afb 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -1,6 +1,7 @@ -module(beam_stats_consumer_statsd). -include("include/beam_stats.hrl"). +-include("include/beam_stats_ets_table.hrl"). -include("beam_stats_logging.hrl"). -behaviour(beam_stats_consumer). @@ -21,6 +22,7 @@ | {dst_host , inet:ip_address() | inet:hostname()} | {dst_port , inet:port_number()} | {src_port , inet:port_number()} + | {num_msgs_per_packet , non_neg_integer()} . -define(DEFAULT_DST_HOST, "localhost"). @@ -45,6 +47,7 @@ , dst_host :: inet:ip_address() | inet:hostname() , dst_port :: inet:port_number() , src_port :: inet:port_number() + , num_msgs_per_packet :: non_neg_integer() }). -type state() :: @@ -63,20 +66,21 @@ init(Options) -> DstHost = hope_kv_list:get(Options, dst_host, ?DEFAULT_DST_HOST), DstPort = hope_kv_list:get(Options, dst_port, ?DEFAULT_DST_PORT), SrcPort = hope_kv_list:get(Options, src_port, ?DEFAULT_SRC_PORT), + NumMsgsPerPacket = hope_kv_list:get(Options, num_msgs_per_packet, 10), State = #state { sock = none , dst_host = DstHost , dst_port = DstPort , src_port = SrcPort + , num_msgs_per_packet = NumMsgsPerPacket }, {ConsumptionInterval, State}. -spec consume(beam_stats_consumer:queue(), state()) -> state(). -consume(Q, #state{}=State1) -> - Payload = beam_stats_queue_to_binary(Q), - State2 = try_to_connect_if_no_socket(State1), - try_to_send(State2, Payload). +consume(Q, #state{num_msgs_per_packet=NumMsgsPerPacket}=State) -> + Packets = beam_stats_queue_to_packets(Q, NumMsgsPerPacket), + lists:foldl(fun try_to_connect_and_send/2, State, Packets). -spec terminate(state()) -> {}. @@ -88,6 +92,12 @@ terminate(#state{sock=SockOpt}) -> %% Transport %% ============================================================================ +-spec try_to_connect_and_send(binary(), state()) -> + state(). +try_to_connect_and_send(<>, #state{}=State1) -> + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + -spec try_to_send(state(), binary()) -> state(). try_to_send(#state{sock=none}=State, _) -> @@ -132,10 +142,12 @@ try_to_connect_if_no_socket(#state{sock=none, src_port=SrcPort}=State) -> %% Serialization %% ============================================================================ --spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> - binary(). -beam_stats_queue_to_binary(Q) -> - iolist_to_binary([beam_stats_to_bins(B) || B <- queue:to_list(Q)]). +-spec beam_stats_queue_to_packets(beam_stats_consumer:queue(), non_neg_integer()) -> + [binary()]. +beam_stats_queue_to_packets(Q, NumMsgsPerPacket) -> + MsgBins = lists:append([beam_stats_to_bins(B) || B <- queue:to_list(Q)]), + MsgBinsChucks = hope_list:divide(MsgBins, NumMsgsPerPacket), + lists:map(fun erlang:iolist_to_binary/1, MsgBinsChucks). -spec beam_stats_to_bins(beam_stats:t()) -> [binary()]. @@ -147,6 +159,7 @@ beam_stats_to_bins(#beam_stats , context_switches = ContextSwitches , reductions = Reductions , run_queue = RunQueue + , ets = ETS } ) -> NodeIDBin = node_id_to_bin(NodeID), @@ -157,7 +170,8 @@ beam_stats_to_bins(#beam_stats , reductions_to_msg(Reductions) , run_queue_to_msg(RunQueue) | memory_to_msgs(Memory) - ], + ] + ++ ets_to_msgs(ETS), Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], [statsd_msg_to_bin(M) || M <- Msgs2]. @@ -206,6 +220,38 @@ io_bytes_out_to_msg(IOBytesOut) -> , type = gauge }. +-spec ets_to_msgs(beam_stats_ets:t()) -> + [statsd_msg()]. +ets_to_msgs(PerTableStats) -> + NestedMsgs = lists:map(fun ets_table_to_msgs/1, PerTableStats), + lists:append(NestedMsgs). + +-spec ets_table_to_msgs(beam_stats_ets_table:t()) -> + [statsd_msg()]. +ets_table_to_msgs(#beam_stats_ets_table + { id = ID + , name = Name + , size = Size + , memory = Memory + } +) -> + IDBin = beam_stats_ets_table:id_to_bin(ID), + NameBin = atom_to_binary(Name, latin1), + NameAndID = <>, + SizeMsg = + #statsd_msg + { name = <<"ets_table.size.", NameAndID/binary>> + , value = Size + , type = gauge + }, + MemoryMsg = + #statsd_msg + { name = <<"ets_table.memory.", NameAndID/binary>> + , value = Memory + , type = gauge + }, + [SizeMsg, MemoryMsg]. + -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()]. memory_to_msgs(Memory) -> diff --git a/src/beam_stats_ets.erl b/src/beam_stats_ets.erl new file mode 100644 index 0000000..3965765 --- /dev/null +++ b/src/beam_stats_ets.erl @@ -0,0 +1,17 @@ +-module(beam_stats_ets). + +-export_type( + [ t/0 + ]). + +-export( + [ collect/0 + ]). + +-type t() :: + [beam_stats_ets_table:t()]. + +-spec collect() -> + t(). +collect() -> + lists:map(fun beam_stats_ets_table:of_id/1, ets:all()). diff --git a/src/beam_stats_ets_table.erl b/src/beam_stats_ets_table.erl new file mode 100644 index 0000000..b603656 --- /dev/null +++ b/src/beam_stats_ets_table.erl @@ -0,0 +1,44 @@ +-module(beam_stats_ets_table). + +-include("include/beam_stats_ets_table.hrl"). + +-export_type( + [ t/0 + , id/0 + ]). + +-export( + [ of_id/1 + , id_to_bin/1 + ]). + +-type id() :: + atom() + | ets:tid() + % integer() is just a workaround, to let us mock ets:tid(), which is + % opaque, but represented as an integer, without Dialyzer complaining. + | integer() + . + +-type t() :: + #?MODULE{}. + +-spec of_id(id()) -> + t(). +of_id(ID) -> + WordSize = erlang:system_info(wordsize), + NumberOfWords = ets:info(ID, memory), + NumberOfBytes = NumberOfWords * WordSize, + #?MODULE + { id = ID + , name = ets:info(ID, name) + , size = ets:info(ID, size) + , memory = NumberOfBytes + }. + +-spec id_to_bin(atom() | ets:tid()) -> + binary(). +id_to_bin(ID) when is_atom(ID) -> + atom_to_binary(ID, latin1); +id_to_bin(ID) when is_integer(ID) -> + integer_to_binary(ID). diff --git a/src/beam_stats_state.erl b/src/beam_stats_state.erl index f21f231..7c41fc0 100644 --- a/src/beam_stats_state.erl +++ b/src/beam_stats_state.erl @@ -15,6 +15,7 @@ -record(snapshots, { memory :: [{atom(), non_neg_integer()}] , run_queue :: non_neg_integer() + , ets :: beam_stats_ets:t() }). -type snapshots() :: @@ -83,6 +84,7 @@ export( #snapshots { memory = Memory , run_queue = RunQueue + , ets = ETS } , deltas = #deltas @@ -111,12 +113,14 @@ export( , context_switches = CurrentContextSwitches - PreviousContextSwitches , reductions = Reductions , run_queue = RunQueue + , ets = ETS }. snapshots_new() -> #snapshots { memory = erlang:memory() , run_queue = erlang:statistics(run_queue) + , ets = beam_stats_ets:collect() }. deltas_new() -> diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index f0cffb0..20a2de7 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -1,6 +1,7 @@ -module(beam_stats_consumer_statsd_SUITE). -include_lib("beam_stats/include/beam_stats.hrl"). +-include_lib("beam_stats/include/beam_stats_ets_table.hrl"). -export( [ all/0 @@ -33,15 +34,31 @@ groups() -> %% ============================================================================= t_send(_Cfg) -> + ETSTableStatsFoo = + #beam_stats_ets_table + { id = foo + , name = foo + , size = 5 + , memory = 25 + }, + ETSTableStatsBar = + #beam_stats_ets_table + { id = 37 + , name = bar + , size = 8 + , memory = 38 + }, + % TODO: Indent #beam_stats as #beam_stats_ets_table BEAMStats = #beam_stats { timestamp = {1, 2, 3} , node_id = 'node_foo@host_bar' - , memory = [{mem_type_foo, 1}] + , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] , io_bytes_in = 3 , io_bytes_out = 7 , context_switches = 5 , reductions = 9 , run_queue = 17 + , ets = [ETSTableStatsFoo, ETSTableStatsBar] }, ServerPort = 8125, {ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]), @@ -50,14 +67,37 @@ t_send(_Cfg) -> {_, State1} = beam_stats_consumer_statsd:init(Options), State2 = beam_stats_consumer_statsd:consume(BEAMStatsQ, State1), {} = beam_stats_consumer_statsd:terminate(State2), - ResultOfReceive = gen_udp:recv(ServerSocket, 0), + ResultOfReceive1 = gen_udp:recv(ServerSocket, 0), + {ok, {_, _, PacketReceived1}} = ResultOfReceive1, + ResultOfReceive2 = gen_udp:recv(ServerSocket, 0), + {ok, {_, _, PacketReceived2}} = ResultOfReceive2, ok = gen_udp:close(ServerSocket), - {ok, {_, _, Data}} = ResultOfReceive, - ct:log("Packet: ~n~s~n", [Data]), - << "beam_stats.node_foo_host_bar.io.bytes_in:3|g\n" - , "beam_stats.node_foo_host_bar.io.bytes_out:7|g\n" - , "beam_stats.node_foo_host_bar.context_switches:5|g\n" - , "beam_stats.node_foo_host_bar.reductions:9|g\n" - , "beam_stats.node_foo_host_bar.run_queue:17|g\n" - , "beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n" - >> = Data. + ct:log("PacketReceived1: ~n~s~n", [PacketReceived1]), + ct:log("PacketReceived2: ~n~s~n", [PacketReceived2]), + PacketsCombined = <>, + ct:log("PacketsCombined: ~n~s~n", [PacketsCombined]), + MsgsExpected = + [ <<"beam_stats.node_foo_host_bar.io.bytes_in:3|g">> + , <<"beam_stats.node_foo_host_bar.io.bytes_out:7|g">> + , <<"beam_stats.node_foo_host_bar.context_switches:5|g">> + , <<"beam_stats.node_foo_host_bar.reductions:9|g">> + , <<"beam_stats.node_foo_host_bar.run_queue:17|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_bar:2|g">> + , <<"beam_stats.node_foo_host_bar.memory.mem_type_baz:3|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.size.foo.foo:5|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:25|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.size.bar.37:8|g">> + , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:38|g">> + ], + MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), + RemoveExpectedFromReceived = + fun (Expected, Received) -> + ct:log( + "Looking for expected msg ~p in remaining received ~p~n", + [Expected, Received] + ), + true = lists:member(Expected, Received), + Received -- [Expected] + end, + [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected).