X-Git-Url: https://git.xandkar.net/?a=blobdiff_plain;f=src%2Fbeam_stats_consumer.erl;h=3c5ce1d1d30fd2ea67fd391199974fbfdd6f06db;hb=010bff4874ddc0eee49d65edd03bcfbe0a702198;hp=eb8cbb7d2094b8ae8dca88c748ceca48a0f52295;hpb=25ef8cdae5c39135121c6b617fea747315dea3ff;p=beam_stats.git diff --git a/src/beam_stats_consumer.erl b/src/beam_stats_consumer.erl index eb8cbb7..3c5ce1d 100644 --- a/src/beam_stats_consumer.erl +++ b/src/beam_stats_consumer.erl @@ -1,7 +1,6 @@ -module(beam_stats_consumer). -include("include/beam_stats.hrl"). --include( "beam_stats.hrl"). -behaviour(gen_server). @@ -12,6 +11,8 @@ %% Public API -export( [ add/2 + , consume_sync/2 + , consume_async/2 ]). %% Internal API @@ -49,7 +50,9 @@ %% Internal data %% ============================================================================ --define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal). +-define(FLUSH , flush). +-define(CONSUME_SYNC , consume_sync). +-define(CONSUME_ASYNC , consume_async). -record(state, { consumer_module :: module() @@ -58,6 +61,9 @@ , beam_stats_queue :: queue() }). +-type state() :: + #state{}. + %% ============================================================================ %% Public API %% ============================================================================ @@ -67,23 +73,32 @@ add(ConsumerModule, ConsumerOptions) -> beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions). +-spec consume_sync(pid(), beam_stats:t()) -> + {}. +consume_sync(PID, #beam_stats{}=BEAMStats) -> + {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}). + +-spec consume_async(pid(), beam_stats:t()) -> + {}. +consume_async(PID, #beam_stats{}=BEAMStats) -> + ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}), + {}. + %% ============================================================================ %% Internal API %% ============================================================================ start_link(ConsumerModule, ConsumerOptions) -> + RegisteredName = ConsumerModule, GenServerModule = ?MODULE, GenServerOpts = [], InitArgs = [ConsumerModule, ConsumerOptions], - gen_server:start_link(GenServerModule, InitArgs, GenServerOpts). + gen_server:start_link({local, RegisteredName}, GenServerModule, InitArgs, GenServerOpts). %% ============================================================================ %% gen_server callbacks (unused) %% ============================================================================ -handle_call(_Request, _From, State) -> - ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -100,44 +115,65 @@ init([ConsumerModule, ConsumerOptions]) -> , beam_stats_queue = queue:new() }, ok = beam_stats_producer:subscribe(self()), - ok = schedule_first_consumption(), + ok = schedule_first_flush(), {ok, State}. terminate(_Reason, _State) -> ok = beam_stats_producer:unsubscribe(self()). -handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) -> - Q2 = queue:in(BEAMStats, Q1), - {noreply, State#state{beam_stats_queue = Q2}}. +handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) -> + State2 = consume_one(BEAMStats, State1), + {reply, {}, State2}. -handle_info( - ?SIGNAL_CONSUMPTION, - #state - { consumer_module = ConsumerModule - , consumer_state = ConsumerState - , consumption_interval = ConsumptionInterval - , beam_stats_queue = Q - }=State1 -) -> - State2 = State1#state - { consumer_state = ConsumerModule:consume(Q, ConsumerState) - , beam_stats_queue = queue:new() - }, - ok = schedule_next_consumption(ConsumptionInterval), +handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) -> + State2 = enqueue(BEAMStats, State1), + {noreply, State2}. + +handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) -> + State2 = consume_all_queued(State1), + ok = schedule_next_flush(ConsumptionInterval), {noreply, State2}. %% ============================================================================ %% Internal %% ============================================================================ --spec schedule_first_consumption() -> +-spec consume_one(beam_stats:t(), state()) -> + state(). +consume_one(#beam_stats{}=BEAMStats, #state{}=State1) -> + Q = queue:in(BEAMStats, queue:new()), + consume(Q, State1). + +-spec consume_all_queued(state()) -> + state(). +consume_all_queued(#state{beam_stats_queue=Q}=State1) -> + State2 = consume(Q, State1), + State2#state{beam_stats_queue = queue:new()}. + +-spec consume(queue(), state()) -> + state(). +consume( + Q, + #state + { consumer_module = ConsumerModule + , consumer_state = ConsumerState + }=State +) -> + State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}. + +-spec enqueue(beam_stats:t(), state()) -> + state(). +enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) -> + State#state{beam_stats_queue = queue:in(BEAMStats, Q)}. + +-spec schedule_first_flush() -> ok. -schedule_first_consumption() -> - _ = self() ! ?SIGNAL_CONSUMPTION, +schedule_first_flush() -> + _ = self() ! ?FLUSH, ok. --spec schedule_next_consumption(non_neg_integer()) -> +-spec schedule_next_flush(non_neg_integer()) -> ok. -schedule_next_consumption(Time) -> - _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION), +schedule_next_flush(Time) -> + _ = erlang:send_after(Time, self(), ?FLUSH), ok.