Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_producer). |
2 | ||
caf75ed8 SK |
3 | -behaviour(gen_server). |
4 | ||
5 | %% API | |
6 | -export( | |
b2c364fd | 7 | [ start_link/1 |
caf75ed8 SK |
8 | , subscribe/1 |
9 | , unsubscribe/1 | |
4997ac36 | 10 | |
2086842d SK |
11 | % For testing |
12 | , sync_produce_consume/0 | |
caf75ed8 SK |
13 | ]). |
14 | ||
15 | %% gen_server callbacks | |
16 | -export( | |
17 | [ init/1 | |
18 | , handle_call/3 | |
19 | , handle_cast/2 | |
20 | , handle_info/2 | |
21 | , terminate/2 | |
22 | , code_change/3 | |
23 | ]). | |
24 | ||
25 | %% ============================================================================ | |
26 | %% Internal data | |
27 | %% ============================================================================ | |
28 | ||
2086842d SK |
%% Requests that trigger a production run:
%%   ?PRODUCE_SYNC  - the gen_server call request used by sync_produce_consume/0
%%   ?PRODUCE_ASYNC - the plain message the server sends to itself
-define(PRODUCE_SYNC, produce_sync).
-define(PRODUCE_ASYNC, produce_async).
caf75ed8 SK |
31 | |
%% Server state carried between gen_server callbacks.
-record(state, {
    %% Pids that are handed the collected stats on every production run.
    consumers = ordsets:new() :: ordsets:ordset(pid()),
    %% Value given to start_link/1; passed to beam_stats:collect/1 and
    %% beam_stats_delta:gc/1 on every production run.
    deltas_server :: beam_stats_delta:t()
}).
36 | ||
4997ac36 SK |
%% Alias for the server state record, used in the specs of private functions.
-type state() :: #state{}.
39 | ||
caf75ed8 SK |
40 | %% ============================================================================ |
41 | %% API | |
42 | %% ============================================================================ | |
43 | ||
b2c364fd SK |
%% Start the producer, linked to the caller and registered locally under the
%% module name.
%%
%% DeltasServer is passed unchanged to init/1, which stores it in the server
%% state. The return value is whatever gen_server:start_link/4 returns.
-spec start_link(beam_stats_delta:t()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(DeltasServer) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
caf75ed8 SK |
46 | |
%% Ask the producer to add PID to its set of consumers.
%%
%% The request is sent as a cast to the locally registered producer, so the
%% caller gets `ok' back without any confirmation from the server.
-spec subscribe(pid()) -> ok.
subscribe(PID) ->
    Request = {subscribe, PID},
    gen_server:cast(?MODULE, Request).
51 | ||
%% Ask the producer to remove PID from its set of consumers.
%%
%% The request is sent as a cast to the locally registered producer, so the
%% caller gets `ok' back without any confirmation from the server.
-spec unsubscribe(pid()) -> ok.
unsubscribe(PID) ->
    Request = {unsubscribe, PID},
    gen_server:cast(?MODULE, Request).
56 | ||
%% Testing aid: run one production cycle through a synchronous gen_server
%% call to the locally registered producer.
%%
%% Returns the empty tuple the server replies with; any other reply fails
%% the match and raises badmatch in the caller.
-spec sync_produce_consume() -> {}.
sync_produce_consume() ->
    Reply = gen_server:call(?MODULE, ?PRODUCE_SYNC),
    {} = Reply.
4997ac36 | 61 | |
caf75ed8 SK |
62 | %% ============================================================================ |
63 | %% gen_server callbacks (unused) | |
64 | %% ============================================================================ | |
65 | ||
caf75ed8 SK |
%% gen_server callback: no state migration is performed on code upgrade;
%% the current state is returned untouched.
code_change(_PreviousVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
68 | ||
%% gen_server callback: nothing is cleaned up on shutdown.
terminate(_ShutdownReason, _FinalState) ->
    ok.
71 | ||
72 | %% ============================================================================ | |
73 | %% gen_server callbacks | |
74 | %% ============================================================================ | |
75 | ||
%% gen_server callback: queue the first production message for this process,
%% then start with an empty consumer set and the DeltasServer given to
%% start_link/1.
init(DeltasServer) ->
    ok = schedule_first_production(),
    InitialState = #state{
        consumers = ordsets:new(),
        deltas_server = DeltasServer
    },
    {ok, InitialState}.
caf75ed8 SK |
80 | |
%% gen_server callback: maintain the consumer set.
%%
%%   {subscribe, PID}   - add PID to the set (no change if already present)
%%   {unsubscribe, PID} - remove PID from the set (no change if absent)
handle_cast({subscribe, PID}, #state{consumers = Current} = State) ->
    {noreply, State#state{consumers = ordsets:add_element(PID, Current)}};
handle_cast({unsubscribe, PID}, #state{consumers = Current} = State) ->
    {noreply, State#state{consumers = ordsets:del_element(PID, Current)}}.
88 | ||
2086842d SK |
%% gen_server callback: on ?PRODUCE_SYNC run one production cycle via
%% produce_sync/1 and reply with the empty tuple once it has returned.
handle_call(?PRODUCE_SYNC, _From, State) ->
    {reply, {}, produce_sync(State)}.
4997ac36 | 92 | |
2086842d SK |
%% gen_server callback for plain messages.
%%
%% ?PRODUCE_ASYNC runs one production cycle and only then arms the timer for
%% the next one, so the interval is measured from the end of a cycle.
%%
%% Any other message is logged and dropped. The server is registered under a
%% local name, so any process can send it arbitrary messages; without this
%% clause such a message would raise function_clause and take the producer
%% down.
handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
    State2 = produce_async(State1),
    ok = schedule_next_production(),
    {noreply, State2};
handle_info(Unexpected, #state{}=State) ->
    error_logger:warning_msg(
        "~p: ignoring unexpected message: ~p~n",
        [?MODULE, Unexpected]
    ),
    {noreply, State}.
caf75ed8 SK |
97 | |
98 | %% ============================================================================ | |
99 | %% Private | |
100 | %% ============================================================================ | |
101 | ||
2086842d SK |
%% Run one production cycle, handing the stats to each consumer through
%% beam_stats_consumer:consume_sync/2. Returns the state given.
-spec produce_sync(state()) -> state().
produce_sync(State = #state{}) ->
    Deliver = fun beam_stats_consumer:consume_sync/2,
    produce(State, Deliver).
106 | ||
%% Run one production cycle, handing the stats to each consumer through
%% beam_stats_consumer:consume_async/2. Returns the state given.
-spec produce_async(state()) -> state().
produce_async(State = #state{}) ->
    Deliver = fun beam_stats_consumer:consume_async/2,
    produce(State, Deliver).
111 | ||
%% One production cycle:
%%   1. collect a stats sample with beam_stats:collect/1,
%%   2. call MsgSendFun(Consumer, Stats) once per subscribed consumer, in
%%      ordset order,
%%   3. call beam_stats_delta:gc/1 on the deltas server.
%%
%% The return values of MsgSendFun and of the gc call are not inspected.
%% The state is returned unchanged.
-spec produce(state(), fun((pid(), term()) -> ok)) -> state().
produce(#state{deltas_server = DeltasServer, consumers = Consumers} = State,
        MsgSendFun) ->
    Stats = beam_stats:collect(DeltasServer),
    lists:foreach(
        fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
        ordsets:to_list(Consumers)
    ),
    beam_stats_delta:gc(DeltasServer),
    State.
4997ac36 | 127 | |
caf75ed8 SK |
%% Queue the first ?PRODUCE_ASYNC message for the calling process right away,
%% without a timer.
-spec schedule_first_production() -> ok.
schedule_first_production() ->
    erlang:send(self(), ?PRODUCE_ASYNC),
    ok.
133 | ||
%% Arm a timer that delivers ?PRODUCE_ASYNC to the calling process after the
%% interval reported by beam_stats_config:production_interval/0. The timer
%% reference is not kept.
-spec schedule_next_production() -> ok.
schedule_next_production() ->
    Interval = beam_stats_config:production_interval(),
    _TimerRef = erlang:send_after(Interval, self(), ?PRODUCE_ASYNC),
    ok.