Track IO deltas.
[beam_stats.git] / src / beam_stats_producer.erl
1 -module(beam_stats_producer).
2
3 -include("beam_stats.hrl").
4
5 -behaviour(gen_server).
6
7 %% API
8 -export(
9 [ start_link/0
10 , subscribe/1
11 , unsubscribe/1
12
13 % Force production and distribution. Blocks. Mainly for testing.
14 , force_production/0
15 ]).
16
17 %% gen_server callbacks
18 -export(
19 [ init/1
20 , handle_call/3
21 , handle_cast/2
22 , handle_info/2
23 , terminate/2
24 , code_change/3
25 ]).
26
27 %% ============================================================================
28 %% Internal data
29 %% ============================================================================
30
%% Messages handled by this server: the production tick it sends to itself,
%% and the synchronous request issued by force_production/0.
-define(SIGNAL_PRODUCTION, beam_stats_production_signal).
-define(FORCE_PRODUCTION, beam_stats_force_production).

%% Server state: the set of subscribed consumer pids, and the
%% beam_stats_state value carried over from one production to the next.
-record(state, {
    consumers = ordsets:new() :: ordsets:ordset(pid()),
    stats_state :: beam_stats_state:t()
}).

-type state() :: #state{}.
41
42 %% ============================================================================
43 %% API
44 %% ============================================================================
45
%% Starts the producer linked to the caller and registers it locally under
%% the module name. The result is whatever gen_server:start_link/4 returns.
-spec start_link() ->
    {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
48
%% Asks the producer to add ConsumerPid to its consumer set.
%% The request is a cast, so ok is returned without waiting for the
%% producer to process it.
-spec subscribe(pid()) ->
    ok.
subscribe(ConsumerPid) ->
    Request = {subscribe, ConsumerPid},
    gen_server:cast(?MODULE, Request).
53
%% Asks the producer to remove ConsumerPid from its consumer set.
%% The request is a cast, so ok is returned without waiting for the
%% producer to process it.
-spec unsubscribe(pid()) ->
    ok.
unsubscribe(ConsumerPid) ->
    Request = {unsubscribe, ConsumerPid},
    gen_server:cast(?MODULE, Request).
58
%% Triggers a production (and its distribution to consumers) right now and
%% blocks until the producer has replied. Mainly intended for tests.
-spec force_production() ->
    {}.
force_production() ->
    %% 5000 ms is the timeout gen_server:call/2 applies by default; it is
    %% only spelled out here, not changed.
    gen_server:call(?MODULE, ?FORCE_PRODUCTION, 5000).
63
64 %% ============================================================================
65 %% gen_server callbacks (unused)
66 %% ============================================================================
67
%% No state migration is performed on code upgrade: the state is returned
%% exactly as it was received.
code_change(_PreviousVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
70
%% Nothing to clean up on shutdown.
terminate(_Why, _FinalState) ->
    ok.
73
74 %% ============================================================================
75 %% gen_server callbacks
76 %% ============================================================================
77
%% Queues the first production signal to this process, then starts with an
%% empty consumer set and a fresh beam_stats_state.
init([]) ->
    ok = schedule_first_production(),
    InitialState = #state{
        consumers = ordsets:new(),
        stats_state = beam_stats_state:new()
    },
    {ok, InitialState}.
83
%% Maintains the consumer set: a subscribe request adds the pid, an
%% unsubscribe request removes it. The rest of the state is left untouched.
handle_cast({subscribe, ConsumerPid}, State = #state{consumers = Current}) ->
    {noreply, State#state{consumers = ordsets:add_element(ConsumerPid, Current)}};
handle_cast({unsubscribe, ConsumerPid}, State = #state{consumers = Current}) ->
    {noreply, State#state{consumers = ordsets:del_element(ConsumerPid, Current)}}.
91
%% Forced production: runs one production cycle synchronously and replies
%% with the empty tuple once it is done.
handle_call(?FORCE_PRODUCTION, _Caller, State) ->
    {reply, {}, produce(State)}.
95
%% Production tick: runs one production cycle, then arms the timer for the
%% next one. The next tick is scheduled only after the current production
%% has finished.
handle_info(?SIGNAL_PRODUCTION, #state{}=State1) ->
    State2 = produce(State1),
    ok = schedule_next_production(),
    {noreply, State2};
%% Any other message is logged and dropped. Without this clause a single
%% stray message sent to the registered process would crash it with
%% function_clause.
handle_info(Unexpected, #state{}=State) ->
    ok = error_logger:warning_msg(
        "~p ignoring unexpected message: ~p~n",
        [?MODULE, Unexpected]
    ),
    {noreply, State}.
100
101 %% ============================================================================
102 %% Private
103 %% ============================================================================
104
%% Runs one production cycle: advances the stats state, exports it, pushes
%% the exported stats to every subscribed consumer, and returns the server
%% state holding the advanced stats state.
-spec produce(state()) ->
    state().
produce(State = #state{consumers = Consumers, stats_state = Previous}) ->
    Current = beam_stats_state:update(Previous),
    ok = push_to_consumers(
        beam_stats_state:export(Current),
        ordsets:to_list(Consumers)
    ),
    State#state{stats_state = Current}.
113
%% Sends the production signal to the calling process immediately, with no
%% timer involved.
-spec schedule_first_production() ->
    ok.
schedule_first_production() ->
    _ = erlang:send(self(), ?SIGNAL_PRODUCTION),
    ok.
119
%% Arms a timer that delivers the production signal to the calling process
%% after the configured production interval. The timer reference is
%% discarded.
-spec schedule_next_production() ->
    ok.
schedule_next_production() ->
    _ = erlang:send_after(
        beam_stats_config:production_interval(),
        self(),
        ?SIGNAL_PRODUCTION
    ),
    ok.
126
%% Casts the given stats to every consumer in the list, in list order.
%% Delivery is fire-and-forget: nothing is awaited from the consumers.
-spec push_to_consumers(beam_stats:t(), [pid()]) ->
    ok.
push_to_consumers(Stats, Consumers) ->
    _ = [gen_server:cast(Consumer, Stats) || Consumer <- Consumers],
    ok.
This page took 0.070926 seconds and 4 git commands to generate.