1 -module(beam_stats_consumer).
3 -include("include/beam_stats.hrl").
4 -include( "beam_stats.hrl").
6 -behaviour(gen_server).
22 %% gen_server callbacks
33 queue:queue(beam_stats:t()).
35 %% ============================================================================
37 %% ============================================================================
%% Consumer callback: invoked from this module's gen_server init/1 with the
%% options that were passed to start_link/2.
%%
%% Must return a 2-tuple of:
%%   ConsumptionInterval - delay handed to erlang:send_after/3 when the next
%%                         consumption is scheduled
%%   State               - the consumer's initial state, stored by the server
-callback init(Options) -> {ConsumptionInterval, State} when
    Options :: term(),
    ConsumptionInterval :: non_neg_integer(),
    State :: term().
42 -callback consume(queue(), State) ->
45 -callback terminate(State :: term()) ->
48 %% ============================================================================
50 %% ============================================================================
%% Atom this process sends to itself (directly for the first round, then via
%% erlang:send_after/3) to schedule consumption.
-define(SIGNAL_CONSUMPTION, beam_stats_consumption_signal).
55 { consumer_module :: module()
56 , consumer_state :: term()
57 , consumption_interval :: non_neg_integer()
58 , beam_stats_queue :: queue()
61 %% ============================================================================
63 %% ============================================================================
%% @doc Adds a consumer by delegating to
%% beam_stats_sup_consumers:start_child/2.
%%
%% `ConsumerModule' is the consumer callback module and `ConsumerOptions' the
%% opaque term forwarded along with it. The result of start_child/2 is
%% returned unchanged.
-spec add(ConsumerModule :: module(), ConsumerOptions :: term()) ->
    supervisor:startchild_ret().
add(Module, Options) ->
    beam_stats_sup_consumers:start_child(Module, Options).
70 %% ============================================================================
72 %% ============================================================================
%% @doc Starts a consumer gen_server linked to the calling process.
%%
%% `ConsumerModule' is the consumer callback module and `ConsumerOptions' the
%% term later handed to its `init/1'. Both are forwarded to this module's own
%% `init/1' as a two-element list.
%%
%% Returns whatever gen_server:start_link/3 returns.
-spec start_link(module(), term()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(ConsumerModule, ConsumerOptions) ->
    GenServerModule = ?MODULE,
    %% The server is started with no extra gen_server options; the variable
    %% must be bound before it is passed to gen_server:start_link/3.
    GenServerOpts = [],
    InitArgs = [ConsumerModule, ConsumerOptions],
    gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
80 %% ============================================================================
81 %% gen_server callbacks (unused)
82 %% ============================================================================
%% Synchronous calls are not part of this server's protocol. Whatever the
%% request or caller, the result is produced by ?METHOD_SHOULD_NOT_BE_USED
%% (defined outside the visible code, presumably in an included header).
handle_call(_Call, _Caller, State) ->
    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
87 code_change(_OldVsn, State, _Extra) ->
90 %% ============================================================================
91 %% gen_server callbacks
92 %% ============================================================================
94 init([ConsumerModule, ConsumerOptions]) ->
95 {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
97 { consumer_module = ConsumerModule
98 , consumer_state = ConsumerState
99 , consumption_interval = ConsumptionInterval
100 , beam_stats_queue = queue:new()
102 ok = beam_stats_producer:subscribe(self()),
103 ok = schedule_first_consumption(),
%% gen_server shutdown hook.
%%
%% First unsubscribes this process from the stats producer, then invokes the
%% `terminate/1' callback required by this behaviour so the consumer module
%% can release whatever it holds in its state.
%%
%% The shutdown reason is ignored. Always returns `ok'; a failed unsubscribe
%% crashes on the assertive match, as before.
terminate(_Reason, #state{consumer_module = ConsumerModule,
                          consumer_state  = ConsumerState}) ->
    ok = beam_stats_producer:unsubscribe(self()),
    %% The callback's return value has no meaning for the gen_server.
    _ = ConsumerModule:terminate(ConsumerState),
    ok.
%% Buffers an incoming #beam_stats{} record: the sample is appended to the
%% rear of the queue kept in the server state. Nothing else in the state
%% changes and no reply is sent.
handle_cast(#beam_stats{} = Sample, #state{beam_stats_queue = Pending} = State) ->
    Buffered = queue:in(Sample, Pending),
    {noreply, State#state{beam_stats_queue = Buffered}}.
116 { consumer_module = ConsumerModule
117 , consumer_state = ConsumerState
118 , consumption_interval = ConsumptionInterval
119 , beam_stats_queue = Q
122 State2 = State1#state
123 { consumer_state = ConsumerModule:consume(Q, ConsumerState)
124 , beam_stats_queue = queue:new()
126 ok = schedule_next_consumption(ConsumptionInterval),
129 %% ============================================================================
131 %% ============================================================================
133 -spec schedule_first_consumption() ->
135 schedule_first_consumption() ->
136 _ = self() ! ?SIGNAL_CONSUMPTION,
139 -spec schedule_next_consumption(non_neg_integer()) ->
141 schedule_next_consumption(Time) ->
142 _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),