Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
caf75ed8 SK |
4 | |
5 | -behaviour(gen_server). | |
6 | ||
7 | -export_type( | |
8 | [ queue/0 | |
9 | ]). | |
10 | ||
11 | %% Public API | |
12 | -export( | |
13 | [ add/2 | |
2086842d SK |
14 | , consume_sync/2 |
15 | , consume_async/2 | |
caf75ed8 SK |
16 | ]). |
17 | ||
18 | %% Internal API | |
19 | -export( | |
20 | [ start_link/2 | |
21 | ]). | |
22 | ||
23 | %% gen_server callbacks | |
24 | -export( | |
25 | [ init/1 | |
26 | , handle_call/3 | |
27 | , handle_cast/2 | |
28 | , handle_info/2 | |
29 | , terminate/2 | |
30 | , code_change/3 | |
31 | ]). | |
32 | ||
33 | -type queue() :: | |
34 | queue:queue(beam_stats:t()). | |
35 | ||
36 | %% ============================================================================ | |
37 | %% Consumer interface | |
38 | %% ============================================================================ | |
39 | ||
%% Called from this module's init/1 with the options given to add/2.
%% Returns the flush interval (handed to erlang:send_after/3, hence
%% milliseconds) together with the consumer's initial state.
-callback init(Options :: term()) ->
    {ConsumptionInterval :: non_neg_integer(), State :: term()}.

%% Called with a queue of beam_stats samples (a single sample for synchronous
%% consumption, everything accumulated since the last flush otherwise) and the
%% current consumer state. Returns the next consumer state.
-callback consume(Queue :: queue(), State) ->
    State.

%% Receives the consumer state and must return the empty tuple.
%% Presumably the consumer's cleanup hook - confirm against implementations.
-callback terminate(State :: term()) ->
    {}.
48 | ||
49 | %% ============================================================================ | |
50 | %% Internal data | |
51 | %% ============================================================================ | |
52 | ||
2086842d SK |
53 | -define(FLUSH , flush). |
54 | -define(CONSUME_SYNC , consume_sync). | |
55 | -define(CONSUME_ASYNC , consume_async). | |
caf75ed8 SK |
56 | |
%% gen_server state of a single consumer process.
-record(state,
    { consumer_module      :: module()
      %% ^ callback module implementing the consumer interface
    , consumer_state       :: term()
      %% ^ opaque state threaded through consumer_module:consume/2
    , consumption_interval :: non_neg_integer()
      %% ^ delay between flushes, as passed to erlang:send_after/3
    , beam_stats_queue     :: queue()
      %% ^ samples received asynchronously and not yet flushed
    }).
63 | ||
2086842d SK |
64 | -type state() :: |
65 | #state{}. | |
66 | ||
caf75ed8 SK |
67 | %% ============================================================================ |
68 | %% Public API | |
69 | %% ============================================================================ | |
70 | ||
%% Register a new consumer: delegates to
%% beam_stats_sup_consumers:start_child/2 with the consumer's callback module
%% and the options that will be handed to that module's init/1.
-spec add(module(), term()) ->
    supervisor:startchild_ret().
add(Module, Options) ->
    beam_stats_sup_consumers:start_child(Module, Options).
75 | ||
2086842d SK |
%% Hand one beam_stats sample to the consumer process and block until it has
%% been consumed (gen_server:call/2, default timeout). Crashes the caller
%% unless the server replies with the empty tuple.
-spec consume_sync(pid(), beam_stats:t()) ->
    {}.
