Implement synchronous produce->consume cycle, for testing. 0.5.2
authorSiraaj Khandkar <siraaj@khandkar.net>
Wed, 26 Aug 2015 04:00:27 +0000 (00:00 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Wed, 26 Aug 2015 04:05:44 +0000 (00:05 -0400)
src/beam_stats.app.src
src/beam_stats.hrl [deleted file]
src/beam_stats_consumer.erl
src/beam_stats_producer.erl

index 40bab63..9eb72ab 100644 (file)
@@ -1,7 +1,7 @@
 {application, beam_stats,
  [
   {description, "Periodic VM stats production and consumption."},
-  {vsn, "0.5.1"},
+  {vsn, "0.5.2"},
   {registered, []},
   {applications,
     [ kernel
diff --git a/src/beam_stats.hrl b/src/beam_stats.hrl
deleted file mode 100644 (file)
index c7df52a..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
--define(METHOD_SHOULD_NOT_BE_USED(Method, State),
-    {stop, {method_should_not_be_used, {?MODULE, Method}}, State}
-).
index eb8cbb7..3e46042 100644 (file)
@@ -1,7 +1,6 @@
 -module(beam_stats_consumer).
 
 -include("include/beam_stats.hrl").
--include(        "beam_stats.hrl").
 
 -behaviour(gen_server).
 
@@ -12,6 +11,8 @@
 %% Public API
 -export(
     [ add/2
+    , consume_sync/2
+    , consume_async/2
     ]).
 
 %% Internal API
@@ -49,7 +50,9 @@
 %% Internal data
 %% ============================================================================
 
--define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
+-define(FLUSH         , flush).
+-define(CONSUME_SYNC  , consume_sync).
+-define(CONSUME_ASYNC , consume_async).
 
 -record(state,
     { consumer_module      :: module()
@@ -58,6 +61,9 @@
     , beam_stats_queue     :: queue()
     }).
 
+-type state() ::
+    #state{}.
+
 %% ============================================================================
 %%  Public API
 %% ============================================================================
 add(ConsumerModule, ConsumerOptions) ->
     beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
 
+-spec consume_sync(pid(), beam_stats:t()) ->
+    {}.
+consume_sync(PID, #beam_stats{}=BEAMStats) ->
+    {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}).
+
+-spec consume_async(pid(), beam_stats:t()) ->
+    {}.
+consume_async(PID, #beam_stats{}=BEAMStats) ->
+    ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}),
+    {}.
+
 %% ============================================================================
 %%  Internal API
 %% ============================================================================
@@ -81,9 +98,6 @@ start_link(ConsumerModule, ConsumerOptions) ->
 %%  gen_server callbacks (unused)
 %% ============================================================================
 
-handle_call(_Request, _From, State) ->
-    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
-
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
@@ -100,44 +114,65 @@ init([ConsumerModule, ConsumerOptions]) ->
         , beam_stats_queue     = queue:new()
         },
     ok = beam_stats_producer:subscribe(self()),
-    ok = schedule_first_consumption(),
+    ok = schedule_first_flush(),
     {ok, State}.
 
 terminate(_Reason, _State) ->
     ok = beam_stats_producer:unsubscribe(self()).
 
-handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
-    Q2 = queue:in(BEAMStats, Q1),
-    {noreply, State#state{beam_stats_queue = Q2}}.
+handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) ->
+    State2 = consume_one(BEAMStats, State1),
+    {reply, {}, State2}.
 
-handle_info(
-    ?SIGNAL_CONSUMPTION,
-    #state
-    { consumer_module      = ConsumerModule
-    , consumer_state       = ConsumerState
-    , consumption_interval = ConsumptionInterval
-    , beam_stats_queue     = Q
-    }=State1
-) ->
-    State2 = State1#state
-        { consumer_state   = ConsumerModule:consume(Q, ConsumerState)
-        , beam_stats_queue = queue:new()
-        },
-    ok = schedule_next_consumption(ConsumptionInterval),
+handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) ->
+    State2 = enqueue(BEAMStats, State1),
+    {noreply, State2}.
+
+handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) ->
+    State2 = consume_all_queued(State1),
+    ok = schedule_next_flush(ConsumptionInterval),
     {noreply, State2}.
 
 %% ============================================================================
 %%  Internal
 %% ============================================================================
 
--spec schedule_first_consumption() ->
+-spec consume_one(beam_stats:t(), state()) ->
+    state().
+consume_one(#beam_stats{}=BEAMStats, #state{}=State1) ->
+    Q = queue:in(BEAMStats, queue:new()),
+    consume(Q, State1).
+
+-spec consume_all_queued(state()) ->
+    state().
+consume_all_queued(#state{beam_stats_queue=Q}=State1) ->
+    State2 = consume(Q, State1),
+    State2#state{beam_stats_queue = queue:new()}.
+
+-spec consume(queue(), state()) ->
+    state().
+consume(
+    Q,
+    #state
+    { consumer_module  = ConsumerModule
+    , consumer_state   = ConsumerState
+    }=State
+) ->
+    State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}.
+
+-spec enqueue(beam_stats:t(), state()) ->
+    state().
+enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) ->
+    State#state{beam_stats_queue = queue:in(BEAMStats, Q)}.
+
+-spec schedule_first_flush() ->
     ok.
-schedule_first_consumption() ->
-    _ = self() ! ?SIGNAL_CONSUMPTION,
+schedule_first_flush() ->
+    _ = self() ! ?FLUSH,
     ok.
 
--spec schedule_next_consumption(non_neg_integer()) ->
+-spec schedule_next_flush(non_neg_integer()) ->
     ok.
-schedule_next_consumption(Time) ->
-    _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
+schedule_next_flush(Time) ->
+    _ = erlang:send_after(Time, self(), ?FLUSH),
     ok.
index 67765d3..c7ebd5e 100644 (file)
@@ -1,7 +1,5 @@
 -module(beam_stats_producer).
 
--include("beam_stats.hrl").
-
 -behaviour(gen_server).
 
 %% API
@@ -10,8 +8,8 @@
     ,   subscribe/1
     , unsubscribe/1
 
-    % Force production and distribution. Blocks. Mainly for testing.
-    , force_production/0
+    % For testing
+    , sync_produce_consume/0
     ]).
 
 %% gen_server callbacks
