Update alignments.
[beam_stats.git] / src / beam_stats_producer.erl
index cd0933c..eab1759 100644 (file)
@@ -1,17 +1,15 @@
 -module(beam_stats_producer).
 
--include("beam_stats.hrl").
-
 -behaviour(gen_server).
 
 %% API
 -export(
-    [ start_link/0
+    [ start_link/1
     ,   subscribe/1
     , unsubscribe/1
 
-    % Force production and distribution. Blocks. Mainly for testing.
-    , force_production/0
+    % For testing
+    , sync_produce_consume/0
     ]).
 
 %% gen_server callbacks
 %% Internal data
 %% ============================================================================
 
--define(SIGNAL_PRODUCTION , beam_stats_production_signal).
--define(FORCE_PRODUCTION  , beam_stats_force_production).
+-define(PRODUCE_SYNC  , produce_sync).
+-define(PRODUCE_ASYNC , produce_async).
 
 -record(state,
-    { consumers = ordsets:new() :: ordsets:ordset(pid())
+    { consumers     = ordsets:new() :: ordsets:ordset(pid())
+    , deltas_server                 :: beam_stats_delta:t()
     }).
 
 -type state() ::
@@ -42,8 +41,8 @@
 %%  API
 %% ============================================================================
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(DeltasServer) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
 
 -spec subscribe(pid()) ->
     ok.
@@ -55,10 +54,10 @@ subscribe(PID) ->
 unsubscribe(PID) ->
     gen_server:cast(?MODULE, {unsubscribe, PID}).
 
--spec force_production() ->
+-spec sync_produce_consume() ->
     {}.
-force_production() ->
-    gen_server:call(?MODULE, ?FORCE_PRODUCTION).
+sync_produce_consume() ->
+    {} = gen_server:call(?MODULE, ?PRODUCE_SYNC).
 
 %% ============================================================================
 %%  gen_server callbacks (unused)
@@ -74,10 +73,10 @@ terminate(_Reason, _State) ->
 %%  gen_server callbacks
 %% ============================================================================
 
-init([]) ->
+init(DeltasServer) ->
     ok = schedule_first_production(),
     Consumers = ordsets:new(),
-    {ok, #state{consumers=Consumers}}.
+    {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
 
 handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:add_element(PID, Consumers1),
@@ -87,42 +86,54 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
     Consumers2 = ordsets:del_element(PID, Consumers1),
     {noreply, State#state{consumers=Consumers2}}.
 
-handle_call(?FORCE_PRODUCTION, _From, State) ->
-    {} = produce(State),
-    {reply, {}, State}.
+handle_call(?PRODUCE_SYNC, _From, State1) ->
+    State2 = produce_sync(State1),
+    {reply, {}, State2}.
 
-handle_info(?SIGNAL_PRODUCTION, #state{}=State) ->
-    {} = produce(State),
+handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
+    State2 = produce_async(State1),
     ok = schedule_next_production(),
-    {noreply, State}.
+    {noreply, State2}.
 
 %% ============================================================================
 %%  Private
 %% ============================================================================
 
--spec produce(state()) ->
-    {}.
-produce(#state{consumers=ConsumersSet}) ->
+-spec produce_sync(state()) ->
+    state().
+produce_sync(#state{}=State) ->
+    produce(State, fun beam_stats_consumer:consume_sync/2).
+
+-spec produce_async(state()) ->
+    state().
+produce_async(#state{}=State) ->
+    produce(State, fun beam_stats_consumer:consume_async/2).
+
+-spec produce(state(), fun((pid(), term()) -> ok)) ->
+    state().
+produce(
+    #state
+    { consumers     = ConsumersSet
+    , deltas_server = DeltasServer
+    }=State,
+    MsgSendFun
+) ->
+    Stats = beam_stats:collect(DeltasServer),
     ConsumersList = ordsets:to_list(ConsumersSet),
-    ok = collect_and_push_to_consumers(ConsumersList),
-    {}.
+    Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
+    ok = lists:foreach(Send, ConsumersList),
+    beam_stats_delta:gc(DeltasServer),
+    State.
 
 -spec schedule_first_production() ->
     ok.
 schedule_first_production() ->
-    _ = self() ! ?SIGNAL_PRODUCTION,
+    _ = self() ! ?PRODUCE_ASYNC,
     ok.
 
 -spec schedule_next_production() ->
     ok.
 schedule_next_production() ->
     ProductionInterval = beam_stats_config:production_interval(),
-    _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
-    ok.
-
--spec collect_and_push_to_consumers([pid()]) ->
+    _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),
     ok.
-collect_and_push_to_consumers(Consumers) ->
-    BEAMStats = beam_stats:collect(),
-    Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
-    lists:foreach(Push, Consumers).
This page took 0.029874 seconds and 4 git commands to generate.