{ timestamp :: erlang:timestamp()
, node_id :: atom()
, memory :: [{atom(), non_neg_integer()}]
+ , io_bytes_in :: non_neg_integer()
+ , io_bytes_out :: non_neg_integer()
%, statistics :: [{atom() , term()}]
%, system :: [{atom() , term()}]
%, process :: [{atom() , term()}]
{application, beam_stats,
[
{description, "Periodic VM stats production and consumption."},
- {vsn, "0.1.1"},
+ {vsn, "0.2.0"},
{registered, []},
{applications,
[ kernel
[ t/0
]).
--export(
- [ collect/0
- ]).
-
--define(T, #?MODULE).
-
-type t() ::
- ?T{}.
-
--spec collect() ->
- t().
-collect() ->
- ?T
- { timestamp = os:timestamp()
- , node_id = erlang:node()
- , memory = erlang:memory()
- }.
+ #?MODULE{}.
beam_stats_to_bins(#beam_stats
{ node_id = NodeID
, memory = Memory
+ , io_bytes_in = IOBytesIn
+ , io_bytes_out = IOBytesOut
}
) ->
NodeIDBin = node_id_to_bin(NodeID),
- Msgs1 = memory_to_msgs(Memory),
+ Msgs1 =
+ [ io_bytes_in_to_msg(IOBytesIn)
+ , io_bytes_out_to_msg(IOBytesOut)
+ | memory_to_msgs(Memory)
+ ],
Msgs2 = [statsd_msg_add_name_prefix(M, NodeIDBin) || M <- Msgs1],
[statsd_msg_to_bin(M) || M <- Msgs2].
+-spec io_bytes_in_to_msg(non_neg_integer()) ->
+ statsd_msg().
+io_bytes_in_to_msg(IOBytesIn) ->
+ #statsd_msg
+ { name = <<"io.bytes_in">>
+ , value = IOBytesIn
+ , type = gauge
+ }.
+
+-spec io_bytes_out_to_msg(non_neg_integer()) ->
+ statsd_msg().
+io_bytes_out_to_msg(IOBytesOut) ->
+ #statsd_msg
+ { name = <<"io.bytes_out">>
+ , value = IOBytesOut
+ , type = gauge
+ }.
+
-spec memory_to_msgs([{atom(), non_neg_integer()}]) ->
[statsd_msg()].
memory_to_msgs(Memory) ->
-record(state,
{ consumers = ordsets:new() :: ordsets:ordset(pid())
+ , stats_state :: beam_stats_state:t()
}).
-type state() ::
init([]) ->
ok = schedule_first_production(),
Consumers = ordsets:new(),
- {ok, #state{consumers=Consumers}}.
+ StatsState = beam_stats_state:new(),
+ {ok, #state{consumers=Consumers, stats_state=StatsState}}.
handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
Consumers2 = ordsets:add_element(PID, Consumers1),
Consumers2 = ordsets:del_element(PID, Consumers1),
{noreply, State#state{consumers=Consumers2}}.
-handle_call(?FORCE_PRODUCTION, _From, State) ->
- {} = produce(State),
- {reply, {}, State}.
+handle_call(?FORCE_PRODUCTION, _From, State1) ->
+ State2 = produce(State1),
+ {reply, {}, State2}.
-handle_info(?SIGNAL_PRODUCTION, #state{}=State) ->
- {} = produce(State),
+handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
+ State2 = produce(State1),
ok = schedule_next_production(),
- {noreply, State}.
+ {noreply, State2}.
%% ============================================================================
%% Private
%% ============================================================================
-spec produce(state()) ->
- {}.
-produce(#state{consumers=ConsumersSet}) ->
+ state().
+produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) ->
+ StatsState2 = beam_stats_state:update(StatsState1),
+ Stats = beam_stats_state:export(StatsState2),
ConsumersList = ordsets:to_list(ConsumersSet),
- ok = collect_and_push_to_consumers(ConsumersList),
- {}.
+ ok = push_to_consumers(Stats, ConsumersList),
+ State#state{stats_state = StatsState2}.
-spec schedule_first_production() ->
ok.
_ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
ok.
--spec collect_and_push_to_consumers([pid()]) ->
+-spec push_to_consumers(beam_stats:t(), [pid()]) ->
ok.
-collect_and_push_to_consumers(Consumers) ->
- BEAMStats = beam_stats:collect(),
- Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
+push_to_consumers(Stats, Consumers) ->
+ Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end,
lists:foreach(Push, Consumers).
--- /dev/null
+-module(beam_stats_state).
+
+-include("include/beam_stats.hrl").
+
+-export_type(
+ [ t/0
+ ]).
+
+-export(
+ [ new/0
+ , update/1
+ , export/1
+ ]).
+
+-record(?MODULE,
+ { timestamp :: erlang:timestamp()
+ , node_id :: atom()
+ , memory :: [{atom(), non_neg_integer()}]
+ , previous_io_bytes_in :: non_neg_integer()
+ , previous_io_bytes_out :: non_neg_integer()
+ , current_io_bytes_in :: non_neg_integer()
+ , current_io_bytes_out :: non_neg_integer()
+ }).
+
+-define(T, #?MODULE).
+
+-opaque t() ::
+ ?T{}.
+
+-spec new() ->
+ t().
+new() ->
+ { {input , CurrentIOBytesIn}
+ , {output , CurrentIOBytesOut}
+ } = erlang:statistics(io),
+ ?T
+ { timestamp = os:timestamp()
+ , node_id = erlang:node()
+ , memory = erlang:memory()
+ , previous_io_bytes_in = 0
+ , previous_io_bytes_out = 0
+ , current_io_bytes_in = CurrentIOBytesIn
+ , current_io_bytes_out = CurrentIOBytesOut
+ }.
+
+-spec update(t()) ->
+ t().
+update(?T
+ { previous_io_bytes_in = PreviousIOBytesIn
+ , previous_io_bytes_out = PreviousIOBytesOut
+ }
+) ->
+ { {input , CurrentIOBytesIn}
+ , {output , CurrentIOBytesOut}
+ } = erlang:statistics(io),
+ ?T
+ { timestamp = os:timestamp()
+ , node_id = erlang:node()
+ , memory = erlang:memory()
+ , previous_io_bytes_in = PreviousIOBytesIn
+ , previous_io_bytes_out = PreviousIOBytesOut
+ , current_io_bytes_in = CurrentIOBytesIn
+ , current_io_bytes_out = CurrentIOBytesOut
+ }.
+
+-spec export(t()) ->
+ beam_stats:t().
+export(
+ ?T
+ { timestamp = Timestamp
+ , node_id = NodeID
+ , memory = Memory
+ , previous_io_bytes_in = PreviousIOBytesIn
+ , previous_io_bytes_out = PreviousIOBytesOut
+ , current_io_bytes_in = CurrentIOBytesIn
+ , current_io_bytes_out = CurrentIOBytesOut
+ }
+) ->
+ #beam_stats
+ { timestamp = Timestamp
+ , node_id = NodeID
+ , memory = Memory
+ , io_bytes_in = CurrentIOBytesIn - PreviousIOBytesIn
+ , io_bytes_out = CurrentIOBytesOut - PreviousIOBytesOut
+ }.
{ timestamp = {1, 2, 3}
, node_id = 'node_foo@host_bar'
, memory = [{mem_type_foo, 1}]
+ , io_bytes_in = 3
+ , io_bytes_out = 7
},
ServerPort = 8125,
{ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]),
ResultOfReceive = gen_udp:recv(ServerSocket, 0),
ok = gen_udp:close(ServerSocket),
{ok, {_, _, Data}} = ResultOfReceive,
- <<"beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n">> = Data.
+ << "beam_stats.node_foo_host_bar.io.bytes_in:3|g\n"
+ , "beam_stats.node_foo_host_bar.io.bytes_out:7|g\n"
+ , "beam_stats.node_foo_host_bar.memory.mem_type_foo:1|g\n"
+ >> = Data.