From 2086842d3832286088d0ca9646b23e3bfd50553c Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Wed, 26 Aug 2015 00:00:27 -0400 Subject: [PATCH] Implement synchronous produce->consume cycle, for testing. --- src/beam_stats.app.src | 2 +- src/beam_stats.hrl | 3 -- src/beam_stats_consumer.erl | 93 +++++++++++++++++++++++++------------ src/beam_stats_producer.erl | 57 +++++++++++++---------- 4 files changed, 98 insertions(+), 57 deletions(-) delete mode 100644 src/beam_stats.hrl diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src index 40bab63..9eb72ab 100644 --- a/src/beam_stats.app.src +++ b/src/beam_stats.app.src @@ -1,7 +1,7 @@ {application, beam_stats, [ {description, "Periodic VM stats production and consumption."}, - {vsn, "0.5.1"}, + {vsn, "0.5.2"}, {registered, []}, {applications, [ kernel diff --git a/src/beam_stats.hrl b/src/beam_stats.hrl deleted file mode 100644 index c7df52a..0000000 --- a/src/beam_stats.hrl +++ /dev/null @@ -1,3 +0,0 @@ --define(METHOD_SHOULD_NOT_BE_USED(Method, State), - {stop, {method_should_not_be_used, {?MODULE, Method}}, State} -). diff --git a/src/beam_stats_consumer.erl b/src/beam_stats_consumer.erl index eb8cbb7..3e46042 100644 --- a/src/beam_stats_consumer.erl +++ b/src/beam_stats_consumer.erl @@ -1,7 +1,6 @@ -module(beam_stats_consumer). -include("include/beam_stats.hrl"). --include( "beam_stats.hrl"). -behaviour(gen_server). @@ -12,6 +11,8 @@ %% Public API -export( [ add/2 + , consume_sync/2 + , consume_async/2 ]). %% Internal API @@ -49,7 +50,9 @@ %% Internal data %% ============================================================================ --define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal). +-define(FLUSH , flush). +-define(CONSUME_SYNC , consume_sync). +-define(CONSUME_ASYNC , consume_async). -record(state, { consumer_module :: module() @@ -58,6 +61,9 @@ , beam_stats_queue :: queue() }). +-type state() :: + #state{}. + %% ============================================================================ %% Public API %% ============================================================================ @@ -67,6 +73,17 @@ add(ConsumerModule, ConsumerOptions) -> beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions). +-spec consume_sync(pid(), beam_stats:t()) -> + {}. +consume_sync(PID, #beam_stats{}=BEAMStats) -> + {} = gen_server:call(PID, {?CONSUME_SYNC, BEAMStats}). + +-spec consume_async(pid(), beam_stats:t()) -> + {}. +consume_async(PID, #beam_stats{}=BEAMStats) -> + ok = gen_server:cast(PID, {?CONSUME_ASYNC, BEAMStats}), + {}. + %% ============================================================================ %% Internal API %% ============================================================================ @@ -81,9 +98,6 @@ start_link(ConsumerModule, ConsumerOptions) -> %% gen_server callbacks (unused) %% ============================================================================ -handle_call(_Request, _From, State) -> - ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -100,44 +114,65 @@ init([ConsumerModule, ConsumerOptions]) -> , beam_stats_queue = queue:new() }, ok = beam_stats_producer:subscribe(self()), - ok = schedule_first_consumption(), + ok = schedule_first_flush(), {ok, State}. terminate(_Reason, _State) -> ok = beam_stats_producer:unsubscribe(self()). -handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) -> - Q2 = queue:in(BEAMStats, Q1), - {noreply, State#state{beam_stats_queue = Q2}}. +handle_call({?CONSUME_SYNC, #beam_stats{}=BEAMStats}, _, #state{}=State1) -> + State2 = consume_one(BEAMStats, State1), + {reply, {}, State2}. -handle_info( - ?SIGNAL_CONSUMPTION, - #state - { consumer_module = ConsumerModule - , consumer_state = ConsumerState - , consumption_interval = ConsumptionInterval - , beam_stats_queue = Q - }=State1 -) -> - State2 = State1#state - { consumer_state = ConsumerModule:consume(Q, ConsumerState) - , beam_stats_queue = queue:new() - }, - ok = schedule_next_consumption(ConsumptionInterval), +handle_cast({?CONSUME_ASYNC, #beam_stats{}=BEAMStats}, #state{}=State1) -> + State2 = enqueue(BEAMStats, State1), + {noreply, State2}. + +handle_info(?FLUSH, #state{consumption_interval=ConsumptionInterval}=State1) -> + State2 = consume_all_queued(State1), + ok = schedule_next_flush(ConsumptionInterval), {noreply, State2}. %% ============================================================================ %% Internal %% ============================================================================ --spec schedule_first_consumption() -> +-spec consume_one(beam_stats:t(), state()) -> + state(). +consume_one(#beam_stats{}=BEAMStats, #state{}=State1) -> + Q = queue:in(BEAMStats, queue:new()), + consume(Q, State1). + +-spec consume_all_queued(state()) -> + state(). +consume_all_queued(#state{beam_stats_queue=Q}=State1) -> + State2 = consume(Q, State1), + State2#state{beam_stats_queue = queue:new()}. + +-spec consume(queue(), state()) -> + state(). +consume( + Q, + #state + { consumer_module = ConsumerModule + , consumer_state = ConsumerState + }=State +) -> + State#state{consumer_state = ConsumerModule:consume(Q, ConsumerState)}. + +-spec enqueue(beam_stats:t(), state()) -> + state(). +enqueue(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q}=State) -> + State#state{beam_stats_queue = queue:in(BEAMStats, Q)}. + +-spec schedule_first_flush() -> ok. -schedule_first_consumption() -> - _ = self() ! ?SIGNAL_CONSUMPTION, +schedule_first_flush() -> + _ = self() ! ?FLUSH, ok. --spec schedule_next_consumption(non_neg_integer()) -> +-spec schedule_next_flush(non_neg_integer()) -> ok. -schedule_next_consumption(Time) -> - _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION), +schedule_next_flush(Time) -> + _ = erlang:send_after(Time, self(), ?FLUSH), ok. diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl index 67765d3..c7ebd5e 100644 --- a/src/beam_stats_producer.erl +++ b/src/beam_stats_producer.erl @@ -1,7 +1,5 @@ -module(beam_stats_producer). --include("beam_stats.hrl"). - -behaviour(gen_server). %% API @@ -10,8 +8,8 @@ , subscribe/1 , unsubscribe/1 - % Force production and distribution. Blocks. Mainly for testing. - , force_production/0 + % For testing + , sync_produce_consume/0 ]). %% gen_server callbacks @@ -28,8 +26,8 @@ %% Internal data %% ============================================================================ --define(SIGNAL_PRODUCTION , beam_stats_production_signal). --define(FORCE_PRODUCTION , beam_stats_force_production). +-define(PRODUCE_SYNC , produce_sync). +-define(PRODUCE_ASYNC , produce_async). -record(state, { consumers = ordsets:new() :: ordsets:ordset(pid()) @@ -56,10 +54,10 @@ subscribe(PID) -> unsubscribe(PID) -> gen_server:cast(?MODULE, {unsubscribe, PID}). --spec force_production() -> +-spec sync_produce_consume() -> {}. -force_production() -> - gen_server:call(?MODULE, ?FORCE_PRODUCTION). +sync_produce_consume() -> + {} = gen_server:call(?MODULE, ?PRODUCE_SYNC). %% ============================================================================ %% gen_server callbacks (unused) @@ -89,12 +87,12 @@ handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> Consumers2 = ordsets:del_element(PID, Consumers1), {noreply, State#state{consumers=Consumers2}}. -handle_call(?FORCE_PRODUCTION, _From, State1) -> - State2 = produce(State1), +handle_call(?PRODUCE_SYNC, _From, State1) -> + State2 = produce_sync(State1), {reply, {}, State2}. -handle_info(?SIGNAL_PRODUCTION, #state{}=State1) -> - State2 = produce(State1), +handle_info(?PRODUCE_ASYNC, #state{}=State1) -> + State2 = produce_async(State1), ok = schedule_next_production(), {noreply, State2}. @@ -102,30 +100,41 @@ handle_info(?SIGNAL_PRODUCTION, #state{}=State1) -> %% Private %% ============================================================================ --spec produce(state()) -> +-spec produce_sync(state()) -> + state(). +produce_sync(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_sync/2). + +-spec produce_async(state()) -> state(). -produce(#state{consumers=ConsumersSet, stats_state=StatsState1}=State) -> +produce_async(#state{}=State) -> + produce(State, fun beam_stats_consumer:consume_async/2). + +-spec produce(state(), fun((pid(), term()) -> ok)) -> + state(). +produce( + #state + { consumers = ConsumersSet + , stats_state = StatsState1 + }=State, + MsgSendFun +) -> StatsState2 = beam_stats_state:update(StatsState1), Stats = beam_stats_state:export(StatsState2), ConsumersList = ordsets:to_list(ConsumersSet), - ok = push_to_consumers(Stats, ConsumersList), + Send = fun (Consumer) -> MsgSendFun(Consumer, Stats) end, + ok = lists:foreach(Send, ConsumersList), State#state{stats_state = StatsState2}. -spec schedule_first_production() -> ok. schedule_first_production() -> - _ = self() ! ?SIGNAL_PRODUCTION, + _ = self() ! ?PRODUCE_ASYNC, ok. -spec schedule_next_production() -> ok. schedule_next_production() -> ProductionInterval = beam_stats_config:production_interval(), - _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), - ok. - --spec push_to_consumers(beam_stats:t(), [pid()]) -> + _ = erlang:send_after(ProductionInterval, self(), ?PRODUCE_ASYNC), ok. -push_to_consumers(Stats, Consumers) -> - Push = fun (Consumer) -> gen_server:cast(Consumer, Stats) end, - lists:foreach(Push, Consumers). -- 2.20.1