-module(beam_stats_producer).
--include("beam_stats.hrl").
-
-behaviour(gen_server).
%% API
-export(
- [ start_link/0
+ [ start_link/1
, subscribe/1
, unsubscribe/1
- % Force production and distribution. Blocks. Mainly for testing.
- , force_production/0
+ % For testing
+ , sync_produce_consume/0
]).
%% gen_server callbacks
%% Internal data
%% ============================================================================
--define(SIGNAL_PRODUCTION , beam_stats_production_signal).
--define(FORCE_PRODUCTION , beam_stats_force_production).
+-define(PRODUCE_SYNC , produce_sync).
+-define(PRODUCE_ASYNC , produce_async).
-record(state,
- { consumers = ordsets:new() :: ordsets:ordset(pid())
+ { consumers = ordsets:new() :: ordsets:ordset(pid())
+ , deltas_server :: beam_stats_delta:t()
}).
-type state() ::
%% API
%% ============================================================================
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(DeltasServer) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
-spec subscribe(pid()) ->
ok.
unsubscribe(PID) ->
gen_server:cast(?MODULE, {unsubscribe, PID}).
--spec force_production() ->
+-spec sync_produce_consume() ->
{}.
-force_production() ->
- gen_server:call(?MODULE, ?FORCE_PRODUCTION).
+sync_produce_consume() ->
+ {} = gen_server:call(?MODULE, ?PRODUCE_SYNC).
%% ============================================================================
%% gen_server callbacks (unused)
%% gen_server callbacks
%% ============================================================================
-init([]) ->
+init(DeltasServer) ->
ok = schedule_first_production(),
Consumers = ordsets:new(),
- {ok, #state{consumers=Consumers}}.
+ {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
Consumers2 = ordsets:add_element(PID, Consumers1),
Consumers2 = ordsets:del_element(PID, Consumers1),
{noreply, State#state{consumers=Consumers2}}.
-handle_call(?FORCE_PRODUCTION, _From, State) ->
- {} = produce(State),
- {reply, {}, State}.
+handle_call(?PRODUCE_SYNC, _From, State1) ->
+ State2 = produce_sync(State1),
+ {reply, {}, State2}.
-handle_info(?SIGNAL_PRODUCTION, #state{}=State) ->
- {} = produce(State),
+handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
+ State2 = produce_async(State1),
ok = schedule_next_production(),
- {noreply, State}.
+ {noreply, State2}.
%% ============================================================================
%% Private
%% ============================================================================
--spec produce(state()) ->
- {}.
-produce(#state{consumers=ConsumersSet}) ->
+-spec produce_sync(state()) ->
+ state().
+produce_sync(#state{}=State) ->
+ produce(State, fun beam_stats_consumer:consume_sync/2).
+
+-spec produce_async(state()) ->
+ state().
+produce_async(#state{}=State) ->
+ produce(State, fun beam_stats_consumer:consume_async/2).
+
+-spec produce(state(), fun((pid(), term()) -> ok)) ->
+ state().
+produce(
+ #state
+ { consumers = ConsumersSet
+ , deltas_server = DeltasServer
+ }=State,
+ MsgSendFun
+) ->
+ Stats = beam_stats:collect(DeltasServer),
ConsumersList = ordsets:to_list(ConsumersSet),
- ok = collect_and_push_to_consumers(ConsumersList),
- {}.
+ Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
+ ok = lists:foreach(Send, ConsumersList),
+ beam_stats_delta:gc(DeltasServer),
+ State.
-spec schedule_first_production() ->
ok.
schedule_first_production() ->
- _ = self() ! ?SIGNAL_PRODUCTION,
+ _ = self() ! ?PRODUCE_ASYNC,
ok.
-spec schedule_next_production() ->
ok.
schedule_next_production() ->
ProductionInterval = beam_stats_config:production_interval(),
- _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
- ok.
-
--spec collect_and_push_to_consumers([pid()]) ->
+ _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),
ok.
-collect_and_push_to_consumers(Consumers) ->
- BEAMStats = beam_stats:collect(),
- Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
- lists:foreach(Push, Consumers).