%% API
-export(
- [ start_link/0
+ [ start_link/1
, subscribe/1
, unsubscribe/1
-define(PRODUCE_ASYNC , produce_async).
-record(state,
- { consumers = ordsets:new() :: ordsets:ordset(pid())
- , stats_state :: beam_stats_state:t()
+ { consumers = ordsets:new() :: ordsets:ordset(pid())
+ , deltas_server :: beam_stats_delta:t()
}).
-type state() ::
%% API
%% ============================================================================
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(DeltasServer) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
-spec subscribe(pid()) ->
ok.
%% gen_server callbacks
%% ============================================================================
-init([]) ->
+init(DeltasServer) ->
ok = schedule_first_production(),
Consumers = ordsets:new(),
- StatsState = beam_stats_state:new(),
- {ok, #state{consumers=Consumers, stats_state=StatsState}}.
+ {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
Consumers2 = ordsets:add_element(PID, Consumers1),
state().
produce(
#state
- { consumers = ConsumersSet
- , stats_state = StatsState1
+ { consumers = ConsumersSet
+ , deltas_server = DeltasServer
}=State,
MsgSendFun
) ->
- StatsState2 = beam_stats_state:update(StatsState1),
- Stats = beam_stats_state:export(StatsState2),
+ Stats = beam_stats:collect(DeltasServer),
ConsumersList = ordsets:to_list(ConsumersSet),
Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
ok = lists:foreach(Send, ConsumersList),
- State#state{stats_state = StatsState2}.
+ beam_stats_delta:gc(DeltasServer),
+ State.
-spec schedule_first_production() ->
ok.