Commit | Line | Data |
---|---|---|
caf75ed8 SK |
1 | -module(beam_stats_producer). |
2 | ||
3 | -include("beam_stats.hrl"). | |
4 | ||
5 | -behaviour(gen_server). | |
6 | ||
7 | %% API | |
8 | -export( | |
9 | [ start_link/0 | |
10 | , subscribe/1 | |
11 | , unsubscribe/1 | |
4997ac36 SK |
12 | |
13 | % Force production and distribution. Blocks. Mainly for testing. | |
14 | , force_production/0 | |
caf75ed8 SK |
15 | ]). |
16 | ||
17 | %% gen_server callbacks | |
18 | -export( | |
19 | [ init/1 | |
20 | , handle_call/3 | |
21 | , handle_cast/2 | |
22 | , handle_info/2 | |
23 | , terminate/2 | |
24 | , code_change/3 | |
25 | ]). | |
26 | ||
27 | %% ============================================================================ | |
28 | %% Internal data | |
29 | %% ============================================================================ | |
30 | ||
31 | -define(SIGNAL_PRODUCTION , beam_stats_production_signal). | |
4997ac36 | 32 | -define(FORCE_PRODUCTION , beam_stats_force_production). |
caf75ed8 SK |
33 | |
34 | -record(state, | |
35 | { consumers = ordsets:new() :: ordsets:ordset(pid()) | |
36 | }). | |
37 | ||
4997ac36 SK |
38 | -type state() :: |
39 | #state{}. | |
40 | ||
caf75ed8 SK |
41 | %% ============================================================================ |
42 | %% API | |
43 | %% ============================================================================ | |
44 | ||
45 | start_link() -> | |
46 | gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). | |
47 | ||
48 | -spec subscribe(pid()) -> | |
49 | ok. | |
50 | subscribe(PID) -> | |
51 | gen_server:cast(?MODULE, {subscribe, PID}). | |
52 | ||
53 | -spec unsubscribe(pid()) -> | |
54 | ok. | |
55 | unsubscribe(PID) -> | |
56 | gen_server:cast(?MODULE, {unsubscribe, PID}). | |
57 | ||
4997ac36 SK |
58 | -spec force_production() -> |
59 | {}. | |
60 | force_production() -> | |
61 | gen_server:call(?MODULE, ?FORCE_PRODUCTION). | |
62 | ||
caf75ed8 SK |
63 | %% ============================================================================ |
64 | %% gen_server callbacks (unused) | |
65 | %% ============================================================================ | |
66 | ||
caf75ed8 SK |
67 | code_change(_OldVsn, State, _Extra) -> |
68 | {ok, State}. | |
69 | ||
70 | terminate(_Reason, _State) -> | |
71 | ok. | |
72 | ||
73 | %% ============================================================================ | |
74 | %% gen_server callbacks | |
75 | %% ============================================================================ | |
76 | ||
77 | init([]) -> | |
78 | ok = schedule_first_production(), | |
79 | Consumers = ordsets:new(), | |
80 | {ok, #state{consumers=Consumers}}. | |
81 | ||
82 | handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> | |
83 | Consumers2 = ordsets:add_element(PID, Consumers1), | |
84 | {noreply, State#state{consumers=Consumers2}}; | |
85 | ||
86 | handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> | |
87 | Consumers2 = ordsets:del_element(PID, Consumers1), | |
88 | {noreply, State#state{consumers=Consumers2}}. | |
89 | ||
4997ac36 SK |
90 | handle_call(?FORCE_PRODUCTION, _From, State) -> |
91 | {} = produce(State), | |
92 | {reply, {}, State}. | |
93 | ||
94 | handle_info(?SIGNAL_PRODUCTION, #state{}=State) -> | |
95 | {} = produce(State), | |
caf75ed8 SK |
96 | ok = schedule_next_production(), |
97 | {noreply, State}. | |
98 | ||
99 | %% ============================================================================ | |
100 | %% Private | |
101 | %% ============================================================================ | |
102 | ||
4997ac36 SK |
103 | -spec produce(state()) -> |
104 | {}. | |
105 | produce(#state{consumers=ConsumersSet}) -> | |
106 | ConsumersList = ordsets:to_list(ConsumersSet), | |
107 | ok = collect_and_push_to_consumers(ConsumersList), | |
108 | {}. | |
109 | ||
caf75ed8 SK |
110 | -spec schedule_first_production() -> |
111 | ok. | |
112 | schedule_first_production() -> | |
113 | _ = self() ! ?SIGNAL_PRODUCTION, | |
114 | ok. | |
115 | ||
116 | -spec schedule_next_production() -> | |
117 | ok. | |
118 | schedule_next_production() -> | |
119 | ProductionInterval = beam_stats_config:production_interval(), | |
120 | _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), | |
121 | ok. | |
122 | ||
123 | -spec collect_and_push_to_consumers([pid()]) -> | |
124 | ok. | |
125 | collect_and_push_to_consumers(Consumers) -> | |
126 | BEAMStats = beam_stats:collect(), | |
127 | Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end, | |
128 | lists:foreach(Push, Consumers). |