Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_consumer). |
2 | ||
3 | -include("include/beam_stats.hrl"). | |
4 | -include( "beam_stats.hrl"). | |
5 | ||
6 | -behaviour(gen_server). | |
7 | ||
8 | -export_type( | |
9 | [ queue/0 | |
10 | ]). | |
11 | ||
12 | %% Public API | |
13 | -export( | |
14 | [ add/2 | |
15 | ]). | |
16 | ||
17 | %% Internal API | |
18 | -export( | |
19 | [ start_link/2 | |
20 | ]). | |
21 | ||
22 | %% gen_server callbacks | |
23 | -export( | |
24 | [ init/1 | |
25 | , handle_call/3 | |
26 | , handle_cast/2 | |
27 | , handle_info/2 | |
28 | , terminate/2 | |
29 | , code_change/3 | |
30 | ]). | |
31 | ||
%% Queue of BEAM stats samples, as buffered by this server and handed to the
%% consumer's consume/2 callback.
-type queue() :: queue:queue(beam_stats:t()).
34 | ||
35 | %% ============================================================================ | |
36 | %% Consumer interface | |
37 | %% ============================================================================ | |
38 | ||
%% Invoked from this server's init/1 with the consumer options. Returns the
%% interval used to schedule consumptions (it is passed to
%% erlang:send_after/3) together with the consumer's initial state.
-callback init(Options :: term()) ->
    {ConsumptionInterval :: non_neg_integer(), State :: term()}.

%% Invoked on every consumption signal with the stats queued since the
%% previous signal; the returned value replaces the consumer's state.
-callback consume(Queue :: queue(), State) -> State when State :: term().

%% Consumer cleanup hook; returns the empty tuple.
-callback terminate(State :: term()) -> {}.
47 | ||
48 | %% ============================================================================ | |
49 | %% Internal data | |
50 | %% ============================================================================ | |
51 | ||
%% Message this server sends to itself to trigger a consumption round.
-define(SIGNAL_CONSUMPTION, beam_stats_consumption_signal).
53 | ||
%% gen_server state: the wrapped consumer module, that consumer's own state,
%% the interval between consumption signals, and the stats buffered since the
%% last consumption.
-record(state, {
    consumer_module      :: module(),
    consumer_state       :: term(),
    consumption_interval :: non_neg_integer(),
    beam_stats_queue     :: queue()
}).
60 | ||
61 | %% ============================================================================ | |
62 | %% Public API | |
63 | %% ============================================================================ | |
64 | ||
%% Asks beam_stats_sup_consumers to start a child for the given consumer
%% module; the options are forwarded unchanged and the supervisor's reply is
%% returned as is.
-spec add(Module :: module(), Options :: term()) ->
    supervisor:startchild_ret().
add(Module, Options) ->
    beam_stats_sup_consumers:start_child(Module, Options).
69 | ||
70 | %% ============================================================================ | |
71 | %% Internal API | |
72 | %% ============================================================================ | |
73 | ||
%% Starts this module as a linked, unregistered gen_server with default
%% gen_server options. The consumer module and its options are packed into
%% the two-element list that init/1 pattern-matches on.
-spec start_link(ConsumerModule :: module(), ConsumerOptions :: term()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(ConsumerModule, ConsumerOptions) ->
    gen_server:start_link(?MODULE, [ConsumerModule, ConsumerOptions], []).
79 | ||
80 | %% ============================================================================ | |
81 | %% gen_server callbacks (unused) | |
82 | %% ============================================================================ | |
83 | ||
%% Synchronous calls are not part of this server's protocol. Whatever arrives
%% is handed to the METHOD_SHOULD_NOT_BE_USED macro, which is defined outside
%% this module (presumably in beam_stats.hrl - confirm).
handle_call(_Msg, _Caller, State) ->
    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
86 | ||
%% Code upgrades require no state migration: the state is returned untouched.
code_change(_FromVsn, ServerState, _Extra) ->
    {ok, ServerState}.
89 | ||
90 | %% ============================================================================ | |
91 | %% gen_server callbacks | |
92 | %% ============================================================================ | |
93 | ||
%% Initialises the wrapped consumer, subscribes this process to the stats
%% producer and triggers the first consumption round. The server starts with
%% an empty stats queue.
init([ConsumerModule, ConsumerOptions]) ->
    {Interval, InitialConsumerState} = ConsumerModule:init(ConsumerOptions),
    ok = beam_stats_producer:subscribe(self()),
    ok = schedule_first_consumption(),
    InitialState = #state{
        consumer_module      = ConsumerModule,
        consumer_state       = InitialConsumerState,
        consumption_interval = Interval,
        beam_stats_queue     = queue:new()
    },
    {ok, InitialState}.
105 | ||
%% Shuts the consumer down. The process is first unsubscribed from the stats
%% producer, then the consumer module's terminate/1 callback (declared by
%% this behaviour) is invoked with the consumer's current state so that it
%% can release whatever it holds. The callback must return the empty tuple.
terminate(_Reason, #state{consumer_module = ConsumerModule, consumer_state = ConsumerState}) ->
    %% Unsubscribe first, so this step is attempted even if the consumer's
    %% own cleanup crashes.
    ok = beam_stats_producer:unsubscribe(self()),
    {} = ConsumerModule:terminate(ConsumerState),
    ok.
108 | ||
%% Buffers an incoming stats sample at the rear of the queue until the next
%% consumption round. Only #beam_stats{} records are accepted.
handle_cast(#beam_stats{} = Sample, #state{beam_stats_queue = Pending} = State) ->
    {noreply, State#state{beam_stats_queue = queue:in(Sample, Pending)}}.
112 | ||
%% Consumption round: passes everything queued so far to the consumer module,
%% stores the consumer state it returns, resets the queue to empty and
%% schedules the next round after the configured interval.
handle_info(?SIGNAL_CONSUMPTION, #state{} = Before) ->
    Module = Before#state.consumer_module,
    Pending = Before#state.beam_stats_queue,
    NewConsumerState = Module:consume(Pending, Before#state.consumer_state),
    After = Before#state{
        consumer_state   = NewConsumerState,
        beam_stats_queue = queue:new()
    },
    ok = schedule_next_consumption(Before#state.consumption_interval),
    {noreply, After}.
128 | ||
129 | %% ============================================================================ | |
130 | %% Internal | |
131 | %% ============================================================================ | |
132 | ||
%% Triggers the first consumption round by sending the consumption signal to
%% the calling process right away.
-spec schedule_first_consumption() -> ok.
schedule_first_consumption() ->
    _ = erlang:send(self(), ?SIGNAL_CONSUMPTION),
    ok.
138 | ||
%% Arranges for the consumption signal to be delivered to the calling process
%% after Interval (the timeout handed to erlang:send_after/3). The timer
%% reference is deliberately discarded.
-spec schedule_next_consumption(Interval :: non_neg_integer()) -> ok.
schedule_next_consumption(Interval) ->
    _TimerRef = erlang:send_after(Interval, self(), ?SIGNAL_CONSUMPTION),
    ok.