X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_producer.erl;h=e27f9a301ca1c2cd9f0a86d742d41a26aacabb67;hb=140077ceda11a34f4880fd2f4871b3aaab34f13d;hp=67765d377f995570995e529181c6f1b99f2bbd62;hpb=b4e2333fc5fd9f32c8a0a39db4c6faacdbb15a91;p=beam_stats.git diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl index 67765d3..e27f9a3 100644 --- a/src/beam_stats_producer.erl +++ b/src/beam_stats_producer.erl @@ -1,17 +1,15 @@ -module(beam_stats_producer). --include("beam_stats.hrl"). - -behaviour(gen_server). %% API -export( - [ start_link/0 + [ start_link/1 , subscribe/1 , unsubscribe/1 - % Force production and distribution. Blocks. Mainly for testing. - , force_production/0 + % For testing + , sync_produce_consume/0 ]). %% gen_server callbacks @@ -28,12 +26,12 @@ %% Internal data %% ============================================================================ --define(SIGNAL_PRODUCTION , beam_stats_production_signal). --define(FORCE_PRODUCTION , beam_stats_force_production). +-define(PRODUCE_SYNC , produce_sync). +-define(PRODUCE_ASYNC , produce_async). -record(state, { consumers = ordsets:new() :: ordsets:ordset(pid()) - , stats_state :: beam_stats_state:t() + , deltas_server :: beam_stats_delta:t() }). -type state() :: @@ -43,8 +41,8 @@ %% API %% ============================================================================ -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(DeltasServer) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, DeltasServer, []). -spec subscribe(pid()) -> ok. @@ -56,10 +54,10 @@ subscribe(PID) -> unsubscribe(PID) -> gen_server:cast(?MODULE, {unsubscribe, PID}). --spec force_production() -> +-spec sync_produce_consume() -> {}. -force_production() -> - gen_server:call(?MODULE, ?FORCE_PRODUCTION). +sync_produce_consume() -> + {} = gen_server:call(?MODULE, ?PRODUCE_SYNC). %% ============================================================================ %% gen_server callbacks (unused) @@ -75,11 +73,10 @@ terminate(_Reason, _State) -> %% gen_server callbacks %% ============================================================================ -init([]) -> +init(DeltasServer) -> ok = schedule_first_production(), Consumers = ordsets:new(), - StatsState = beam_stats_state:new(), - {ok, #state{consumers=Consumers, stats_state=StatsState}}. + {ok, #state{consumers=Consumers, deltas_server=DeltasServer}}. handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:add_element(PID, Consumers1), @@ -89,12 +86,12 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:del_element(PID, Consumers1), {noreply, State#state{consumers=Consumers2}}. -handle_call(?FORCE_PRODUCTION, _From, State1) -> - State2 = produce(State1), +handle_call(?PRODUCE_SYNC, _From, State1) -> + State2 = produce_sync(State1), {reply, {}, State2}. -handle_info(?SIGNAL_PRODUCTION, #state{}=State1) -> - State2 = produce(State1), +handle_info(?PRODUCE_ASYNC, #state{}=State1) -> + State2 = produce_async(State1), ok = schedule_next_production(), {noreply, State2}. @@ -102,30 +99,41 @@ handle_info(?SIGNAL_PRODUCTION, #state{}=State1) -> %% Private %% ============================================================================ --spec produce(state()) -> +-spec produce_sync(state()) -> + state(). +produce_sync(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_sync/2). + +-spec produce_async(state()) -> state(). -produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) -> - StatsState2 = beam_stats_state:update(StatsState1), - Stats = beam_stats_state:export(StatsState2), +produce_async(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_async/2). + +-spec produce(state(), fun((pid(), term()) -> ok)) -> + state(). +produce( + #state + { consumers = ConsumersSet + , deltas_server = DeltasServer + }=State, + MsgSendFun +) -> + Stats = beam_stats:collect(DeltasServer), ConsumersList = ordsets:to_list(ConsumersSet), - ok = push_to_consumers(Stats, ConsumersList), - State#state{stats_state = StatsState2}. + Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end, + ok = lists:foreach(Send, ConsumersList), + beam_stats_delta:gc(DeltasServer), + State. -spec schedule_first_production() -> ok. schedule_first_production() -> - _ = self() ! ?SIGNAL_PRODUCTION, + _ = self() ! ?PRODUCE_ASYNC, ok. -spec schedule_next_production() -> ok. schedule_next_production() -> ProductionInterval = beam_stats_config:production_interval(), - _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), - ok. - --spec push_to_consumers(beam_stats:t(), [pid()]) -> + _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC), ok. -push_to_consumers(Stats, Consumers) -> - Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end, - lists:foreach(Push, Consumers).