Implement deltas server, replacing beam_stats_state.
authorSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 20:11:03 +0000 (16:11 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Sun, 27 Sep 2015 20:36:27 +0000 (16:36 -0400)
To simplify collection of deltas.

src/beam_stats.app.src
src/beam_stats.erl
src/beam_stats_app.erl
src/beam_stats_delta.erl [new file with mode: 0644]
src/beam_stats_producer.erl
src/beam_stats_state.erl [deleted file]
src/beam_stats_sup.erl
test/beam_stats_consumer_statsd_SUITE.erl

index b88f464..9c8d943 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.14.1"},
+  {vsn, "0.14.2"},
   {registered, []},
   {applications,
     [ kernel
index 1522568..af5167c 100644 (file)
@@ -6,5 +6,31 @@
     [ t/0
     ]).
 
+-export(
+    [ collect/1
+    ]).
+
+-define(T, #?MODULE).
+
 -type t() ::
-    #?MODULE{}.
+    ?T{}.
+
+-spec collect(beam_stats_delta:t()) ->
+    t().
+collect(DeltasServer) ->
+    {_, DeltaOfReductions} = beam_stats_source:erlang_statistics(reductions),
+    { {io_bytes_in  , DeltaOfIOBytesIn}
+    , {io_bytes_out , DeltaOfIOBytesOut}
+    } = beam_stats_delta:of_io(DeltasServer),
+    ?T
+    { timestamp        = beam_stats_source:os_timestamp()
+    , node_id          = beam_stats_source:erlang_node()
+    , memory           = beam_stats_source:erlang_memory()
+    , io_bytes_in      = DeltaOfIOBytesIn
+    , io_bytes_out     = DeltaOfIOBytesOut
+    , context_switches = beam_stats_delta:of_context_switches(DeltasServer)
+    , reductions       = DeltaOfReductions
+    , run_queue        = beam_stats_source:erlang_statistics(run_queue)
+    , ets              = beam_stats_ets:collect()
+    , processes        = beam_stats_processes:collect()
+    }.
index c72415c..9eb063d 100644 (file)
@@ -10,7 +10,8 @@
 %% ===================================================================
 
 start(_StartType, _StartArgs) ->
-    beam_stats_sup:start_link().
+    DeltasServer = beam_stats_delta:start(),
+    beam_stats_sup:start_link(DeltasServer).
 
 stop(_State) ->
     ok.
diff --git a/src/beam_stats_delta.erl b/src/beam_stats_delta.erl
new file mode 100644 (file)
index 0000000..d563507
--- /dev/null
@@ -0,0 +1,85 @@
+-module(beam_stats_delta).
+
+-export_type(
+    [ t/0
+    ]).
+
+-export(
+    [ start/0
+    , stop/1
+    , of_context_switches/1
+    , of_io/1
+    ]).
+
+-record(?MODULE,
+    { erlang_statistics :: ets:tid()
+    }).
+
+-define(T, #?MODULE).
+
+-opaque t()  ::
+    ?T{}.
+
+-spec start() ->
+    t().
+start() ->
+    Options =
+        [ set
+        , public
+        ],
+    ?T
+    { erlang_statistics = ets:new(beam_stats_delta_erlang_statistics, Options)
+    }.
+
+-spec stop(t()) ->
+    {}.
+stop(?T
+    { erlang_statistics = TidErlangStatistics
+    }
+) ->
+    true = ets:delete(TidErlangStatistics),
+    {}.
+
+-spec of_context_switches(t()) ->
+    non_neg_integer().
+of_context_switches(?T{erlang_statistics=Table}) ->
+    Key = context_switches,
+    {Current, 0} = beam_stats_source:erlang_statistics(Key),
+    delta(Table, Key, Current).
+
+-spec of_io(t()) ->
+    { {io_bytes_in  , non_neg_integer()}
+    , {io_bytes_out , non_neg_integer()}
+    }.
+of_io(?T{erlang_statistics=Table}) ->
+    Key = io,
+    { {input  , CurrentIn}
+    , {output , CurrentOut}
+    } = beam_stats_source:erlang_statistics(Key),
+    DeltaIn  = delta(Table, io_bytes_in , CurrentIn),
+    DeltaOut = delta(Table, io_bytes_out, CurrentOut),
+    { {io_bytes_in  , DeltaIn}
+    , {io_bytes_out , DeltaOut}
+    }.
+
+-spec delta(ets:tid(), atom(), non_neg_integer()) ->
+    non_neg_integer().
+delta(Table, Key, CurrentTotal) ->
+    PreviousTotalOpt = find(Table, Key),
+    PreviousTotal = hope_option:get(PreviousTotalOpt, 0),
+    save(Table, Key, CurrentTotal),
+    CurrentTotal - PreviousTotal.
+
+-spec find(ets:tid(), term()) ->
+    hope_option:t(term()).
+find(Table, K) ->
+    case ets:lookup(Table, K)
+    of  []       -> none
+    ;   [{K, V}] -> {some, V}
+    end.
+
+-spec save(ets:tid(), term(), term()) ->
+    {}.
+save(Table, K, V) ->
+    true = ets:insert(Table, {K, V}),
+    {}.
index c7ebd5e..dd52692 100644 (file)
@@ -4,7 +4,7 @@
 
 %% API
 -export(
-    [ start_link/0
+    [ start_link/1
     ,   subscribe/1
     , unsubscribe/1
 
@@ -31,7 +31,7 @@
 
 -record(state,
     { consumers = ordsets:new() :: ordsets:ordset(pid())
-    , stats_state :: beam_stats_state:t()
+    , deltas_server :: beam_stats_delta:t()
     }).
 
 -type state() ::
@@ -41,8 +41,8 @@
 %%  API
 %% ============================================================================
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(DeltasServer) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
 
 -spec subscribe(pid()) ->
     ok.
@@ -73,11 +73,10 @@ terminate(_Reason, _State) ->
 %%  gen_server callbacks
 %% ============================================================================
 
-init([]) ->
+init(DeltasServer) ->
     ok = schedule_first_production(),
     Consumers = ordsets:new(),
-    StatsState = beam_stats_state:new(),
-    {ok, #state{consumers=Consumers, stats_state=StatsState}}.
+    {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
 
 handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:add_element(PID, Consumers1),
@@ -115,16 +114,15 @@ produce_async(#state{}=State) ->
 produce(
     #state
     { consumers   = ConsumersSet
-    , stats_state = StatsState1
+    , deltas_server = DeltasServer
     }=State,
     MsgSendFun
 ) ->
-    StatsState2 = beam_stats_state:update(StatsState1),
-    Stats       = beam_stats_state:export(StatsState2),
+    Stats = beam_stats:collect(DeltasServer),
     ConsumersList = ordsets:to_list(ConsumersSet),
     Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
     ok = lists:foreach(Send, ConsumersList),
-    State#state{stats_state = StatsState2}.
+    State.
 
 -spec schedule_first_production() ->
     ok.
diff --git a/src/beam_stats_state.erl b/src/beam_stats_state.erl
deleted file mode 100644 (file)
index d514f95..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
--module(beam_stats_state).
-
--include("include/beam_stats.hrl").
--include("include/beam_stats_process.hrl").
--include("include/beam_stats_processes.hrl").
-
--export_type(
-    [ t/0
-    ]).
-
--export(
-    [ new/0
-    , update/1
-    , export/1
-    ]).
-
--record(snapshots,
-    { memory     :: [{atom(), non_neg_integer()}]
-    , processes  :: beam_stats_processes:t()
-    , run_queue  :: non_neg_integer()
-    , ets        :: beam_stats_ets:t()
-    }).
-
--type snapshots() ::
-    #snapshots{}.
-
--record(deltas,
-    { reductions :: non_neg_integer()
-    }).
-
--type deltas() ::
-    #deltas{}.
-
--record(totals,
-    { io_bytes_in      :: non_neg_integer()
-    , io_bytes_out     :: non_neg_integer()
-    , context_switches :: non_neg_integer()
-    }).
-
--type totals() ::
-    #totals{}.
-
--record(?MODULE,
-    { timestamp       :: erlang:timestamp()
-    , node_id         :: atom()
-    , snapshots       :: snapshots()  % Current state
-    , deltas          :: deltas()     % Accumulated since last check
-    , totals_previous :: totals()     % Accumulated since VM start, as of last state
-    , totals_current  :: totals()     % Accumulated since VM start, as of this state
-    }).
-
--define(T, #?MODULE).
-
--opaque t() ::
-    ?T{}.
-
--spec new() ->
-    t().
-new() ->
-    TotalsPrevious = totals_empty(),
-    new(TotalsPrevious).
-
--spec new(TotalsPrevious :: totals()) ->
-    t().
-new(#totals{}=TotalsPrevious) ->
-    ?T
-    { timestamp       = beam_stats_source:os_timestamp()
-    , node_id         = beam_stats_source:erlang_node()
-    , snapshots       = snapshots_new()
-    , deltas          = deltas_new()
-    , totals_previous = TotalsPrevious
-    , totals_current  = totals_new()
-    }.
-
--spec update(t()) ->
-    t().
-update(?T{totals_current=TotalsPrevious}) ->
-    new(TotalsPrevious).
-
--spec export(t()) ->
-    beam_stats:t().
-export(
-    ?T
-    { timestamp = Timestamp
-    , node_id   = NodeID
-    , snapshots =
-        #snapshots
-        { memory    = Memory
-        , processes = Processes
-        , run_queue = RunQueue
-        , ets       = ETS
-        }
-    , deltas =
-        #deltas
-        { reductions = Reductions
-        }
-    , totals_previous =
-        #totals
-        { io_bytes_in      = PreviousIOBytesIn
-        , io_bytes_out     = PreviousIOBytesOut
-        , context_switches = PreviousContextSwitches
-        }
-    , totals_current =
-        #totals
-        { io_bytes_in      = CurrentIOBytesIn
-        , io_bytes_out     = CurrentIOBytesOut
-        , context_switches = CurrentContextSwitches
-        }
-    }
-) ->
-    #beam_stats
-    { timestamp        = Timestamp
-    , node_id          = NodeID
-    , memory           = Memory
-    , io_bytes_in      = CurrentIOBytesIn  - PreviousIOBytesIn
-    , io_bytes_out     = CurrentIOBytesOut - PreviousIOBytesOut
-    , context_switches = CurrentContextSwitches - PreviousContextSwitches
-    , reductions       = Reductions
-    , run_queue        = RunQueue
-    , ets              = ETS
-    , processes        = Processes
-    }.
-
-snapshots_new() ->
-    #snapshots
-    { memory     = beam_stats_source:erlang_memory()
-    , processes  = beam_stats_processes:collect()
-    , run_queue  = beam_stats_source:erlang_statistics(run_queue)
-    , ets        = beam_stats_ets:collect()
-    }.
-
-deltas_new() ->
-    {_ReductionsTotal, ReductionsDelta} = beam_stats_source:erlang_statistics(reductions),
-    #deltas
-    { reductions = ReductionsDelta
-    }.
-
-totals_new() ->
-    { {input  , IOBytesIn}
-    , {output , IOBytesOut}
-    } = beam_stats_source:erlang_statistics(io),
-    {ContextSwitches, 0} = beam_stats_source:erlang_statistics(context_switches),
-    #totals
-    { io_bytes_in      = IOBytesIn
-    , io_bytes_out     = IOBytesOut
-    , context_switches = ContextSwitches
-    }.
-
-totals_empty() ->
-    #totals
-    { io_bytes_in      = 0
-    , io_bytes_out     = 0
-    , context_switches = 0
-    }.
index 7cfec55..d582df4 100644 (file)
@@ -3,30 +3,30 @@
 -behaviour(supervisor).
 
 %% API
--export([start_link/0]).
+-export([start_link/1]).
 
 %% Supervisor callbacks
 -export([init/1]).
 
 %% Helper macro for declaring children of supervisor
--define(CHILD(Type, Module),
-    {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}).
+-define(CHILD(Type, Module, Args),
+    {Module, {Module, start_link, Args}, permanent, 5000, Type, [Module]}).
 
 %% ===================================================================
 %% API functions
 %% ===================================================================
 
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+start_link(DeltasServer) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, DeltasServer).
 
 %% ===================================================================
 %% Supervisor callbacks
 %% ===================================================================
 
-init([]) ->
+init(DeltasServer) ->
     Children =
-        [ ?CHILD(worker     , beam_stats_producer)
-        , ?CHILD(supervisor , beam_stats_sup_consumers)
-        ],
+    [ ?CHILD(worker     , beam_stats_producer     , [DeltasServer])
+    , ?CHILD(supervisor , beam_stats_sup_consumers, [])
+    ],
     SupFlags = {one_for_one, 5, 10},
     {ok, {SupFlags, Children}}.
index b8b3d30..88f82e7 100644 (file)
@@ -38,11 +38,7 @@ groups() ->
 
 t_full_cycle(_Cfg) ->
     meck:new(beam_stats_source),
-    BEAMStatsExpected = meck_expect_beam_stats(),
-    BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
-    ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
-    ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
-    BEAMStatsExpected = BEAMStatsComputed,
+    _BEAMStatsExpected = meck_expect_beam_stats(),
 
     {ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats),
     ct:log("beam_stats started~n"),
This page took 0.052814 seconds and 4 git commands to generate.