| 1 | -module(beam_stats_consumer). |
| 2 | |
| 3 | -include("include/beam_stats.hrl"). |
| 4 | -include( "beam_stats.hrl"). |
| 5 | |
| 6 | -behaviour(gen_server). |
| 7 | |
| 8 | -export_type( |
| 9 | [ queue/0 |
| 10 | ]). |
| 11 | |
| 12 | %% Public API |
| 13 | -export( |
| 14 | [ add/2 |
| 15 | ]). |
| 16 | |
| 17 | %% Internal API |
| 18 | -export( |
| 19 | [ start_link/2 |
| 20 | ]). |
| 21 | |
| 22 | %% gen_server callbacks |
| 23 | -export( |
| 24 | [ init/1 |
| 25 | , handle_call/3 |
| 26 | , handle_cast/2 |
| 27 | , handle_info/2 |
| 28 | , terminate/2 |
| 29 | , code_change/3 |
| 30 | ]). |
| 31 | |
%% FIFO queue of beam_stats samples. This is the shape in which collected
%% stats are buffered by the server and handed to a consumer's consume/2.
-type queue() ::
    queue:queue(beam_stats:t()).
| 34 | |
| 35 | %% ============================================================================ |
| 36 | %% Consumer interface |
| 37 | %% ============================================================================ |
| 38 | |
%% Called once from this server's init/1 with the options given at start-up.
%% Returns the consumer's initial state together with ConsumptionInterval,
%% the delay that is passed to erlang:send_after/3 (hence milliseconds)
%% between two consecutive consumptions.
-callback init(Options :: term()) ->
    {ConsumptionInterval :: non_neg_integer(), State :: term()}.

%% Called on every consumption signal with the queue of stats buffered since
%% the previous consumption and the current consumer state. The returned value
%% becomes the consumer state for the next call.
-callback consume(queue(), State) ->
    State.

%% Termination hook for the consumer: takes the consumer state and returns the
%% empty tuple.
-callback terminate(State :: term()) ->
    {}.
| 47 | |
| 48 | %% ============================================================================ |
| 49 | %% Internal data |
| 50 | %% ============================================================================ |
| 51 | |
%% Message this server sends to itself (directly, or through a timer) to
%% trigger one consumption of the buffered stats.
-define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).

%% gen_server state of a single consumer process.
-record(state,
    { consumer_module      :: atom()            % module whose init/1 and consume/2 callbacks are invoked
    , consumer_state       :: term()            % opaque state returned by the consumer module's callbacks
    , consumption_interval :: non_neg_integer() % delay handed to erlang:send_after/3 between consumptions
    , beam_stats_queue     :: queue()           % stats received by cast since the last consumption
    }).
| 60 | |
| 61 | %% ============================================================================ |
| 62 | %% Public API |
| 63 | %% ============================================================================ |
| 64 | |
%% @doc Registers a new consumer. The request is delegated to
%% beam_stats_sup_consumers:start_child/2, whose result is returned unchanged.
%%
%% Module  - consumer callback module.
%% Options - term forwarded to beam_stats_sup_consumers:start_child/2.
-spec add(atom(), term()) ->
    supervisor:startchild_ret().
add(Module, Options) ->
    beam_stats_sup_consumers:start_child(Module, Options).
