Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_producer). |
2 | ||
caf75ed8 SK |
3 | -behaviour(gen_server). |
4 | ||
5 | %% API | |
6 | -export( | |
7 | [ start_link/0 | |
8 | , subscribe/1 | |
9 | , unsubscribe/1 | |
4997ac36 | 10 | |
2086842d SK |
11 | % For testing |
12 | , sync_produce_consume/0 | |
caf75ed8 SK |
13 | ]). |
14 | ||
15 | %% gen_server callbacks | |
16 | -export( | |
17 | [ init/1 | |
18 | , handle_call/3 | |
19 | , handle_cast/2 | |
20 | , handle_info/2 | |
21 | , terminate/2 | |
22 | , code_change/3 | |
23 | ]). | |
24 | ||
25 | %% ============================================================================ | |
26 | %% Internal data | |
27 | %% ============================================================================ | |
28 | ||
2086842d SK |
%% Request terms that trigger one production cycle.
%% ?PRODUCE_SYNC is sent through gen_server:call/2 (see sync_produce_consume/0);
%% ?PRODUCE_ASYNC is delivered as a plain message and handled in handle_info/2.
-define(PRODUCE_SYNC, produce_sync).
-define(PRODUCE_ASYNC, produce_async).
caf75ed8 SK |
31 | |
%% Server state.
%%   consumers   - ordered set of subscribed consumer pids.
%%   stats_state - opaque beam_stats_state value carried from one production
%%                 cycle to the next.
-record(state, {
    consumers = ordsets:new() :: ordsets:ordset(pid()),
    stats_state :: beam_stats_state:t()
}).

-type state() :: #state{}.
39 | ||
caf75ed8 SK |
40 | %% ============================================================================ |
41 | %% API | |
42 | %% ============================================================================ | |
43 | ||
%% Starts the producer as a gen_server linked to the caller and registers it
%% locally under the module name, which is how the other API functions
%% address it.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
46 | ||
%% Asks the producer to add PID to its consumer set.
%% This is a cast: `ok' is returned immediately, before the producer has
%% processed the request.
-spec subscribe(pid()) -> ok.
subscribe(PID) ->
    Request = {subscribe, PID},
    gen_server:cast(?MODULE, Request).
51 | ||
%% Asks the producer to remove PID from its consumer set.
%% This is a cast: `ok' is returned immediately, before the producer has
%% processed the request.
-spec unsubscribe(pid()) -> ok.
unsubscribe(PID) ->
    Request = {unsubscribe, PID},
    gen_server:cast(?MODULE, Request).
56 | ||
%% Test helper: runs one production cycle through a synchronous call to the
%% producer and returns once the producer has replied. Any reply other than
%% `{}' fails the match and crashes the caller.
-spec sync_produce_consume() -> {}.
sync_produce_consume() ->
    Reply = gen_server:call(?MODULE, ?PRODUCE_SYNC),
    {} = Reply.
4997ac36 | 61 | |
caf75ed8 SK |
62 | %% ============================================================================ |
63 | %% gen_server callbacks (unused) | |
64 | %% ============================================================================ | |
65 | ||
caf75ed8 SK |
%% No state migration is performed on code upgrade: the state is handed
%% back unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    Result = {ok, CurrentState},
    Result.
68 | ||
%% Nothing to clean up on shutdown; both arguments are ignored.
terminate(_Why, _FinalState) ->
    ok.
71 | ||
72 | %% ============================================================================ | |
73 | %% gen_server callbacks | |
74 | %% ============================================================================ | |
75 | ||
%% Queues the first production message to self, then builds the initial state
%% with an empty consumer set and a fresh beam_stats_state.
init([]) ->
    ok = schedule_first_production(),
    InitialState = #state{
        consumers = ordsets:new(),
        stats_state = beam_stats_state:new()
    },
    {ok, InitialState}.
caf75ed8 SK |
81 | |
%% Maintains the consumer set.
%%   {subscribe, PID}   - adds PID (a no-op if it is already present, since
%%                        the set is an ordset).
%%   {unsubscribe, PID} - removes PID (a no-op if it is absent).
%% Any other cast is unmatched and crashes the server with function_clause.
handle_cast({subscribe, PID}, #state{consumers = Current} = State) ->
    {noreply, State#state{consumers = ordsets:add_element(PID, Current)}};
handle_cast({unsubscribe, PID}, #state{consumers = Current} = State) ->
    {noreply, State#state{consumers = ordsets:del_element(PID, Current)}}.
89 | ||
2086842d SK |
%% Runs one production cycle via produce_sync/1 and replies `{}' to the
%% caller once it has returned. Only the ?PRODUCE_SYNC request is handled.
handle_call(?PRODUCE_SYNC, _Caller, State) ->
    {reply, {}, produce_sync(State)}.