@@ -28,8 +26,8 @@
 %% Internal data
 %% ============================================================================
 
--define(SIGNAL_PRODUCTION , beam_stats_production_signal).
--define(FORCE_PRODUCTION  , beam_stats_force_production).
+-define(PRODUCE_SYNC  , produce_sync).
+-define(PRODUCE_ASYNC , produce_async).
 
 -record(state,
     { consumers = ordsets:new() :: ordsets:ordset(pid())
@@ -56,10 +54,10 @@ subscribe(PID) ->
 unsubscribe(PID) ->
     gen_server:cast(?MODULE, {unsubscribe, PID}).
 
--spec force_production() ->
+-spec sync_produce_consume() ->
     {}.
-force_production() ->
-    gen_server:call(?MODULE, ?FORCE_PRODUCTION).
+sync_produce_consume() ->
+    {} = gen_server:call(?MODULE, ?PRODUCE_SYNC).
 
 %% ============================================================================
 %%  gen_server callbacks (unused)
@@ -89,12 +87,12 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:del_element(PID, Consumers1),
     {noreply, State#state{consumers=Consumers2}}.
 
-handle_call(?FORCE_PRODUCTION, _From, State1) ->
-    State2 = produce(State1),
+handle_call(?PRODUCE_SYNC, _From, State1) ->
+    State2 = produce_sync(State1),
     {reply, {}, State2}.
 
-handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
-    State2 = produce(State1),
+handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
+    State2 = produce_async(State1),
     ok = schedule_next_production(),
     {noreply, State2}.
 
@@ -102,30 +100,41 @@ handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
 %%  Private
 %% ============================================================================
 
--spec produce(state()) ->
+-spec produce_sync(state()) ->
+    state().
+produce_sync(#state{}=State) ->
+    produce(State, fun beam_stats_consumer:consume_sync/2).
+
+-spec produce_async(state()) ->
     state().
-produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) ->
+produce_async(#state{}=State) ->
+    produce(State, fun beam_stats_consumer:consume_async/2).
+
+-spec produce(state(), fun((pid(), term()) -> ok)) ->
+    state().
+produce(
+    #state
+    { consumers   = ConsumersSet
+    , stats_state = StatsState1
+    }=State,
+    MsgSendFun
+) ->
     StatsState2 = beam_stats_state:update(StatsState1),
     Stats       = beam_stats_state:export(StatsState2),
     ConsumersList = ordsets:to_list(ConsumersSet),
-    ok = push_to_consumers(Stats, ConsumersList),
+    Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
+    ok = lists:foreach(Send, ConsumersList),
     State#state{stats_state = StatsState2}.
 
 -spec schedule_first_production() ->
     ok.
 schedule_first_production() ->
-    _ = self() ! ?SIGNAL_PRODUCTION,
+    _ = self() ! ?PRODUCE_ASYNC,
     ok.
 
 -spec schedule_next_production() ->
     ok.
 schedule_next_production() ->
     ProductionInterval = beam_stats_config:production_interval(),
-    _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
-    ok.
-
--spec push_to_consumers(beam_stats:t(), [pid()]) ->
+    _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),
     ok.
-push_to_consumers(Stats, Consumers) ->
-    Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end,
-    lists:foreach(Push, Consumers).
This page took 0.045137 seconds and 4 git commands to generate.