consume_sync(Consumer, #beam_stats{} = Stats) ->
    Request = {?CONSUME_SYNC, Stats},
    {} = gen_server:call(Consumer, Request).
80 | ||
%% Hand one beam_stats sample to the consumer process without waiting: the
%% sample is cast to the server, which queues it until the next flush.
-spec consume_async(pid(), beam_stats:t()) ->
    {}.
consume_async(Consumer, #beam_stats{} = Stats) ->
    Message = {?CONSUME_ASYNC, Stats},
    ok = gen_server:cast(Consumer, Message),
    {}.
86 | ||
caf75ed8 SK |
87 | %% ============================================================================ |
88 | %% Internal API | |
89 | %% ============================================================================ | |
90 | ||
%% Start and link a consumer gen_server for the given callback module.
%% ConsumerModule and ConsumerOptions are forwarded to init/1, which calls
%% ConsumerModule:init(ConsumerOptions). The process is not registered under
%% a name and uses default gen_server options.
-spec start_link(module(), term()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(ConsumerModule, ConsumerOptions) ->
    gen_server:start_link(?MODULE, [ConsumerModule, ConsumerOptions], []).
96 | ||
97 | %% ============================================================================ | |
98 | %% gen_server callbacks (unused) | |
99 | %% ============================================================================ | |
100 | ||
caf75ed8 SK |
%% No state migration is performed on code upgrade: the current state is
%% handed back untouched.
code_change(_FromVersion, CurrentState, _UpgradeData) ->
    {ok, CurrentState}.
103 | ||
104 | %% ============================================================================ | |
105 | %% gen_server callbacks | |
106 | %% ============================================================================ | |
107 | ||
%% Initialise the consumer process:
%%   1. let the callback module build its state and choose a flush interval,
%%   2. subscribe this process to beam_stats_producer,
%%   3. trigger the first flush immediately.
%% Any step that does not return the expected value crashes the start-up.
init([Module, Options]) ->
    {Interval, ConsumerState} = Module:init(Options),
    ok = beam_stats_producer:subscribe(self()),
    ok = schedule_first_flush(),
    {ok, #state
        { consumer_module      = Module
        , consumer_state       = ConsumerState
        , consumption_interval = Interval
        , beam_stats_queue     = queue:new()
        }}.
119 | ||
%% Shut the consumer down.
%% First unsubscribe from beam_stats_producer, then give the callback module
%% the chance to clean up by invoking its terminate/1 callback, which the
%% consumer interface declares as mandatory and which must return the empty
%% tuple. Unsubscribing happens first so that a crash inside the callback
%% cannot leave this process subscribed.
terminate(_Reason, #state{ consumer_module = ConsumerModule
                         , consumer_state  = ConsumerState
                         }) ->
    ok = beam_stats_producer:unsubscribe(self()),
    {} = ConsumerModule:terminate(ConsumerState),
    ok.
122 | ||
2086842d SK |
%% Synchronous consumption: the sample is passed to the callback module right
%% away (bypassing the queue) and the caller receives the empty tuple.
handle_call({?CONSUME_SYNC, #beam_stats{} = Stats}, _From, #state{} = State) ->
    {reply, {}, consume_one(Stats, State)}.
caf75ed8 | 126 | |
2086842d SK |
%% Asynchronous consumption: the sample is only queued here; it reaches the
%% callback module on the next flush.
handle_cast({?CONSUME_ASYNC, #beam_stats{} = Stats}, #state{} = State) ->
    {noreply, enqueue(Stats, State)}.
130 | ||
%% Periodic flush: pass everything queued so far to the callback module, then
%% arm the timer for the next flush. The timer is re-armed only after
%% consumption has finished.
handle_info(?FLUSH, #state{} = Pending) ->
    Interval = Pending#state.consumption_interval,
    Flushed = consume_all_queued(Pending),
    ok = schedule_next_flush(Interval),
    {noreply, Flushed}.
135 | ||
136 | %% ============================================================================ | |
137 | %% Internal | |
138 | %% ============================================================================ | |
139 | ||
2086842d SK |
%% Consume a single sample immediately by wrapping it in a one-element queue.
%% The queue held in the state is left untouched.
-spec consume_one(beam_stats:t(), state()) ->
    state().
consume_one(#beam_stats{} = Stats, #state{} = State) ->
    consume(queue:in(Stats, queue:new()), State).
145 | ||
%% Pass the whole pending queue (possibly empty) to the callback module and
%% start over with a fresh, empty queue.
-spec consume_all_queued(state()) ->
    state().
consume_all_queued(#state{} = State) ->
    Consumed = consume(State#state.beam_stats_queue, State),
    Consumed#state{beam_stats_queue = queue:new()}.
151 | ||
%% Run the callback module's consume/2 on the given queue and store the
%% consumer state it returns. No other field of the state is modified.
-spec consume(queue(), state()) ->
    state().
consume(Queue, #state{} = State) ->
    Module = State#state.consumer_module,
    NextConsumerState = Module:consume(Queue, State#state.consumer_state),
    State#state{consumer_state = NextConsumerState}.
162 | ||
%% Append a sample to the rear of the pending queue.
-spec enqueue(beam_stats:t(), state()) ->
    state().
enqueue(#beam_stats{} = Stats, #state{} = State) ->
    Grown = queue:in(Stats, State#state.beam_stats_queue),
    State#state{beam_stats_queue = Grown}.
167 | ||
%% Trigger the first flush without delay by posting the flush message to the
%% calling process's own mailbox.
-spec schedule_first_flush() ->
    ok.
schedule_first_flush() ->
    _ = erlang:send(self(), ?FLUSH),
    ok.
173 | ||
%% Arm a one-shot timer that delivers the flush message to the calling process
%% after Interval milliseconds. The timer reference is discarded, so the timer
%% cannot be cancelled afterwards.
-spec schedule_next_flush(non_neg_integer()) ->
    ok.
schedule_next_flush(Interval) ->
    _TimerRef = erlang:send_after(Interval, self(), ?FLUSH),
    ok.