From: Siraaj Khandkar Date: Sun, 27 Sep 2015 20:11:03 +0000 (-0400) Subject: Implement deltas server, replacing beam_stats_state. X-Git-Tag: 0.14.3^2~1 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=b2c364fd163d74df4914b6d4d5da42dec28368af;p=beam_stats.git Implement deltas server, replacing beam_stats_state. To simplify collection of deltas. --- diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index b88f464..9c8d943 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.14.1"}, + {vsn, "0.14.2"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats.erl b/src/beam_stats.erl index 1522568..af5167c 100644 --- a/src/beam_stats.erl +++ b/src/beam_stats.erl @@ -6,5 +6,31 @@ [ t/0 ]). +-export( + [ collect/1 + ]). + +-define(T, #?MODULE). + -type t() :: - #?MODULE{}. + ?T{}. + +-spec collect(beam_stats_delta:t()) -> + t(). +collect(DeltasServer) -> + {_, DeltaOfReductions} = beam_stats_source:erlang_statistics(reductions), + { {io_bytes_in , DeltaOfIOBytesIn} + , {io_bytes_out , DeltaOfIOBytesOut} + } = beam_stats_delta:of_io(DeltasServer), + ?T + { timestamp = beam_stats_source:os_timestamp() + , node_id = beam_stats_source:erlang_node() + , memory = beam_stats_source:erlang_memory() + , io_bytes_in = DeltaOfIOBytesIn + , io_bytes_out = DeltaOfIOBytesOut + , context_switches = beam_stats_delta:of_context_switches(DeltasServer) + , reductions = DeltaOfReductions + , run_queue = beam_stats_source:erlang_statistics(run_queue) + , ets = beam_stats_ets:collect() + , processes = beam_stats_processes:collect() + }. diff --git a/src/beam_stats_app.erl b/src/beam_stats_app.erl index c72415c..9eb063d 100644 --- a/src/beam_stats_app.erl +++ b/src/beam_stats_app.erl @@ -10,7 +10,8 @@ %% =================================================================== start(_StartType, _StartArgs) -> - beam_stats_sup:start_link(). + DeltasServer = beam_stats_delta:start(), + beam_stats_sup:start_link(DeltasServer). stop(_State) -> ok. diff --git a/src/beam_stats_delta.erl b/src/beam_stats_delta.erl new file mode 100644 index 0000000..d563507 --- /dev/null +++ b/src/beam_stats_delta.erl @@ -0,0 +1,85 @@ +-module(beam_stats_delta). + +-export_type( + [ t/0 + ]). + +-export( + [ start/0 + , stop/1 + , of_context_switches/1 + , of_io/1 + ]). + +-record(?MODULE, + { erlang_statistics :: ets:tid() + }). + +-define(T, #?MODULE). + +-opaque t() :: + ?T{}. + +-spec start() -> + t(). +start() -> + Options = + [ set + , public + ], + ?T + { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options) + }. + +-spec stop(t()) -> + {}. +stop(?T + { erlang_statistics = TidErlangStatistics + } +) -> + true = ets:delete(TidErlangStatistics), + {}. + +-spec of_context_switches(t()) -> + non_neg_integer(). +of_context_switches(?T{erlang_statistics=Table}) -> + Key = context_switches, + {Current, 0} = beam_stats_source:erlang_statistics(Key), + delta(Table, Key, Current). + +-spec of_io(t()) -> + { {io_bytes_in , non_neg_integer()} + , {io_bytes_out , non_neg_integer()} + }. +of_io(?T{erlang_statistics=Table}) -> + Key = io, + { {input , CurrentIn} + , {output , CurrentOut} + } = beam_stats_source:erlang_statistics(Key), + DeltaIn = delta(Table, io_bytes_in , CurrentIn), + DeltaOut = delta(Table, io_bytes_out, CurrentOut), + { {io_bytes_in , DeltaIn} + , {io_bytes_out , DeltaOut} + }. + +-spec delta(ets:tid(), atom(), non_neg_integer()) -> + non_neg_integer(). +delta(Table, Key, CurrentTotal) -> + PreviousTotalOpt = find(Table, Key), + PreviousTotal = hope_option:get(PreviousTotalOpt, 0), + save(Table, Key, CurrentTotal), + CurrentTotal - PreviousTotal. + +-spec find(ets:tid(), term()) -> + hope_option:t(term()). +find(Table, K) -> + case ets:lookup(Table, K) + of [] -> none + ; [{K, V}] -> {some, V} + end. + +-spec save(ets:tid(), term(), term()) -> + {}. +save(Table, K, V) -> + true = ets:insert(Table, {K, V}), + {}. diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl index c7ebd5e..dd52692 100644 --- a/src/beam_stats_producer.erl +++ b/src/beam_stats_producer.erl @@ -4,7 +4,7 @@ %% API -export( - [ start_link/0 + [ start_link/1 , subscribe/1 , unsubscribe/1 @@ -31,7 +31,7 @@ -record(state, { consumers = ordsets:new() :: ordsets:ordset(pid()) - , stats_state :: beam_stats_state:t() + , deltas_server :: beam_stats_delta:t() }). -type state() :: @@ -41,8 +41,8 @@ %% API %% ============================================================================ -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(DeltasServer) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []). -spec subscribe(pid()) -> ok. @@ -73,11 +73,10 @@ terminate(_Reason, _State) -> %% gen_server callbacks %% ============================================================================ -init([]) -> +init(DeltasServer) -> ok = schedule_first_production(), Consumers = ordsets:new(), - StatsState = beam_stats_state:new(), - {ok, #state{consumers=Consumers, stats_state=StatsState}}. + {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}. handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:add_element(PID, Consumers1), @@ -115,16 +114,15 @@ produce_async(#state{}=State) -> produce( #state { consumers = ConsumersSet - , stats_state = StatsState1 + , deltas_server = DeltasServer }=State, MsgSendFun ) -> - StatsState2 = beam_stats_state:update(StatsState1), - Stats = beam_stats_state:export(StatsState2), + Stats = beam_stats:collect(DeltasServer), ConsumersList = ordsets:to_list(ConsumersSet), Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end, ok = lists:foreach(Send, ConsumersList), - State#state{stats_state = StatsState2}. + State. -spec schedule_first_production() -> ok. diff --git a/src/beam_stats_state.erl b/src/beam_stats_state.erl deleted file mode 100644 index d514f95..0000000 --- a/src/beam_stats_state.erl +++ /dev/null @@ -1,154 +0,0 @@ --module(beam_stats_state). - --include("include/beam_stats.hrl"). --include("include/beam_stats_process.hrl"). --include("include/beam_stats_processes.hrl"). - --export_type( - [ t/0 - ]). - --export( - [ new/0 - , update/1 - , export/1 - ]). - --record(snapshots, - { memory :: [{atom(), non_neg_integer()}] - , processes :: beam_stats_processes:t() - , run_queue :: non_neg_integer() - , ets :: beam_stats_ets:t() - }). - --type snapshots() :: - #snapshots{}. - --record(deltas, - { reductions :: non_neg_integer() - }). - --type deltas() :: - #deltas{}. - --record(totals, - { io_bytes_in :: non_neg_integer() - , io_bytes_out :: non_neg_integer() - , context_switches :: non_neg_integer() - }). - --type totals() :: - #totals{}. - --record(?MODULE, - { timestamp :: erlang:timestamp() - , node_id :: atom() - , snapshots :: snapshots() % Current state - , deltas :: deltas() % Accumulated since last check - , totals_previous :: totals() % Accumulated since VM start, as of last state - , totals_current :: totals() % Accumulated since VM start, as of this state - }). - --define(T, #?MODULE). - --opaque t() :: - ?T{}. - --spec new() -> - t(). -new() -> - TotalsPrevious = totals_empty(), - new(TotalsPrevious). - --spec new(TotalsPrevious :: totals()) -> - t(). -new(#totals{}=TotalsPrevious) -> - ?T - { timestamp = beam_stats_source:os_timestamp() - , node_id = beam_stats_source:erlang_node() - , snapshots = snapshots_new() - , deltas = deltas_new() - , totals_previous = TotalsPrevious - , totals_current = totals_new() - }. - --spec update(t()) -> - t(). -update(?T{totals_current=TotalsPrevious}) -> - new(TotalsPrevious). - --spec export(t()) -> - beam_stats:t(). -export( - ?T - { timestamp = Timestamp - , node_id = NodeID - , snapshots = - #snapshots - { memory = Memory - , processes = Processes - , run_queue = RunQueue - , ets = ETS - } - , deltas = - #deltas - { reductions = Reductions - } - , totals_previous = - #totals - { io_bytes_in = PreviousIOBytesIn - , io_bytes_out = PreviousIOBytesOut - , context_switches = PreviousContextSwitches - } - , totals_current = - #totals - { io_bytes_in = CurrentIOBytesIn - , io_bytes_out = CurrentIOBytesOut - , context_switches = CurrentContextSwitches - } - } -) -> - #beam_stats - { timestamp = Timestamp - , node_id = NodeID - , memory = Memory - , io_bytes_in = CurrentIOBytesIn - PreviousIOBytesIn - , io_bytes_out = CurrentIOBytesOut - PreviousIOBytesOut - , context_switches = CurrentContextSwitches - PreviousContextSwitches - , reductions = Reductions - , run_queue = RunQueue - , ets = ETS - , processes = Processes - }. - -snapshots_new() -> - #snapshots - { memory = beam_stats_source:erlang_memory() - , processes = beam_stats_processes:collect() - , run_queue = beam_stats_source:erlang_statistics(run_queue) - , ets = beam_stats_ets:collect() - }. - -deltas_new() -> - {_ReductionsTotal, ReductionsDelta} = beam_stats_source:erlang_statistics(reductions), - #deltas - { reductions = ReductionsDelta - }. - -totals_new() -> - { {input , IOBytesIn} - , {output , IOBytesOut} - } = beam_stats_source:erlang_statistics(io), - {ContextSwitches, 0} = beam_stats_source:erlang_statistics(context_switches), - #totals - { io_bytes_in = IOBytesIn - , io_bytes_out = IOBytesOut - , context_switches = ContextSwitches - }. - -totals_empty() -> - #totals - { io_bytes_in = 0 - , io_bytes_out = 0 - , context_switches = 0 - }. diff --git a/src/beam_stats_sup.erl b/src/beam_stats_sup.erl index 7cfec55..d582df4 100644 --- a/src/beam_stats_sup.erl +++ b/src/beam_stats_sup.erl @@ -3,30 +3,30 @@ -behaviour(supervisor). %% API --export([start_link/0]). +-export([start_link/1]). %% Supervisor callbacks -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(Type, Module), - {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}). +-define(CHILD(Type, Module, Args), + {Module, {Module, start_link, Args}, permanent, 5000, Type, [Module]}). %% =================================================================== %% API functions %% =================================================================== -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link(DeltasServer) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, DeltasServer). %% =================================================================== %% Supervisor callbacks %% =================================================================== -init([]) -> +init(DeltasServer) -> Children = - [ ?CHILD(worker , beam_stats_producer) - , ?CHILD(supervisor , beam_stats_sup_consumers) - ], + [ ?CHILD(worker , beam_stats_producer , [DeltasServer]) + , ?CHILD(supervisor , beam_stats_sup_consumers, []) + ], SupFlags = {one_for_one, 5, 10}, {ok, {SupFlags, Children}}. diff --git a/test/beam_stats_consumer_statsd_SUITE.erl b/test/beam_stats_consumer_statsd_SUITE.erl index b8b3d30..88f82e7 100644 --- a/test/beam_stats_consumer_statsd_SUITE.erl +++ b/test/beam_stats_consumer_statsd_SUITE.erl @@ -38,11 +38,7 @@ groups() -> t_full_cycle(_Cfg) -> meck:new(beam_stats_source), - BEAMStatsExpected = meck_expect_beam_stats(), - BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()), - ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]), - ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]), - BEAMStatsExpected = BEAMStatsComputed, + _BEAMStatsExpected = meck_expect_beam_stats(), {ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats), ct:log("beam_stats started~n"),