Use module() instead of atom() where applicable
[beam_stats.git] / src / beam_stats_consumer.erl
1 -module(beam_stats_consumer).
2
3 -include("include/beam_stats.hrl").
4 -include( "beam_stats.hrl").
5
6 -behaviour(gen_server).
7
8 -export_type(
9 [ queue/0
10 ]).
11
12 %% Public API
13 -export(
14 [ add/2
15 ]).
16
17 %% Internal API
18 -export(
19 [ start_link/2
20 ]).
21
22 %% gen_server callbacks
23 -export(
24 [ init/1
25 , handle_call/3
26 , handle_cast/2
27 , handle_info/2
28 , terminate/2
29 , code_change/3
30 ]).
31
32 -type queue() ::
33 queue:queue(beam_stats:t()).
34
35 %% ============================================================================
36 %% Consumer interface
37 %% ============================================================================
38
39 -callback init(Options :: term()) ->
40 {ConsumptionInterval :: non_neg_integer(), State :: term()}.
41
42 -callback consume(queue(), State) ->
43 State.
44
45 -callback terminate(State :: term()) ->
46 {}.
47
48 %% ============================================================================
49 %% Internal data
50 %% ============================================================================
51
52 -define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
53
54 -record(state,
55 { consumer_module :: module()
56 , consumer_state :: term()
57 , consumption_interval :: non_neg_integer()
58 , beam_stats_queue :: queue()
59 }).
60
61 %% ============================================================================
62 %% Public API
63 %% ============================================================================
64
65 -spec add(module(), term()) ->
66 supervisor:startchild_ret().
67 add(ConsumerModule, ConsumerOptions) ->
68 beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
69
70 %% ============================================================================
71 %% Internal API
72 %% ============================================================================
73
74 start_link(ConsumerModule, ConsumerOptions) ->
75 GenServerModule = ?MODULE,
76 GenServerOpts = [],
77 InitArgs = [ConsumerModule, ConsumerOptions],
78 gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
79
80 %% ============================================================================
81 %% gen_server callbacks (unused)
82 %% ============================================================================
83
84 handle_call(_Request, _From, State) ->
85 ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
86
87 code_change(_OldVsn, State, _Extra) ->
88 {ok, State}.
89
90 %% ============================================================================
91 %% gen_server callbacks
92 %% ============================================================================
93
94 init([ConsumerModule, ConsumerOptions]) ->
95 {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
96 State = #state
97 { consumer_module = ConsumerModule
98 , consumer_state = ConsumerState
99 , consumption_interval = ConsumptionInterval
100 , beam_stats_queue = queue:new()
101 },
102 ok = beam_stats_producer:subscribe(self()),
103 ok = schedule_first_consumption(),
104 {ok, State}.
105
106 terminate(_Reason, _State) ->
107 ok = beam_stats_producer:unsubscribe(self()).
108
109 handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
110 Q2 = queue:in(BEAMStats, Q1),
111 {noreply, State#state{beam_stats_queue = Q2}}.
112
113 handle_info(
114 ?SIGNAL_CONSUMPTION,
115 #state
116 { consumer_module = ConsumerModule
117 , consumer_state = ConsumerState
118 , consumption_interval = ConsumptionInterval
119 , beam_stats_queue = Q
120 }=State1
121 ) ->
122 State2 = State1#state
123 { consumer_state = ConsumerModule:consume(Q, ConsumerState)
124 , beam_stats_queue = queue:new()
125 },
126 ok = schedule_next_consumption(ConsumptionInterval),
127 {noreply, State2}.
128
129 %% ============================================================================
130 %% Internal
131 %% ============================================================================
132
133 -spec schedule_first_consumption() ->
134 ok.
135 schedule_first_consumption() ->
136 _ = self() ! ?SIGNAL_CONSUMPTION,
137 ok.
138
139 -spec schedule_next_consumption(non_neg_integer()) ->
140 ok.
141 schedule_next_consumption(Time) ->
142 _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
143 ok.
This page took 0.050352 seconds and 4 git commands to generate.