4997ac36 | 93 | |
2086842d SK |
%% Handles raw messages sent to the producer.
%%
%% ?PRODUCE_ASYNC runs one production cycle via produce_async/1 and then
%% schedules the next one, so the next timer is armed only after the current
%% cycle has finished.
%%
%% Any other message is logged and dropped. The process is registered under a
%% well-known name, so it can receive messages it never asked for; without
%% this clause such a message would crash the server with function_clause,
%% discarding the subscribed consumers held in its state.
handle_info(?PRODUCE_ASYNC, #state{} = State1) ->
    State2 = produce_async(State1),
    ok = schedule_next_production(),
    {noreply, State2};
handle_info(Unexpected, #state{} = State) ->
    error_logger:warning_msg(
        "~p ignoring unexpected message: ~p~n",
        [?MODULE, Unexpected]
    ),
    {noreply, State}.
caf75ed8 SK |
98 | |
99 | %% ============================================================================ | |
100 | %% Private | |
101 | %% ============================================================================ | |
102 | ||
2086842d SK |
%% Runs one production cycle, handing the stats to every consumer through
%% beam_stats_consumer:consume_sync/2.
-spec produce_sync(state()) -> state().
produce_sync(State = #state{}) ->
    Deliver = fun beam_stats_consumer:consume_sync/2,
    produce(State, Deliver).
107 | ||
%% Runs one production cycle, handing the stats to every consumer through
%% beam_stats_consumer:consume_async/2.
-spec produce_async(state()) -> state().
produce_async(State = #state{}) ->
    Deliver = fun beam_stats_consumer:consume_async/2,
    produce(State, Deliver).
112 | ||
%% One production cycle:
%%   1. advance the stats state with beam_stats_state:update/1,
%%   2. export the stats from the updated state,
%%   3. call MsgSendFun(Consumer, Stats) for every subscribed consumer,
%%   4. return the server state carrying the updated stats state.
%% The consumer set itself is not modified.
-spec produce(state(), fun((pid(), term()) -> ok)) -> state().
produce(#state{consumers = Consumers, stats_state = Previous} = State, MsgSendFun) ->
    Updated = beam_stats_state:update(Previous),
    Stats = beam_stats_state:export(Updated),
    ok = lists:foreach(
        fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
        ordsets:to_list(Consumers)
    ),
    State#state{stats_state = Updated}.
4997ac36 | 128 | |
caf75ed8 SK |
%% Sends ?PRODUCE_ASYNC to the calling process right away, with no delay.
-spec schedule_first_production() -> ok.
schedule_first_production() ->
    Self = self(),
    _ = erlang:send(Self, ?PRODUCE_ASYNC),
    ok.
134 | ||
%% Arms a timer that delivers ?PRODUCE_ASYNC to the calling process after the
%% interval returned by beam_stats_config:production_interval/0 (passed
%% straight to erlang:send_after/3, which takes milliseconds). The timer
%% reference is discarded, so the timer cannot be cancelled later.
-spec schedule_next_production() -> ok.
schedule_next_production() ->
    _TimerRef = erlang:send_after(
        beam_stats_config:production_interval(),
        self(),
        ?PRODUCE_ASYNC
    ),
    ok.