Use module() instead of atom() where applicable
[beam_stats.git] / src / beam_stats_consumer.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_consumer).
2
3-include("include/beam_stats.hrl").
4-include( "beam_stats.hrl").
5
6-behaviour(gen_server).
7
8-export_type(
9 [ queue/0
10 ]).
11
12%% Public API
13-export(
14 [ add/2
15 ]).
16
17%% Internal API
18-export(
19 [ start_link/2
20 ]).
21
22%% gen_server callbacks
23-export(
24 [ init/1
25 , handle_call/3
26 , handle_cast/2
27 , handle_info/2
28 , terminate/2
29 , code_change/3
30 ]).
31
32-type queue() ::
33 queue:queue(beam_stats:t()).
34
35%% ============================================================================
36%% Consumer interface
37%% ============================================================================
38
39-callback init(Options :: term()) ->
76aefffb 40 {ConsumptionInterval :: non_neg_integer(), State :: term()}.
caf75ed8
SK
41
42-callback consume(queue(), State) ->
43 State.
44
45-callback terminate(State :: term()) ->
46 {}.
47
48%% ============================================================================
49%% Internal data
50%% ============================================================================
51
52-define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
53
54-record(state,
25ef8cda 55 { consumer_module :: module()
caf75ed8 56 , consumer_state :: term()
76aefffb 57 , consumption_interval :: non_neg_integer()
caf75ed8
SK
58 , beam_stats_queue :: queue()
59 }).
60
61%% ============================================================================
62%% Public API
63%% ============================================================================
64
25ef8cda 65-spec add(module(), term()) ->
caf75ed8
SK
66 supervisor:startchild_ret().
67add(ConsumerModule, ConsumerOptions) ->
68 beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
69
70%% ============================================================================
71%% Internal API
72%% ============================================================================
73
74start_link(ConsumerModule, ConsumerOptions) ->
75 GenServerModule = ?MODULE,
76 GenServerOpts = [],
77 InitArgs = [ConsumerModule, ConsumerOptions],
78 gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
79
80%% ============================================================================
81%% gen_server callbacks (unused)
82%% ============================================================================
83
84handle_call(_Request, _From, State) ->
85 ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
86
87code_change(_OldVsn, State, _Extra) ->
88 {ok, State}.
89
90%% ============================================================================
91%% gen_server callbacks
92%% ============================================================================
93
94init([ConsumerModule, ConsumerOptions]) ->
95 {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
96 State = #state
97 { consumer_module = ConsumerModule
98 , consumer_state = ConsumerState
99 , consumption_interval = ConsumptionInterval
100 , beam_stats_queue = queue:new()
101 },
102 ok = beam_stats_producer:subscribe(self()),
103 ok = schedule_first_consumption(),
104 {ok, State}.
105
106terminate(_Reason, _State) ->
107 ok = beam_stats_producer:unsubscribe(self()).
108
109handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
110 Q2 = queue:in(BEAMStats, Q1),
111 {noreply, State#state{beam_stats_queue = Q2}}.
112
113handle_info(
114 ?SIGNAL_CONSUMPTION,
115 #state
116 { consumer_module = ConsumerModule
117 , consumer_state = ConsumerState
118 , consumption_interval = ConsumptionInterval
119 , beam_stats_queue = Q
120 }=State1
121) ->
122 State2 = State1#state
123 { consumer_state = ConsumerModule:consume(Q, ConsumerState)
124 , beam_stats_queue = queue:new()
125 },
126 ok = schedule_next_consumption(ConsumptionInterval),
127 {noreply, State2}.
128
129%% ============================================================================
130%% Internal
131%% ============================================================================
132
133-spec schedule_first_consumption() ->
134 ok.
135schedule_first_consumption() ->
136 _ = self() ! ?SIGNAL_CONSUMPTION,
137 ok.
138
76aefffb 139-spec schedule_next_consumption(non_neg_integer()) ->
caf75ed8
SK
140 ok.
141schedule_next_consumption(Time) ->
142 _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
143 ok.
This page took 0.033397 seconds and 4 git commands to generate.