X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_producer.erl;h=c7ebd5e02987fea54dd8efc5f1cf37982d1e3159;hb=56baeef9aa992469400dc635b3914bdc368bbcbd;hp=5c47951a389cec9829e2e7cee349bfc5b9bd2e5c;hpb=caf75ed8160362773766c6bde005cf5f33544392;p=beam_stats.git diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl index 5c47951..c7ebd5e 100644 --- a/src/beam_stats_producer.erl +++ b/src/beam_stats_producer.erl @@ -1,7 +1,5 @@ -module(beam_stats_producer). --include("beam_stats.hrl"). - -behaviour(gen_server). %% API @@ -9,6 +7,9 @@ [ start_link/0 , subscribe/1 , unsubscribe/1 + + % For testing + , sync_produce_consume/0 ]). %% gen_server callbacks @@ -25,12 +26,17 @@ %% Internal data %% ============================================================================ --define(SIGNAL_PRODUCTION , beam_stats_production_signal). +-define(PRODUCE_SYNC , produce_sync). +-define(PRODUCE_ASYNC , produce_async). -record(state, { consumers = ordsets:new() :: ordsets:ordset(pid()) + , stats_state :: beam_stats_state:t() }). +-type state() :: + #state{}. + %% ============================================================================ %% API %% ============================================================================ @@ -48,13 +54,15 @@ subscribe(PID) -> unsubscribe(PID) -> gen_server:cast(?MODULE, {unsubscribe, PID}). +-spec sync_produce_consume() -> + {}. +sync_produce_consume() -> + {} = gen_server:call(?MODULE, ?PRODUCE_SYNC). + %% ============================================================================ %% gen_server callbacks (unused) %% ============================================================================ -handle_call(_Request, _From, State) -> - ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -68,7 +76,8 @@ terminate(_Reason, _State) -> init([]) -> ok = schedule_first_production(), Consumers = ordsets:new(), - {ok, #state{consumers=Consumers}}. + StatsState = beam_stats_state:new(), + {ok, #state{consumers=Consumers, stats_state=StatsState}}. handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:add_element(PID, Consumers1), @@ -78,32 +87,54 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:del_element(PID, Consumers1), {noreply, State#state{consumers=Consumers2}}. -handle_info(?SIGNAL_PRODUCTION, #state{consumers=ConsumersSet}=State) -> - ConsumersList = ordsets:to_list(ConsumersSet), - ok = collect_and_push_to_consumers(ConsumersList), +handle_call(?PRODUCE_SYNC, _From, State1) -> + State2 = produce_sync(State1), + {reply, {}, State2}. + +handle_info(?PRODUCE_ASYNC, #state{}=State1) -> + State2 = produce_async(State1), ok = schedule_next_production(), - {noreply, State}. + {noreply, State2}. %% ============================================================================ %% Private %% ============================================================================ +-spec produce_sync(state()) -> + state(). +produce_sync(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_sync/2). + +-spec produce_async(state()) -> + state(). +produce_async(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_async/2). + +-spec produce(state(), fun((pid(), term()) -> ok)) -> + state(). +produce( + #state + { consumers = ConsumersSet + , stats_state = StatsState1 + }=State, + MsgSendFun +) -> + StatsState2 = beam_stats_state:update(StatsState1), + Stats = beam_stats_state:export(StatsState2), + ConsumersList = ordsets:to_list(ConsumersSet), + Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end, + ok = lists:foreach(Send, ConsumersList), + State#state{stats_state = StatsState2}. + -spec schedule_first_production() -> ok. schedule_first_production() -> - _ = self() ! ?SIGNAL_PRODUCTION, + _ = self() ! ?PRODUCE_ASYNC, ok. -spec schedule_next_production() -> ok. schedule_next_production() -> ProductionInterval = beam_stats_config:production_interval(), - _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), - ok. - --spec collect_and_push_to_consumers([pid()]) -> + _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC), ok. -collect_and_push_to_consumers(Consumers) -> - BEAMStats = beam_stats:collect(), - Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end, - lists:foreach(Push, Consumers).