Track IO deltas. 0.2.0
authorSiraaj Khandkar <siraaj@khandkar.net>
Mon, 24 Aug 2015 23:22:01 +0000 (19:22 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Mon, 24 Aug 2015 23:24:11 +0000 (19:24 -0400)
include/beam_stats.hrl
src/beam_stats.app.src
src/beam_stats.erl
src/beam_stats_consumer_statsd.erl
src/beam_stats_producer.erl
src/beam_stats_state.erl [new file with mode: 0644]
test/beam_stats_consumer_statsd_SUITE.erl

index 8b851ed..db26c6d 100644 (file)
@@ -2,6 +2,8 @@
     { timestamp    :: erlang:timestamp()
     , node_id      :: atom()
     , memory       :: [{atom(), non_neg_integer()}]
+    , io_bytes_in  :: non_neg_integer()
+    , io_bytes_out :: non_neg_integer()
     %, statistics   :: [{atom()       , term()}]
     %, system       :: [{atom()       , term()}]
     %, process      :: [{atom()       , term()}]
index b54b2c7..489c32c 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.1.1"},
+  {vsn, "0.2.0"},
   {registered, []},
   {applications,
     [ kernel
index e388396..1522568 100644 (file)
@@ -6,20 +6,5 @@
     [ t/0
     ]).
 
--export(
-    [ collect/0
-    ]).
-
--define(T, #?MODULE).
-
 -type t() ::
-    ?T{}.
-
--spec collect() ->
-    t().
-collect() ->
-    ?T
-    { timestamp = os:timestamp()
-    , node_id   = erlang:node()
-    , memory    = erlang:memory()
-    }.
+    #?MODULE{}.
index f5828bf..c23ead1 100644 (file)
@@ -142,13 +142,37 @@ beam_stats_queue_to_binary(Q) ->
 beam_stats_to_bins(#beam_stats
     { node_id = NodeID
     , memory  = Memory
+    , io_bytes_in  = IOBytesIn
+    , io_bytes_out = IOBytesOut
     }
 ) ->
     NodeIDBin = node_id_to_bin(NodeID),
-    Msgs1 = memory_to_msgs(Memory),
+    Msgs1 =
+        [ io_bytes_in_to_msg(IOBytesIn)
+        , io_bytes_out_to_msg(IOBytesOut)
+        | memory_to_msgs(Memory)
+        ],
     Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
     [statsd_msg_to_bin(M) || M <- Msgs2].
 
+-spec io_bytes_in_to_msg(non_neg_integer()) ->
+    statsd_msg().
+io_bytes_in_to_msg(IOBytesIn) ->
+    #statsd_msg
+    { name  = <<"io.bytes_in">>
+    , value = IOBytesIn
+    , type  = gauge
+    }.
+
+-spec io_bytes_out_to_msg(non_neg_integer()) ->
+    statsd_msg().
+io_bytes_out_to_msg(IOBytesOut) ->
+    #statsd_msg
+    { name  = <<"io.bytes_out">>
+    , value = IOBytesOut
+    , type  = gauge
+    }.
+
 -spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
     [statsd_msg()].
 memory_to_msgs(Memory) ->
index cd0933c..67765d3 100644 (file)
@@ -33,6 +33,7 @@
 
 -record(state,
     { consumers = ordsets:new() :: ordsets:ordset(pid())
+    , stats_state :: beam_stats_state:t()
     }).
 
 -type state() ::
@@ -77,7 +78,8 @@ terminate(_Reason, _State) ->
 init([]) ->
     ok = schedule_first_production(),
     Consumers = ordsets:new(),
-    {ok, #state{consumers=Consumers}}.
+    StatsState = beam_stats_state:new(),
+    {ok, #state{consumers=Consumers, stats_state=StatsState}}.
 
 handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:add_element(PID, Consumers1),
@@ -87,25 +89,27 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:del_element(PID, Consumers1),
     {noreply, State#state{consumers=Consumers2}}.
 
-handle_call(?FORCE_PRODUCTION, _From, State) ->
-    {} = produce(State),
-    {reply, {}, State}.
+handle_call(?FORCE_PRODUCTION, _From, State1) ->
+    State2 = produce(State1),
+    {reply, {}, State2}.
 
-handle_info(?SIGNAL_PRODUCTION, #state{}=State) ->
-    {} = produce(State),
+handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
+    State2 = produce(State1),
     ok = schedule_next_production(),
-    {noreply, State}.
+    {noreply, State2}.
 
 %% ============================================================================
 %%  Private
 %% ============================================================================
 
 -spec produce(state()) ->
-    {}.
-produce(#state{consumers=ConsumersSet}) ->
+    state().
+produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) ->
+    StatsState2 = beam_stats_state:update(StatsState1),
+    Stats       = beam_stats_state:export(StatsState2),
     ConsumersList = ordsets:to_list(ConsumersSet),
-    ok = collect_and_push_to_consumers(ConsumersList),
-    {}.
+    ok = push_to_consumers(Stats, ConsumersList),
+    State#state{stats_state = StatsState2}.
 
 -spec schedule_first_production() ->
     ok.
@@ -120,9 +124,8 @@ schedule_next_production() ->
     _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
     ok.
 
--spec collect_and_push_to_consumers([pid()]) ->
+-spec push_to_consumers(beam_stats:t(), [pid()]) ->
     ok.
-collect_and_push_to_consumers(Consumers) ->
-    BEAMStats = beam_stats:collect(),
-    Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
+push_to_consumers(Stats, Consumers) ->
+    Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end,
     lists:foreach(Push, Consumers).
diff --git a/src/beam_stats_state.erl b/src/beam_stats_state.erl
new file mode 100644 (file)
index 0000000..552b5b1
--- /dev/null
@@ -0,0 +1,85 @@
+-module(beam_stats_state).
+
+-include("include/beam_stats.hrl").
+
+-export_type(
+    [ t/0
+    ]).
+
+-export(
+    [ new/0
+    , update/1
+    , export/1
+    ]).
+
+-record(?MODULE,
+    { timestamp             :: erlang:timestamp()
+    , node_id               :: atom()
+    , memory                :: [{atom(), non_neg_integer()}]
+    , previous_io_bytes_in  :: non_neg_integer()
+    , previous_io_bytes_out :: non_neg_integer()
+    , current_io_bytes_in   :: non_neg_integer()
+    , current_io_bytes_out  :: non_neg_integer()
+    }).
+
+-define(T, #?MODULE).
+
+-opaque t() ::
+    ?T{}.
+
+-spec new() ->
+    t().
+new() ->
+    { {input  , CurrentIOBytesIn}
+    , {output , CurrentIOBytesOut}
+    } = erlang:statistics(io),
+    ?T
+    { timestamp             = os:timestamp()
+    , node_id               = erlang:node()
+    , memory                = erlang:memory()
+    , previous_io_bytes_in  = 0
+    , previous_io_bytes_out = 0
+    , current_io_bytes_in   = CurrentIOBytesIn
+    , current_io_bytes_out  = CurrentIOBytesOut
+    }.
+
+-spec update(t()) ->
+    t().
+update(?T
+    { previous_io_bytes_in  = PreviousIOBytesIn
+    , previous_io_bytes_out = PreviousIOBytesOut
+    }
+) ->
+    { {input  , CurrentIOBytesIn}
+    , {output , CurrentIOBytesOut}
+    } = erlang:statistics(io),
+    ?T
+    { timestamp             = os:timestamp()
+    , node_id               = erlang:node()
+    , memory                = erlang:memory()
+    , previous_io_bytes_in  = PreviousIOBytesIn
+    , previous_io_bytes_out = PreviousIOBytesOut
+    , current_io_bytes_in   = CurrentIOBytesIn
+    , current_io_bytes_out  = CurrentIOBytesOut
+    }.
+
+-spec export(t()) ->
+    beam_stats:t().
+export(
+    ?T
+    { timestamp             = Timestamp
+    , node_id               = NodeID
+    , memory                = Memory
+    , previous_io_bytes_in  = PreviousIOBytesIn
+    , previous_io_bytes_out = PreviousIOBytesOut
+    , current_io_bytes_in   = CurrentIOBytesIn
+    , current_io_bytes_out  = CurrentIOBytesOut
+    }
+) ->
+    #beam_stats
+    { timestamp    = Timestamp
+    , node_id      = NodeID
+    , memory       = Memory
+    , io_bytes_in  = CurrentIOBytesIn  - PreviousIOBytesIn
+    , io_bytes_out = CurrentIOBytesOut - PreviousIOBytesOut
+    }.
index 9c7e175..161eac1 100644 (file)
@@ -37,6 +37,8 @@ t_send(_Cfg) ->
     { timestamp = {1, 2, 3}
     , node_id   = 'node_foo@host_bar'
     , memory    = [{mem_type_foo, 1}]
+    , io_bytes_in  = 3
+    , io_bytes_out = 7
     },
     ServerPort = 8125,
     {ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]),
@@ -48,4 +50,7 @@ t_send(_Cfg) ->
     ResultOfReceive = gen_udp:recv(ServerSocket, 0),
     ok = gen_udp:close(ServerSocket),
     {ok, {_, _, Data}} = ResultOfReceive,
-    <<"beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n">> = Data.
+    << "beam_stats.node_foo_host_bar.io.bytes_in:3|g\n"
+     , "beam_stats.node_foo_host_bar.io.bytes_out:7|g\n"
+     , "beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n"
+    >> = Data.
This page took 0.047841 seconds and 4 git commands to generate.