To simplify collection of deltas.
{application, beam_stats,
[
{description, "Periodic VM stats production and consumption."},
- {vsn, "0.14.1"},
+ {vsn, "0.14.2"},
{registered, []},
{applications,
[ kernel
[ t/0
]).
+-export(
+ [ collect/1
+ ]).
+
+-define(T, #?MODULE).
+
-type t() ::
- #?MODULE{}.
+ ?T{}.
+
+-spec collect(beam_stats_delta:t()) ->
+ t().
+collect(DeltasServer) ->
+ {_, DeltaOfReductions} = beam_stats_source:erlang_statistics(reductions),
+ { {io_bytes_in , DeltaOfIOBytesIn}
+ , {io_bytes_out , DeltaOfIOBytesOut}
+ } = beam_stats_delta:of_io(DeltasServer),
+ ?T
+ { timestamp = beam_stats_source:os_timestamp()
+ , node_id = beam_stats_source:erlang_node()
+ , memory = beam_stats_source:erlang_memory()
+ , io_bytes_in = DeltaOfIOBytesIn
+ , io_bytes_out = DeltaOfIOBytesOut
+ , context_switches = beam_stats_delta:of_context_switches(DeltasServer)
+ , reductions = DeltaOfReductions
+ , run_queue = beam_stats_source:erlang_statistics(run_queue)
+ , ets = beam_stats_ets:collect()
+ , processes = beam_stats_processes:collect()
+ }.
%% ===================================================================
start(_StartType, _StartArgs) ->
- beam_stats_sup:start_link().
+ DeltasServer = beam_stats_delta:start(),
+ beam_stats_sup:start_link(DeltasServer).
stop(_State) ->
ok.
--- /dev/null
+-module(beam_stats_delta).
+
+-export_type(
+ [ t/0
+ ]).
+
+-export(
+ [ start/0
+ , stop/1
+ , of_context_switches/1
+ , of_io/1
+ ]).
+
+-record(?MODULE,
+ { erlang_statistics :: ets:tid()
+ }).
+
+-define(T, #?MODULE).
+
+-opaque t() ::
+ ?T{}.
+
+-spec start() ->
+ t().
+start() ->
+ Options =
+ [ set
+ , public
+ ],
+ ?T
+ { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options)
+ }.
+
+-spec stop(t()) ->
+ {}.
+stop(?T
+ { erlang_statistics = TidErlangStatistics
+ }
+) ->
+ true = ets:delete(TidErlangStatistics),
+ {}.
+
+-spec of_context_switches(t()) ->
+ non_neg_integer().
+of_context_switches(?T{erlang_statistics=Table}) ->
+ Key = context_switches,
+ {Current, 0} = beam_stats_source:erlang_statistics(Key),
+ delta(Table, Key, Current).
+
+-spec of_io(t()) ->
+ { {io_bytes_in , non_neg_integer()}
+ , {io_bytes_out , non_neg_integer()}
+ }.
+of_io(?T{erlang_statistics=Table}) ->
+ Key = io,
+ { {input , CurrentIn}
+ , {output , CurrentOut}
+ } = beam_stats_source:erlang_statistics(Key),
+ DeltaIn = delta(Table, io_bytes_in , CurrentIn),
+ DeltaOut = delta(Table, io_bytes_out, CurrentOut),
+ { {io_bytes_in , DeltaIn}
+ , {io_bytes_out , DeltaOut}
+ }.
+
+-spec delta(ets:tid(), atom(), non_neg_integer()) ->
+ non_neg_integer().
+delta(Table, Key, CurrentTotal) ->
+ PreviousTotalOpt = find(Table, Key),
+ PreviousTotal = hope_option:get(PreviousTotalOpt, 0),
+ save(Table, Key, CurrentTotal),
+ CurrentTotal - PreviousTotal.
+
+-spec find(ets:tid(), term()) ->
+ hope_option:t(term()).
+find(Table, K) ->
+ case ets:lookup(Table, K)
+ of [] -> none
+ ; [{K, V}] -> {some, V}
+ end.
+
+-spec save(ets:tid(), term(), term()) ->
+ {}.
+save(Table, K, V) ->
+ true = ets:insert(Table, {K, V}),
+ {}.
%% API
-export(
- [ start_link/0
+ [ start_link/1
, subscribe/1
, unsubscribe/1
-record(state,
{ consumers = ordsets:new() :: ordsets:ordset(pid())
- , stats_state :: beam_stats_state:t()
+ , deltas_server :: beam_stats_delta:t()
}).
-type state() ::
%% API
%% ============================================================================
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(DeltasServer) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
-spec subscribe(pid()) ->
ok.
%% gen_server callbacks
%% ============================================================================
-init([]) ->
+init(DeltasServer) ->
ok = schedule_first_production(),
Consumers = ordsets:new(),
- StatsState = beam_stats_state:new(),
- {ok, #state{consumers=Consumers, stats_state=StatsState}}.
+ {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
Consumers2 = ordsets:add_element(PID, Consumers1),
produce(
#state
{ consumers = ConsumersSet
- , stats_state = StatsState1
+ , deltas_server = DeltasServer
}=State,
MsgSendFun
) ->
- StatsState2 = beam_stats_state:update(StatsState1),
- Stats = beam_stats_state:export(StatsState2),
+ Stats = beam_stats:collect(DeltasServer),
ConsumersList = ordsets:to_list(ConsumersSet),
Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
ok = lists:foreach(Send, ConsumersList),
- State#state{stats_state = StatsState2}.
+ State.
-spec schedule_first_production() ->
ok.
+++ /dev/null
--module(beam_stats_state).
-
--include("include/beam_stats.hrl").
--include("include/beam_stats_process.hrl").
--include("include/beam_stats_processes.hrl").
-
--export_type(
- [ t/0
- ]).
-
--export(
- [ new/0
- , update/1
- , export/1
- ]).
-
--record(snapshots,
- { memory :: [{atom(), non_neg_integer()}]
- , processes :: beam_stats_processes:t()
- , run_queue :: non_neg_integer()
- , ets :: beam_stats_ets:t()
- }).
-
--type snapshots() ::
- #snapshots{}.
-
--record(deltas,
- { reductions :: non_neg_integer()
- }).
-
--type deltas() ::
- #deltas{}.
-
--record(totals,
- { io_bytes_in :: non_neg_integer()
- , io_bytes_out :: non_neg_integer()
- , context_switches :: non_neg_integer()
- }).
-
--type totals() ::
- #totals{}.
-
--record(?MODULE,
- { timestamp :: erlang:timestamp()
- , node_id :: atom()
- , snapshots :: snapshots() % Current state
- , deltas :: deltas() % Accumulated since last check
- , totals_previous :: totals() % Accumulated since VM start, as of last state
- , totals_current :: totals() % Accumulated since VM start, as of this state
- }).
-
--define(T, #?MODULE).
-
--opaque t() ::
- ?T{}.
-
--spec new() ->
- t().
-new() ->
- TotalsPrevious = totals_empty(),
- new(TotalsPrevious).
-
--spec new(TotalsPrevious :: totals()) ->
- t().
-new(#totals{}=TotalsPrevious) ->
- ?T
- { timestamp = beam_stats_source:os_timestamp()
- , node_id = beam_stats_source:erlang_node()
- , snapshots = snapshots_new()
- , deltas = deltas_new()
- , totals_previous = TotalsPrevious
- , totals_current = totals_new()
- }.
-
--spec update(t()) ->
- t().
-update(?T{totals_current=TotalsPrevious}) ->
- new(TotalsPrevious).
-
--spec export(t()) ->
- beam_stats:t().
-export(
- ?T
- { timestamp = Timestamp
- , node_id = NodeID
- , snapshots =
- #snapshots
- { memory = Memory
- , processes = Processes
- , run_queue = RunQueue
- , ets = ETS
- }
- , deltas =
- #deltas
- { reductions = Reductions
- }
- , totals_previous =
- #totals
- { io_bytes_in = PreviousIOBytesIn
- , io_bytes_out = PreviousIOBytesOut
- , context_switches = PreviousContextSwitches
- }
- , totals_current =
- #totals
- { io_bytes_in = CurrentIOBytesIn
- , io_bytes_out = CurrentIOBytesOut
- , context_switches = CurrentContextSwitches
- }
- }
-) ->
- #beam_stats
- { timestamp = Timestamp
- , node_id = NodeID
- , memory = Memory
- , io_bytes_in = CurrentIOBytesIn - PreviousIOBytesIn
- , io_bytes_out = CurrentIOBytesOut - PreviousIOBytesOut
- , context_switches = CurrentContextSwitches - PreviousContextSwitches
- , reductions = Reductions
- , run_queue = RunQueue
- , ets = ETS
- , processes = Processes
- }.
-
-snapshots_new() ->
- #snapshots
- { memory = beam_stats_source:erlang_memory()
- , processes = beam_stats_processes:collect()
- , run_queue = beam_stats_source:erlang_statistics(run_queue)
- , ets = beam_stats_ets:collect()
- }.
-
-deltas_new() ->
- {_ReductionsTotal, ReductionsDelta} = beam_stats_source:erlang_statistics(reductions),
- #deltas
- { reductions = ReductionsDelta
- }.
-
-totals_new() ->
- { {input , IOBytesIn}
- , {output , IOBytesOut}
- } = beam_stats_source:erlang_statistics(io),
- {ContextSwitches, 0} = beam_stats_source:erlang_statistics(context_switches),
- #totals
- { io_bytes_in = IOBytesIn
- , io_bytes_out = IOBytesOut
- , context_switches = ContextSwitches
- }.
-
-totals_empty() ->
- #totals
- { io_bytes_in = 0
- , io_bytes_out = 0
- , context_switches = 0
- }.
-behaviour(supervisor).
%% API
--export([start_link/0]).
+-export([start_link/1]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
--define(CHILD(Type, Module),
- {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}).
+-define(CHILD(Type, Module, Args),
+ {Module, {Module, start_link, Args}, permanent, 5000, Type, [Module]}).
%% ===================================================================
%% API functions
%% ===================================================================
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+start_link(DeltasServer) ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, DeltasServer).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
-init([]) ->
+init(DeltasServer) ->
Children =
- [ ?CHILD(worker , beam_stats_producer)
- , ?CHILD(supervisor , beam_stats_sup_consumers)
- ],
+ [ ?CHILD(worker , beam_stats_producer , [DeltasServer])
+ , ?CHILD(supervisor , beam_stats_sup_consumers, [])
+ ],
SupFlags = {one_for_one, 5, 10},
{ok, {SupFlags, Children}}.
t_full_cycle(_Cfg) ->
meck:new(beam_stats_source),
- BEAMStatsExpected = meck_expect_beam_stats(),
- BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
- ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
- ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
- BEAMStatsExpected = BEAMStatsComputed,
+ _BEAMStatsExpected = meck_expect_beam_stats(),
{ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats),
ct:log("beam_stats started~n"),