-module(beam_stats_consumer).
-include("include/beam_stats.hrl").
--include( "beam_stats.hrl").
-behaviour(gen_server).
%% Public API
-export(
[ add/2
+ , consume_sync/2
+ , consume_async/2
]).
%% Internal API
%% Internal data
%% ============================================================================
--define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
+-define(FLUSH , flush).
+-define(CONSUME_SYNC , consume_sync).
+-define(CONSUME_ASYNC , consume_async).
-record(state,
{ consumer_module :: module()
, beam_stats_queue :: queue()
}).
+-type state() ::
+ #state{}.
+
%% ============================================================================
%% Public API
%% ============================================================================
add(ConsumerModule, ConsumerOptions) ->
beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
+-spec consume_sync(pid(), beam_stats:t()) ->
+ {}.
+consume_sync(PID, #beam_stats{}=BEAMStats) ->
+ {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}).
+
+-spec consume_async(pid(), beam_stats:t()) ->
+ {}.
+consume_async(PID, #beam_stats{}=BEAMStats) ->
+ ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}),
+ {}.
+
%% ============================================================================
%% Internal API
%% ============================================================================
%% gen_server callbacks (unused)
%% ============================================================================
-handle_call(_Request, _From, State) ->
- ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
, beam_stats_queue = queue:new()
},
ok = beam_stats_producer:subscribe(self()),
- ok = schedule_first_consumption(),
+ ok = schedule_first_flush(),
{ok, State}.
terminate(_Reason, _State) ->
ok = beam_stats_producer:unsubscribe(self()).
-handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
- Q2 = queue:in(BEAMStats, Q1),
- {noreply, State#state{beam_stats_queue = Q2}}.
+handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) ->
+ State2 = consume_one(BEAMStats, State1),
+ {reply, {}, State2}.
-handle_info(
- ?SIGNAL_CONSUMPTION,
- #state
- { consumer_module = ConsumerModule
- , consumer_state = ConsumerState
- , consumption_interval = ConsumptionInterval
- , beam_stats_queue = Q
- }=State1
-) ->
- State2 = State1#state
- { consumer_state = ConsumerModule:consume(Q, ConsumerState)
- , beam_stats_queue = queue:new()
- },
- ok = schedule_next_consumption(ConsumptionInterval),
+handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) ->
+ State2 = enqueue(BEAMStats, State1),
+ {noreply, State2}.
+
+handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) ->
+ State2 = consume_all_queued(State1),
+ ok = schedule_next_flush(ConsumptionInterval),
{noreply, State2}.
%% ============================================================================
%% Internal
%% ============================================================================
--spec schedule_first_consumption() ->
+-spec consume_one(beam_stats:t(), state()) ->
+ state().
+consume_one(#beam_stats{}=BEAMStats, #state{}=State1) ->
+ Q = queue:in(BEAMStats, queue:new()),
+ consume(Q, State1).
+
+-spec consume_all_queued(state()) ->
+ state().
+consume_all_queued(#state{beam_stats_queue=Q}=State1) ->
+ State2 = consume(Q, State1),
+ State2#state{beam_stats_queue = queue:new()}.
+
+-spec consume(queue(), state()) ->
+ state().
+consume(
+ Q,
+ #state
+ { consumer_module = ConsumerModule
+ , consumer_state = ConsumerState
+ }=State
+) ->
+ State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}.
+
+-spec enqueue(beam_stats:t(), state()) ->
+ state().
+enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) ->
+ State#state{beam_stats_queue = queue:in(BEAMStats, Q)}.
+
+-spec schedule_first_flush() ->
ok.
-schedule_first_consumption() ->
- _ = self() ! ?SIGNAL_CONSUMPTION,
+schedule_first_flush() ->
+ _ = self() ! ?FLUSH,
ok.
--spec schedule_next_consumption(non_neg_integer()) ->
+-spec schedule_next_flush(non_neg_integer()) ->
ok.
-schedule_next_consumption(Time) ->
- _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
+schedule_next_flush(Time) ->
+ _ = erlang:send_after(Time, self(), ?FLUSH),
ok.
-module(beam_stats_producer).
--include("beam_stats.hrl").
-
-behaviour(gen_server).
%% API
, subscribe/1
, unsubscribe/1
- % Force production and distribution. Blocks. Mainly for testing.
- , force_production/0
+ % For testing
+ , sync_produce_consume/0
]).
%% gen_server callbacks
%% Internal data
%% ============================================================================
--define(SIGNAL_PRODUCTION , beam_stats_production_signal).
--define(FORCE_PRODUCTION , beam_stats_force_production).
+-define(PRODUCE_SYNC , produce_sync).
+-define(PRODUCE_ASYNC , produce_async).
-record(state,
{ consumers = ordsets:new() :: ordsets:ordset(pid())
unsubscribe(PID) ->
gen_server:cast(?MODULE, {unsubscribe, PID}).
--spec force_production() ->
+-spec sync_produce_consume() ->
{}.
-force_production() ->
- gen_server:call(?MODULE, ?FORCE_PRODUCTION).
+sync_produce_consume() ->
+ {} = gen_server:call(?MODULE, ?PRODUCE_SYNC).
%% ============================================================================
%% gen_server callbacks (unused)
Consumers2 = ordsets:del_element(PID, Consumers1),
{noreply, State#state{consumers=Consumers2}}.
-handle_call(?FORCE_PRODUCTION, _From, State1) ->
- State2 = produce(State1),
+handle_call(?PRODUCE_SYNC, _From, State1) ->
+ State2 = produce_sync(State1),
{reply, {}, State2}.
-handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
- State2 = produce(State1),
+handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
+ State2 = produce_async(State1),
ok = schedule_next_production(),
{noreply, State2}.
%% Private
%% ============================================================================
--spec produce(state()) ->
+-spec produce_sync(state()) ->
+ state().
+produce_sync(#state{}=State) ->
+ produce(State, fun beam_stats_consumer:consume_sync/2).
+
+-spec produce_async(state()) ->
state().
-produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) ->
+produce_async(#state{}=State) ->
+ produce(State, fun beam_stats_consumer:consume_async/2).
+
+-spec produce(state(), fun((pid(), term()) -> ok)) ->
+ state().
+produce(
+ #state
+ { consumers = ConsumersSet
+ , stats_state = StatsState1
+ }=State,
+ MsgSendFun
+) ->
StatsState2 = beam_stats_state:update(StatsState1),
Stats = beam_stats_state:export(StatsState2),
ConsumersList = ordsets:to_list(ConsumersSet),
- ok = push_to_consumers(Stats, ConsumersList),
+ Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
+ ok = lists:foreach(Send, ConsumersList),
State#state{stats_state = StatsState2}.
-spec schedule_first_production() ->
ok.
schedule_first_production() ->
- _ = self() ! ?SIGNAL_PRODUCTION,
+ _ = self() ! ?PRODUCE_ASYNC,
ok.
-spec schedule_next_production() ->
ok.
schedule_next_production() ->
ProductionInterval = beam_stats_config:production_interval(),
- _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
- ok.
-
--spec push_to_consumers(beam_stats:t(), [pid()]) ->
+ _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),
ok.
-push_to_consumers(Stats, Consumers) ->
- Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end,
- lists:foreach(Push, Consumers).