From: Siraaj Khandkar Date: Mon, 24 Aug 2015 23:22:01 +0000 (-0400) Subject: Track IO deltas. X-Git-Tag: 0.2.0 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=c9905036f81c30795a67adc9e3bb8e122bb7f44c;p=beam_stats.git Track IO deltas. --- diff --git a/include/beam_stats.hrl b/include/beam_stats.hrl index 8b851ed..db26c6d 100644 --- a/include/beam_stats.hrl +++ b/include/beam_stats.hrl @@ -2,6 +2,8 @@ { timestamp :: erlang:timestamp() , node_id :: atom() , memory :: [{atom(), non_neg_integer()}] + , io_bytes_in :: non_neg_integer() + , io_bytes_out :: non_neg_integer() %, statistics :: [{atom() , term()}] %, system :: [{atom() , term()}] %, process :: [{atom() , term()}] diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index b54b2c7..489c32c 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.1.1"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats.erl b/src/beam_stats.erl index e388396..1522568 100644 --- a/src/beam_stats.erl +++ b/src/beam_stats.erl @@ -6,20 +6,5 @@ [ t/0 ]). --export( - [ collect/0 - ]). - --define(T, #?MODULE). - -type t() :: - ?T{}. - --spec collect() -> - t(). -collect() -> - ?T - { timestamp = os:timestamp() - , node_id = erlang:node() - , memory = erlang:memory() - }. + #?MODULE{}. diff --git a/src/beam_stats_consumer_statsd.erl b/src/beam_stats_consumer_statsd.erl index f5828bf..c23ead1 100644 --- a/src/beam_stats_consumer_statsd.erl +++ b/src/beam_stats_consumer_statsd.erl @@ -142,13 +142,37 @@ beam_stats_queue_to_binary(Q) -> beam_stats_to_bins(#beam_stats { node_id = NodeID , memory = Memory + , io_bytes_in = IOBytesIn + , io_bytes_out = IOBytesOut } ) -> NodeIDBin = node_id_to_bin(NodeID), - Msgs1 = memory_to_msgs(Memory), + Msgs1 = + [ io_bytes_in_to_msg(IOBytesIn) + , io_bytes_out_to_msg(IOBytesOut) + | memory_to_msgs(Memory) + ], Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1], [statsd_msg_to_bin(M) || M <- Msgs2]. +-spec io_bytes_in_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_in_to_msg(IOBytesIn) -> + #statsd_msg + { name = <<"io.bytes_in">> + , value = IOBytesIn + , type = gauge + }. + +-spec io_bytes_out_to_msg(non_neg_integer()) -> + statsd_msg(). +io_bytes_out_to_msg(IOBytesOut) -> + #statsd_msg + { name = <<"io.bytes_out">> + , value = IOBytesOut + , type = gauge + }. + -spec memory_to_msgs([{atom(), non_neg_integer()}]) -> [statsd_msg()]. memory_to_msgs(Memory) -> diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl index cd0933c..67765d3 100644 --- a/src/beam_stats_producer.erl +++ b/src/beam_stats_producer.erl @@ -33,6 +33,7 @@ -record(state, { consumers = ordsets:new() :: ordsets:ordset(pid()) + , stats_state :: beam_stats_state:t() }). -type state() :: @@ -77,7 +78,8 @@ terminate(_Reason, _State) -> init([]) -> ok = schedule_first_production(), Consumers = ordsets:new(), - {ok, #state{consumers=Consumers}}. + StatsState = beam_stats_state:new(), + {ok, #state{consumers=Consumers, stats_state=StatsState}}. handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:add_element(PID, Consumers1), @@ -87,25 +89,27 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:del_element(PID, Consumers1), {noreply, State#state{consumers=Consumers2}}. -handle_call(?FORCE_PRODUCTION, _From, State) -> - {} = produce(State), - {reply, {}, State}. +handle_call(?FORCE_PRODUCTION, _From, State1) -> + State2 = produce(State1), + {reply, {}, State2}. -handle_info(?SIGNAL_PRODUCTION, #state{}=State) -> - {} = produce(State), +handle_info(?SIGNAL_PRODUCTION, #state{}=State1) -> + State2 = produce(State1), ok = schedule_next_production(), - {noreply, State}. + {noreply, State2}. %% ============================================================================ %% Private %% ============================================================================ -spec produce(state()) -> - {}. -produce(#state{consumers=ConsumersSet}) -> + state(). +produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) -> + StatsState2 = beam_stats_state:update(StatsState1), + Stats = beam_stats_state:export(StatsState2), ConsumersList = ordsets:to_list(ConsumersSet), - ok = collect_and_push_to_consumers(ConsumersList), - {}. + ok = push_to_consumers(Stats, ConsumersList), + State#state{stats_state = StatsState2}. -spec schedule_first_production() -> ok. @@ -120,9 +124,8 @@ schedule_next_production() -> _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), ok. --spec collect_and_push_to_consumers([pid()]) -> +-spec push_to_consumers(beam_stats:t(), [pid()]) -> ok. -collect_and_push_to_consumers(Consumers) -> - BEAMStats = beam_stats:collect(), - Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end, +push_to_consumers(Stats, Consumers) -> + Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end, lists:foreach(Push, Consumers). diff --git a/src/beam_stats_state.erl b/src/beam_stats_state.erl new file mode 100644 index 0000000..552b5b1 --- /dev/null +++ b/src/beam_stats_state.erl @@ -0,0 +1,85 @@ +-module(beam_stats_state). + +-include("include/beam_stats.hrl"). + +-export_type( + [ t/0 + ]). + +-export( + [ new/0 + , update/1 + , export/1 + ]). + +-record(?MODULE, + { timestamp :: erlang:timestamp() + , node_id :: atom() + , memory :: [{atom(), non_neg_integer()}] + , previous_io_bytes_in :: non_neg_integer() + , previous_io_bytes_out :: non_neg_integer() + , current_io_bytes_in :: non_neg_integer() + , current_io_bytes_out :: non_neg_integer() + }). + +-define(T, #?MODULE). + +-opaque t() :: + ?T{}. + +-spec new() -> + t(). +new() -> + { {input , CurrentIOBytesIn} + , {output , CurrentIOBytesOut} + } = erlang:statistics(io), + ?T + { timestamp = os:timestamp() + , node_id = erlang:node() + , memory = erlang:memory() + , previous_io_bytes_in = 0 + , previous_io_bytes_out = 0 + , current_io_bytes_in = CurrentIOBytesIn + , current_io_bytes_out = CurrentIOBytesOut + }. + +-spec update(t()) -> + t(). +update(?T + { previous_io_bytes_in = PreviousIOBytesIn + , previous_io_bytes_out = PreviousIOBytesOut + } +) -> + { {input , CurrentIOBytesIn} + , {output , CurrentIOBytesOut} + } = erlang:statistics(io), + ?T + { timestamp = os:timestamp() + , node_id = erlang:node() + , memory = erlang:memory() + , previous_io_bytes_in = PreviousIOBytesIn + , previous_io_bytes_out = PreviousIOBytesOut + , current_io_bytes_in = CurrentIOBytesIn + , current_io_bytes_out = CurrentIOBytesOut + }. + +-spec export(t()) -> + beam_stats:t(). +export( + ?T + { timestamp = Timestamp + , node_id = NodeID + , memory = Memory + , previous_io_bytes_in = PreviousIOBytesIn + , previous_io_bytes_out = PreviousIOBytesOut + , current_io_bytes_in = CurrentIOBytesIn + , current_io_bytes_out = CurrentIOBytesOut + } +) -> + #beam_stats + { timestamp = Timestamp + , node_id = NodeID + , memory = Memory + , io_bytes_in = CurrentIOBytesIn - PreviousIOBytesIn + , io_bytes_out = CurrentIOBytesOut - PreviousIOBytesOut + }. diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index 9c7e175..161eac1 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -37,6 +37,8 @@ t_send(_Cfg) -> { timestamp = {1, 2, 3} , node_id = 'node_foo@host_bar' , memory = [{mem_type_foo, 1}] + , io_bytes_in = 3 + , io_bytes_out = 7 }, ServerPort = 8125, {ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]), @@ -48,4 +50,7 @@ t_send(_Cfg) -> ResultOfReceive = gen_udp:recv(ServerSocket, 0), ok = gen_udp:close(ServerSocket), {ok, {_, _, Data}} = ResultOfReceive, - <<"beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n">> = Data. + << "beam_stats.node_foo_host_bar.io.bytes_in:3|g\n" + , "beam_stats.node_foo_host_bar.io.bytes_out:7|g\n" + , "beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n" + >> = Data.