1 -module(beam_stats_consumer).
3 -include("include/beam_stats.hrl").
5 -behaviour(gen_server).
23 %% gen_server callbacks
34 queue:queue(beam_stats:t()).
36 %% ============================================================================
38 %% ============================================================================
40 -callback init(Options :: term()) ->
41 {ConsumptionInterval :: non_neg_integer(), State :: term()}.
43 -callback consume(queue(), State) ->
46 -callback terminate(State :: term()) ->
49 %% ============================================================================
51 %% ============================================================================
53 -define(FLUSH , flush).
54 -define(CONSUME_SYNC , consume_sync).
55 -define(CONSUME_ASYNC , consume_async).
58 { consumer_module :: module()
59 , consumer_state :: term()
60 , consumption_interval :: non_neg_integer()
61 , beam_stats_queue :: queue()
67 %% ============================================================================
69 %% ============================================================================
71 -spec add(module(), term()) ->
72 supervisor:startchild_ret().
73 add(ConsumerModule, ConsumerOptions) ->
74 beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
76 -spec consume_sync(pid(), beam_stats:t()) ->
78 consume_sync(PID, #beam_stats{}=BEAMStats) ->
79 {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}).
81 -spec consume_async(pid(), beam_stats:t()) ->
83 consume_async(PID, #beam_stats{}=BEAMStats) ->
84 ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}),
87 %% ============================================================================
89 %% ============================================================================
91 start_link(ConsumerModule, ConsumerOptions) ->
92 RegisteredName = ConsumerModule,
93 GenServerModule = ?MODULE,
95 InitArgs = [ConsumerModule, ConsumerOptions],
96 gen_server:start_link({local, RegisteredName}, GenServerModule, InitArgs, GenServerOpts).
98 %% ============================================================================
99 %% gen_server callbacks (unused)
100 %% ============================================================================
102 code_change(_OldVsn, State, _Extra) ->
105 %% ============================================================================
106 %% gen_server callbacks
107 %% ============================================================================
109 init([ConsumerModule, ConsumerOptions]) ->
110 {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
112 { consumer_module = ConsumerModule
113 , consumer_state = ConsumerState
114 , consumption_interval = ConsumptionInterval
115 , beam_stats_queue = queue:new()
117 ok = beam_stats_producer:subscribe(self()),
118 ok = schedule_first_flush(),
121 terminate(_Reason, _State) ->
122 ok = beam_stats_producer:unsubscribe(self()).
124 handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) ->
125 State2 = consume_one(BEAMStats, State1),
128 handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) ->
129 State2 = enqueue(BEAMStats, State1),
132 handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) ->
133 State2 = consume_all_queued(State1),
134 ok = schedule_next_flush(ConsumptionInterval),
137 %% ============================================================================
139 %% ============================================================================
141 -spec consume_one(beam_stats:t(), state()) ->
143 consume_one(#beam_stats{}=BEAMStats, #state{}=State1) ->
144 Q = queue:in(BEAMStats, queue:new()),
147 -spec consume_all_queued(state()) ->
149 consume_all_queued(#state{beam_stats_queue=Q}=State1) ->
150 State2 = consume(Q, State1),
151 State2#state{beam_stats_queue = queue:new()}.
153 -spec consume(queue(), state()) ->
158 { consumer_module = ConsumerModule
159 , consumer_state = ConsumerState
162 State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}.
164 -spec enqueue(beam_stats:t(), state()) ->
166 enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) ->
167 State#state{beam_stats_queue = queue:in(BEAMStats, Q)}.
169 -spec schedule_first_flush() ->
171 schedule_first_flush() ->
175 -spec schedule_next_flush(non_neg_integer()) ->
177 schedule_next_flush(Time) ->
178 _ = erlang:send_after(Time, self(), ?FLUSH),