X-Git-Url: https://git.xandkar.net/?p=beam_stats.git;a=blobdiff_plain;f=src%2Fbeam_stats_consumer.erl;h=3e46042311b1cd3033825bda65d04fa65dbe3dbb;hp=eb8cbb7d2094b8ae8dca88c748ceca48a0f52295;hb=2086842d3832286088d0ca9646b23e3bfd50553c;hpb=093e16693e52d84ef7c158aa9d808efe6a9b2e85 diff --git a/src/beam_stats_consumer.erl b/src/beam_stats_consumer.erl index eb8cbb7..3e46042 100644 --- a/src/beam_stats_consumer.erl +++ b/src/beam_stats_consumer.erl @@ -1,7 +1,6 @@ -module(beam_stats_consumer). -include("include/beam_stats.hrl"). --include( "beam_stats.hrl"). -behaviour(gen_server). @@ -12,6 +11,8 @@ %% Public API -export( [ add/2 + , consume_sync/2 + , consume_async/2 ]). %% Internal API @@ -49,7 +50,9 @@ %% Internal data %% ============================================================================ --define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal). +-define(FLUSH , flush). +-define(CONSUME_SYNC , consume_sync). +-define(CONSUME_ASYNC , consume_async). -record(state, { consumer_module :: module() @@ -58,6 +61,9 @@ , beam_stats_queue :: queue() }). +-type state() :: + #state{}. + %% ============================================================================ %% Public API %% ============================================================================ @@ -67,6 +73,17 @@ add(ConsumerModule, ConsumerOptions) -> beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions). +-spec consume_sync(pid(), beam_stats:t()) -> + {}. +consume_sync(PID, #beam_stats{}=BEAMStats) -> + {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}). + +-spec consume_async(pid(), beam_stats:t()) -> + {}. +consume_async(PID, #beam_stats{}=BEAMStats) -> + ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}), + {}. + %% ============================================================================ %% Internal API %% ============================================================================ @@ -81,9 +98,6 @@ start_link(ConsumerModule, ConsumerOptions) -> %% gen_server callbacks (unused) %% ============================================================================ -handle_call(_Request, _From, State) -> - ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -100,44 +114,65 @@ init([ConsumerModule, ConsumerOptions]) -> , beam_stats_queue = queue:new() }, ok = beam_stats_producer:subscribe(self()), - ok = schedule_first_consumption(), + ok = schedule_first_flush(), {ok, State}. terminate(_Reason, _State) -> ok = beam_stats_producer:unsubscribe(self()). -handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) -> - Q2 = queue:in(BEAMStats, Q1), - {noreply, State#state{beam_stats_queue = Q2}}. +handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) -> + State2 = consume_one(BEAMStats, State1), + {reply, {}, State2}. -handle_info( - ?SIGNAL_CONSUMPTION, - #state - { consumer_module = ConsumerModule - , consumer_state = ConsumerState - , consumption_interval = ConsumptionInterval - , beam_stats_queue = Q - }=State1 -) -> - State2 = State1#state - { consumer_state = ConsumerModule:consume(Q, ConsumerState) - , beam_stats_queue = queue:new() - }, - ok = schedule_next_consumption(ConsumptionInterval), +handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) -> + State2 = enqueue(BEAMStats, State1), + {noreply, State2}. + +handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) -> + State2 = consume_all_queued(State1), + ok = schedule_next_flush(ConsumptionInterval), {noreply, State2}. %% ============================================================================ %% Internal %% ============================================================================ --spec schedule_first_consumption() -> +-spec consume_one(beam_stats:t(), state()) -> + state(). +consume_one(#beam_stats{}=BEAMStats, #state{}=State1) -> + Q = queue:in(BEAMStats, queue:new()), + consume(Q, State1). + +-spec consume_all_queued(state()) -> + state(). +consume_all_queued(#state{beam_stats_queue=Q}=State1) -> + State2 = consume(Q, State1), + State2#state{beam_stats_queue = queue:new()}. + +-spec consume(queue(), state()) -> + state(). +consume( + Q, + #state + { consumer_module = ConsumerModule + , consumer_state = ConsumerState + }=State +) -> + State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}. + +-spec enqueue(beam_stats:t(), state()) -> + state(). +enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) -> + State#state{beam_stats_queue = queue:in(BEAMStats, Q)}. + +-spec schedule_first_flush() -> ok. -schedule_first_consumption() -> - _ = self() ! ?SIGNAL_CONSUMPTION, +schedule_first_flush() -> + _ = self() ! ?FLUSH, ok. --spec schedule_next_consumption(non_neg_integer()) -> +-spec schedule_next_flush(non_neg_integer()) -> ok. -schedule_next_consumption(Time) -> - _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION), +schedule_next_flush(Time) -> + _ = erlang:send_after(Time, self(), ?FLUSH), ok.