1 -module(beam_stats_producer).
3 -behaviour(gen_server).
12 , sync_produce_consume/0
15 %% gen_server callbacks
25 %% ============================================================================
27 %% ============================================================================
29 -define(PRODUCE_SYNC , produce_sync).
30 -define(PRODUCE_ASYNC , produce_async).
33 { consumers = ordsets:new() :: ordsets:ordset(pid())
34 , deltas_server :: beam_stats_delta:t()
40 %% ============================================================================
42 %% ============================================================================
start_link(DeltasServer) ->
    %% Start the producer as a gen_server registered locally under the
    %% module name. DeltasServer is passed through unchanged as the
    %% argument given to init/1.
    RegisteredName = {local, ?MODULE},
    StartOptions = [],
    gen_server:start_link(RegisteredName, ?MODULE, DeltasServer, StartOptions).
47 -spec subscribe(pid()) ->
50 gen_server:cast(?MODULE, {subscribe, PID}).
52 -spec unsubscribe(pid()) ->
55 gen_server:cast(?MODULE, {unsubscribe, PID}).
57 -spec sync_produce_consume() ->
sync_produce_consume() ->
    %% Ask the registered producer for a synchronous production round and
    %% assert that it answered with the empty tuple; any other reply
    %% crashes the caller with a badmatch.
    Reply = gen_server:call(?MODULE, ?PRODUCE_SYNC),
    {} = Reply.
62 %% ============================================================================
63 %% gen_server callbacks (unused)
64 %% ============================================================================
66 code_change(_OldVsn, State, _Extra) ->
69 terminate(_Reason, _State) ->
72 %% ============================================================================
73 %% gen_server callbacks
74 %% ============================================================================
77 ok = schedule_first_production(),
78 Consumers = ordsets:new(),
79 {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
%% Maintain the set of consumer processes.
%%
%% {subscribe, PID}   adds PID to the consumer set; adding a PID that is
%%                    already present leaves the set unchanged.
%% {unsubscribe, PID} removes PID from the consumer set; removing a PID
%%                    that is not present leaves the set unchanged.
handle_cast({subscribe, PID}, #state{consumers=Current}=State) ->
    {noreply, State#state{consumers=ordsets:add_element(PID, Current)}};
handle_cast({unsubscribe, PID}, #state{consumers=Current}=State) ->
    {noreply, State#state{consumers=ordsets:del_element(PID, Current)}}.
89 handle_call(?PRODUCE_SYNC, _From, State1) ->
90 State2 = produce_sync(State1),
93 handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
94 State2 = produce_async(State1),
95 ok = schedule_next_production(),
98 %% ============================================================================
100 %% ============================================================================
102 -spec produce_sync(state()) ->
produce_sync(#state{}=State) ->
    %% Run one production round, delivering to each consumer through
    %% beam_stats_consumer:consume_sync/2. The result is whatever
    %% produce/2 returns.
    Deliver = fun beam_stats_consumer:consume_sync/2,
    produce(State, Deliver).
107 -spec produce_async(state()) ->
produce_async(#state{}=State) ->
    %% Run one production round, delivering to each consumer through
    %% beam_stats_consumer:consume_async/2. The result is whatever
    %% produce/2 returns.
    Deliver = fun beam_stats_consumer:consume_async/2,
    produce(State, Deliver).
112 -spec produce(state(), fun((pid(), term()) -> ok)) ->
116 { consumers = ConsumersSet
117 , deltas_server = DeltasServer
121 Stats = beam_stats:collect(DeltasServer),
122 ConsumersList = ordsets:to_list(ConsumersSet),
123 Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
124 ok = lists:foreach(Send, ConsumersList),
125 beam_stats_delta:gc(DeltasServer),
128 -spec schedule_first_production() ->
130 schedule_first_production() ->
131 _ = self() ! ?PRODUCE_ASYNC,
134 -spec schedule_next_production() ->
136 schedule_next_production() ->
137 ProductionInterval = beam_stats_config:production_interval(),
138 _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),