Register consumer process name.
[beam_stats.git] / src / beam_stats_consumer.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer).
2
3-include("include/beam_stats.hrl").
caf75ed8
SK
4
5-behaviour(gen_server).
6
7-export_type(
8 [ queue/0
9 ]).
10
11%% Public API
12-export(
13 [ add/2
2086842d
SK
14 , consume_sync/2
15 , consume_async/2
caf75ed8
SK
16 ]).
17
18%% Internal API
19-export(
20 [ start_link/2
21 ]).
22
23%% gen_server callbacks
24-export(
25 [ init/1
26 , handle_call/3
27 , handle_cast/2
28 , handle_info/2
29 , terminate/2
30 , code_change/3
31 ]).
32
%% FIFO queue (stdlib `queue`) whose elements are beam_stats:t() samples.
-type queue() :: queue:queue(beam_stats:t()).
35
36%% ============================================================================
37%% Consumer interface
38%% ============================================================================
39
%% init/1 is invoked from this server's own init/1 with the options that were
%% supplied when the consumer was added. It returns the flush interval (later
%% handed to erlang:send_after/3, i.e. milliseconds) and the consumer's
%% initial state.
-callback init(Options :: term()) ->
    {ConsumptionInterval :: non_neg_integer(), State :: term()}.

%% consume/2 receives a queue of samples together with the current consumer
%% state; whatever it returns is stored as the new consumer state.
-callback consume(queue(), State) -> State.

%% terminate/1 takes the consumer state and is expected to return the empty
%% tuple.
-callback terminate(State :: term()) -> {}.
48
49%% ============================================================================
50%% Internal data
51%% ============================================================================
52
2086842d
SK
%% Message tags understood by this server:
%%   ?FLUSH         - plain message handled in handle_info/2; consumes the queue
%%   ?CONSUME_SYNC  - request tag used by consume_sync/2 (gen_server:call)
%%   ?CONSUME_ASYNC - request tag used by consume_async/2 (gen_server:cast)
-define(FLUSH, flush).
-define(CONSUME_SYNC, consume_sync).
-define(CONSUME_ASYNC, consume_async).
caf75ed8
SK
56
%% Server state of a single consumer process.
-record(state, {
    %% Callback module implementing the consumer behaviour.
    consumer_module :: module(),
    %% State owned by the callback module; threaded through its consume/2.
    consumer_state :: term(),
    %% Delay between flushes, as passed to erlang:send_after/3 (milliseconds).
    consumption_interval :: non_neg_integer(),
    %% Samples buffered by asynchronous consumption until the next flush.
    beam_stats_queue :: queue()
}).
63
2086842d
SK
%% Alias for the server state record, used by the internal specs.
-type state() :: #state{}.
66
caf75ed8
SK
67%% ============================================================================
68%% Public API
69%% ============================================================================
70
%% Asks beam_stats_sup_consumers to start a child for the given consumer
%% callback module and its options. The result is whatever
%% beam_stats_sup_consumers:start_child/2 returns.
-spec add(ConsumerModule :: module(), ConsumerOptions :: term()) ->
    supervisor:startchild_ret().
add(Module, Options) ->
    beam_stats_sup_consumers:start_child(Module, Options).
