Implement deltas server, replacing beam_stats_state.
[beam_stats.git] / src / beam_stats_producer.erl
CommitLineData
caf75ed8
SK
1-module(beam_stats_producer).
2
caf75ed8
SK
3-behaviour(gen_server).
4
5%% API
6-export(
b2c364fd 7 [ start_link/1
caf75ed8
SK
8 , subscribe/1
9 , unsubscribe/1
4997ac36 10
2086842d
SK
11 % For testing
12 , sync_produce_consume/0
caf75ed8
SK
13 ]).
14
15%% gen_server callbacks
16-export(
17 [ init/1
18 , handle_call/3
19 , handle_cast/2
20 , handle_info/2
21 , terminate/2
22 , code_change/3
23 ]).
24
25%% ============================================================================
26%% Internal data
27%% ============================================================================
28
2086842d
SK
29-define(PRODUCE_SYNC , produce_sync).
30-define(PRODUCE_ASYNC , produce_async).
caf75ed8
SK
31
32-record(state,
33 { consumers = ordsets:new() :: ordsets:ordset(pid())
b2c364fd 34 , deltas_server :: beam_stats_delta:t()
caf75ed8
SK
35 }).
36
4997ac36
SK
37-type state() ::
38 #state{}.
39
caf75ed8
SK
40%% ============================================================================
41%% API
42%% ============================================================================
43
b2c364fd
SK
44start_link(DeltasServer) ->
45 gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []).
caf75ed8
SK
46
47-spec subscribe(pid()) ->
48 ok.
49subscribe(PID) ->
50 gen_server:cast(?MODULE, {subscribe, PID}).
51
52-spec unsubscribe(pid()) ->
53 ok.
54unsubscribe(PID) ->
55 gen_server:cast(?MODULE, {unsubscribe, PID}).
56
2086842d 57-spec sync_produce_consume() ->
4997ac36 58 {}.
2086842d
SK
59sync_produce_consume() ->
60 {} = gen_server:call(?MODULE, ?PRODUCE_SYNC).
4997ac36 61
caf75ed8
SK
62%% ============================================================================
63%% gen_server callbacks (unused)
64%% ============================================================================
65
caf75ed8
SK
66code_change(_OldVsn, State, _Extra) ->
67 {ok, State}.
68
69terminate(_Reason, _State) ->
70 ok.
71
72%% ============================================================================
73%% gen_server callbacks
74%% ============================================================================
75
b2c364fd 76init(DeltasServer) ->
caf75ed8
SK
77 ok = schedule_first_production(),
78 Consumers = ordsets:new(),
b2c364fd 79 {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}.
caf75ed8
SK
80
81handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
82 Consumers2 = ordsets:add_element(PID, Consumers1),
83 {noreply, State#state{consumers=Consumers2}};
84
85handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
86 Consumers2 = ordsets:del_element(PID, Consumers1),
87 {noreply, State#state{consumers=Consumers2}}.
88
2086842d
SK
89handle_call(?PRODUCE_SYNC, _From, State1) ->
90 State2 = produce_sync(State1),
b4e2333f 91 {reply, {}, State2}.
4997ac36 92
2086842d
SK
93handle_info(?PRODUCE_ASYNC, #state{}=State1) ->
94 State2 = produce_async(State1),
caf75ed8 95 ok = schedule_next_production(),
b4e2333f 96 {noreply, State2}.
caf75ed8
SK
97
98%% ============================================================================
99%% Private
100%% ============================================================================
101
2086842d
SK
102-spec produce_sync(state()) ->
103 state().
104produce_sync(#state{}=State) ->
105 produce(State, fun beam_stats_consumer:consume_sync/2).
106
107-spec produce_async(state()) ->
b4e2333f 108 state().
2086842d
SK
109produce_async(#state{}=State) ->
110 produce(State, fun beam_stats_consumer:consume_async/2).
111
112-spec produce(state(), fun((pid(), term()) -> ok)) ->
113 state().
114produce(
115 #state
116 { consumers = ConsumersSet
b2c364fd 117 , deltas_server = DeltasServer
2086842d
SK
118 }=State,
119 MsgSendFun
120) ->
b2c364fd 121 Stats = beam_stats:collect(DeltasServer),
4997ac36 122 ConsumersList = ordsets:to_list(ConsumersSet),
2086842d
SK
123 Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end,
124 ok = lists:foreach(Send, ConsumersList),
b2c364fd 125 State.
4997ac36 126
caf75ed8
SK
127-spec schedule_first_production() ->
128 ok.
129schedule_first_production() ->
2086842d 130 _ = self() ! ?PRODUCE_ASYNC,
caf75ed8
SK
131 ok.
132
133-spec schedule_next_production() ->
134 ok.
135schedule_next_production() ->
136 ProductionInterval = beam_stats_config:production_interval(),
2086842d 137 _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC),
caf75ed8 138 ok.
This page took 0.028269 seconds and 4 git commands to generate.