3e46042311b1cd3033825bda65d04fa65dbe3dbb
[beam_stats.git] / src / beam_stats_consumer.erl
1 -module(beam_stats_consumer).
2
3 -include("include/beam_stats.hrl").
4
5 -behaviour(gen_server).
6
7 -export_type(
8 [ queue/0
9 ]).
10
11 %% Public API
12 -export(
13 [ add/2
14 , consume_sync/2
15 , consume_async/2
16 ]).
17
18 %% Internal API
19 -export(
20 [ start_link/2
21 ]).
22
23 %% gen_server callbacks
24 -export(
25 [ init/1
26 , handle_call/3
27 , handle_cast/2
28 , handle_info/2
29 , terminate/2
30 , code_change/3
31 ]).
32
33 -type queue() ::
34 queue:queue(beam_stats:t()).
35
36 %% ============================================================================
37 %% Consumer interface
38 %% ============================================================================
39
40 -callback init(Options :: term()) ->
41 {ConsumptionInterval :: non_neg_integer(), State :: term()}.
42
43 -callback consume(queue(), State) ->
44 State.
45
46 -callback terminate(State :: term()) ->
47 {}.
48
49 %% ============================================================================
50 %% Internal data
51 %% ============================================================================
52
53 -define(FLUSH , flush).
54 -define(CONSUME_SYNC , consume_sync).
55 -define(CONSUME_ASYNC , consume_async).
56
57 -record(state,
58 { consumer_module :: module()
59 , consumer_state :: term()
60 , consumption_interval :: non_neg_integer()
61 , beam_stats_queue :: queue()
62 }).
63
64 -type state() ::
65 #state{}.
66
67 %% ============================================================================
68 %% Public API
69 %% ============================================================================
70
71 -spec add(module(), term()) ->
72 supervisor:startchild_ret().
73 add(ConsumerModule, ConsumerOptions) ->
74 beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
75
76 -spec consume_sync(pid(), beam_stats:t()) ->
77 {}.
78 consume_sync(PID, #beam_stats{}=BEAMStats) ->
79 {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}).
80
81 -spec consume_async(pid(), beam_stats:t()) ->
82 {}.
83 consume_async(PID, #beam_stats{}=BEAMStats) ->
84 ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}),
85 {}.
86
87 %% ============================================================================
88 %% Internal API
89 %% ============================================================================
90
91 start_link(ConsumerModule, ConsumerOptions) ->
92 GenServerModule = ?MODULE,
93 GenServerOpts = [],
94 InitArgs = [ConsumerModule, ConsumerOptions],
95 gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
96
97 %% ============================================================================
98 %% gen_server callbacks (unused)
99 %% ============================================================================
100
101 code_change(_OldVsn, State, _Extra) ->
102 {ok, State}.
103
104 %% ============================================================================
105 %% gen_server callbacks
106 %% ============================================================================
107
108 init([ConsumerModule, ConsumerOptions]) ->
109 {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
110 State = #state
111 { consumer_module = ConsumerModule
112 , consumer_state = ConsumerState
113 , consumption_interval = ConsumptionInterval
114 , beam_stats_queue = queue:new()
115 },
116 ok = beam_stats_producer:subscribe(self()),
117 ok = schedule_first_flush(),
118 {ok, State}.
119
120 terminate(_Reason, _State) ->
121 ok = beam_stats_producer:unsubscribe(self()).
122
123 handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) ->
124 State2 = consume_one(BEAMStats, State1),
125 {reply, {}, State2}.
126
127 handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) ->
128 State2 = enqueue(BEAMStats, State1),
129 {noreply, State2}.
130
131 handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) ->
132 State2 = consume_all_queued(State1),
133 ok = schedule_next_flush(ConsumptionInterval),
134 {noreply, State2}.
135
136 %% ============================================================================
137 %% Internal
138 %% ============================================================================
139
140 -spec consume_one(beam_stats:t(), state()) ->
141 state().
142 consume_one(#beam_stats{}=BEAMStats, #state{}=State1) ->
143 Q = queue:in(BEAMStats, queue:new()),
144 consume(Q, State1).
145
146 -spec consume_all_queued(state()) ->
147 state().
148 consume_all_queued(#state{beam_stats_queue=Q}=State1) ->
149 State2 = consume(Q, State1),
150 State2#state{beam_stats_queue = queue:new()}.
151
152 -spec consume(queue(), state()) ->
153 state().
154 consume(
155 Q,
156 #state
157 { consumer_module = ConsumerModule
158 , consumer_state = ConsumerState
159 }=State
160 ) ->
161 State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}.
162
163 -spec enqueue(beam_stats:t(), state()) ->
164 state().
165 enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) ->
166 State#state{beam_stats_queue = queue:in(BEAMStats, Q)}.
167
168 -spec schedule_first_flush() ->
169 ok.
170 schedule_first_flush() ->
171 _ = self() ! ?FLUSH,
172 ok.
173
174 -spec schedule_next_flush(non_neg_integer()) ->
175 ok.
176 schedule_next_flush(Time) ->
177 _ = erlang:send_after(Time, self(), ?FLUSH),
178 ok.
This page took 0.075862 seconds and 3 git commands to generate.