75
2086842d
SK
%% Hands one sample to the consumer process and blocks until the server has
%% replied. The reply is asserted to be the empty tuple, which is also the
%% return value.
-spec consume_sync(pid(), beam_stats:t()) -> {}.
consume_sync(Consumer, #beam_stats{} = Stats) ->
    Request = {?CONSUME_SYNC, Stats},
    {} = gen_server:call(Consumer, Request).
80
%% Casts one sample to the consumer process without waiting for it to be
%% handled; the server enqueues it for the next flush. Always returns {}.
-spec consume_async(pid(), beam_stats:t()) -> {}.
consume_async(Consumer, #beam_stats{} = Stats) ->
    Message = {?CONSUME_ASYNC, Stats},
    ok = gen_server:cast(Consumer, Message),
    {}.
86
caf75ed8
SK
87%% ============================================================================
88%% Internal API
89%% ============================================================================
90
%% Starts and links a consumer gen_server, registered locally under the name
%% of the consumer callback module. The callback module and its options are
%% forwarded to init/1 as a two-element list.
start_link(ConsumerModule, ConsumerOptions) ->
    ServerName = {local, ConsumerModule},
    Args = [ConsumerModule, ConsumerOptions],
    gen_server:start_link(ServerName, ?MODULE, Args, []).
caf75ed8
SK
97
98%% ============================================================================
99%% gen_server callbacks (unused)
100%% ============================================================================
101
caf75ed8
SK
%% gen_server code_change/3 callback: no state migration is performed, the
%% state is returned unchanged.
code_change(_FromVsn, ServerState, _Extra) ->
    {ok, ServerState}.
104
105%% ============================================================================
106%% gen_server callbacks
107%% ============================================================================
108
%% gen_server init/1 callback.
%%
%% Initialises the consumer callback module, subscribes this process to
%% beam_stats_producer, and triggers the first flush before returning the
%% initial server state with an empty sample queue.
init([Module, Options]) ->
    {Interval, ModuleState} = Module:init(Options),
    ok = beam_stats_producer:subscribe(self()),
    ok = schedule_first_flush(),
    InitialState = #state{
        consumer_module = Module,
        consumer_state = ModuleState,
        consumption_interval = Interval,
        beam_stats_queue = queue:new()
    },
    {ok, InitialState}.
120
%% gen_server terminate/2 callback.
%%
%% Unsubscribes this process from beam_stats_producer and then invokes the
%% consumer module's terminate/1 callback with the current consumer state, so
%% the consumer can release whatever it acquired in its init/1. The behaviour
%% declares terminate/1 as a required callback returning {}, which is asserted
%% here.
%%
%% Unsubscription happens first so that a failure inside the consumer's
%% terminate/1 cannot prevent it.
%%
%% NOTE(review): gen_server only calls terminate/2 on a supervisor-initiated
%% shutdown when the process traps exits; init/1 does not appear to set
%% trap_exit - confirm whether that is intended.
terminate(_Reason, #state{consumer_module = ConsumerModule,
                          consumer_state = ConsumerState}) ->
    ok = beam_stats_producer:unsubscribe(self()),
    {} = ConsumerModule:terminate(ConsumerState),
    ok.
123
2086842d
SK
%% Synchronous consumption: the single sample is passed to the consumer
%% module right away and the caller receives {} once that has completed.
handle_call({?CONSUME_SYNC, #beam_stats{} = Stats}, _From, #state{} = State) ->
    {reply, {}, consume_one(Stats, State)}.
caf75ed8 127
2086842d
SK
%% Asynchronous consumption: the sample is only appended to the queue here;
%% it reaches the consumer module on the next flush.
handle_cast({?CONSUME_ASYNC, #beam_stats{} = Stats}, #state{} = State) ->
    {noreply, enqueue(Stats, State)}.
131
%% Flush tick: hand every queued sample to the consumer module, then arm the
%% timer for the next flush using the configured consumption interval.
handle_info(?FLUSH, #state{} = State) ->
    Flushed = consume_all_queued(State),
    ok = schedule_next_flush(State#state.consumption_interval),
    {noreply, Flushed}.
136
137%% ============================================================================
138%% Internal
139%% ============================================================================
140
2086842d
SK
%% Wraps a single sample in a fresh one-element queue and passes it to the
%% consumer module. Any samples already buffered in the state are left alone.
-spec consume_one(beam_stats:t(), state()) -> state().
consume_one(#beam_stats{} = Stats, #state{} = State) ->
    Empty = queue:new(),
    Single = queue:in(Stats, Empty),
    consume(Single, State).
146
%% Passes the whole buffered queue to the consumer module and returns the
%% resulting state with the buffer reset to an empty queue.
-spec consume_all_queued(state()) -> state().
consume_all_queued(#state{} = State) ->
    Consumed = consume(State#state.beam_stats_queue, State),
    Consumed#state{beam_stats_queue = queue:new()}.
152
%% Calls the consumer module's consume/2 with the given queue and the current
%% consumer state, and stores the value it returns as the new consumer state.
-spec consume(queue(), state()) -> state().
consume(Queue, #state{} = State) ->
    #state{consumer_module = Module, consumer_state = ModuleState} = State,
    NewModuleState = Module:consume(Queue, ModuleState),
    State#state{consumer_state = NewModuleState}.
163
%% Appends one sample to the rear of the buffered queue held in the state.
-spec enqueue(beam_stats:t(), state()) -> state().
enqueue(#beam_stats{} = Stats, #state{} = State) ->
    Queue = queue:in(Stats, State#state.beam_stats_queue),
    State#state{beam_stats_queue = Queue}.
168
%% Triggers the first flush by sending the flush message to the calling
%% process immediately, without any delay.
-spec schedule_first_flush() -> ok.
schedule_first_flush() ->
    _ = erlang:send(self(), ?FLUSH),
    ok.
174
%% Arms a timer that delivers the flush message to the calling process after
%% the given delay (milliseconds, per erlang:send_after/3). The timer
%% reference is deliberately discarded: nothing in this module cancels it.
-spec schedule_next_flush(non_neg_integer()) -> ok.
schedule_next_flush(Interval) ->
    _TimerRef = erlang:send_after(Interval, self(), ?FLUSH),
    ok.
This page took 0.037203 seconds and 4 git commands to generate.