| 69 | |
| 70 | %% ============================================================================ |
| 71 | %% Internal API |
| 72 | %% ============================================================================ |
| 73 | |
%% @doc Starts a linked, unregistered consumer server with default gen_server
%% options. The callback module and its options are passed to init/1 as a
%% two-element list.
-spec start_link(atom(), term()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(Module, Options) ->
    gen_server:start_link(?MODULE, [Module, Options], []).
| 79 | |
| 80 | %% ============================================================================ |
| 81 | %% gen_server callbacks (unused) |
| 82 | %% ============================================================================ |
| 83 | |
%% @doc Synchronous calls are not part of this server's protocol: whatever
%% arrives is handed to ?METHOD_SHOULD_NOT_BE_USED together with the current
%% state. The macro is not defined in this file (presumably it comes from one
%% of the included headers), so its exact effect is decided there.
handle_call(_Call, _Caller, State) ->
    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
| 86 | |
%% @doc Hot code upgrade hook. No state migration is needed, so the state is
%% returned untouched.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
| 89 | |
| 90 | %% ============================================================================ |
| 91 | %% gen_server callbacks |
| 92 | %% ============================================================================ |
| 93 | |
%% @doc Initialises the consumer server.
%%
%% Asks the callback module for its consumption interval and initial state,
%% subscribes this process to beam_stats_producer, and queues the first
%% consumption signal so that it is handled right after init/1 returns.
%% The stats buffer starts out empty.
init([Module, Options]) ->
    {Interval, ConsumerState} = Module:init(Options),
    ok = beam_stats_producer:subscribe(self()),
    ok = schedule_first_consumption(),
    {ok, #state
        { consumer_module      = Module
        , consumer_state       = ConsumerState
        , consumption_interval = Interval
        , beam_stats_queue     = queue:new()
        }}.
| 105 | |
%% @doc gen_server termination hook.
%%
%% Removes this process from the producer's subscriber list and then gives the
%% consumer module its chance to clean up through the terminate/1 callback
%% declared by this behaviour. Returns ok.
terminate(
    _Reason,
    #state
    { consumer_module = ConsumerModule
    , consumer_state  = ConsumerState
    }
) ->
    %% Unsubscribe first, so that a failing consumer callback cannot leave a
    %% stale subscription behind.
    ok = beam_stats_producer:unsubscribe(self()),
    %% The behaviour contract fixes the callback's return value to {}.
    {} = ConsumerModule:terminate(ConsumerState),
    ok.
| 108 | |
%% @doc Buffers an incoming beam_stats sample: the record is appended to the
%% rear of the pending queue, to be handed to the consumer on the next
%% consumption signal.
handle_cast(#beam_stats{} = Sample, #state{} = State0) ->
    Pending = queue:in(Sample, State0#state.beam_stats_queue),
    State1 = State0#state{beam_stats_queue = Pending},
    {noreply, State1}.
| 112 | |
%% @doc Handles the consumption signal.
%%
%% Passes everything buffered so far to the consumer module's consume/2,
%% stores the consumer state it returns, resets the buffer to an empty queue
%% and arms the timer for the next consumption.
handle_info(?SIGNAL_CONSUMPTION, #state{} = State0) ->
    #state
    { consumer_module      = Module
    , consumer_state       = ConsumerState0
    , consumption_interval = Interval
    , beam_stats_queue     = Pending
    } = State0,
    ConsumerState1 = Module:consume(Pending, ConsumerState0),
    State1 = State0#state
        { consumer_state   = ConsumerState1
        , beam_stats_queue = queue:new()
        },
    %% The next signal is scheduled only after the consumer has returned.
    ok = schedule_next_consumption(Interval),
    {noreply, State1}.
| 128 | |
| 129 | %% ============================================================================ |
| 130 | %% Internal |
| 131 | %% ============================================================================ |
| 132 | |
%% @doc Triggers the first consumption without any delay by posting the
%% consumption signal straight into the calling process' own mailbox.
-spec schedule_first_consumption() ->
    ok.
schedule_first_consumption() ->
    erlang:send(self(), ?SIGNAL_CONSUMPTION),
    ok.
| 138 | |
%% @doc Arms a timer that delivers the consumption signal to the calling
%% process after Delay milliseconds. The timer reference is not kept.
-spec schedule_next_consumption(non_neg_integer()) ->
    ok.
schedule_next_consumption(Delay) ->
    Self = self(),
    _TimerRef = erlang:send_after(Delay, Self, ?SIGNAL_CONSUMPTION),
    ok.