From: Siraaj Khandkar Date: Tue, 11 Aug 2015 05:05:20 +0000 (-0400) Subject: WIP X-Git-Tag: 0.0.0~24 X-Git-Url: https://git.xandkar.net/?a=commitdiff_plain;h=caf75ed8160362773766c6bde005cf5f33544392;p=beam_stats.git WIP --- caf75ed8160362773766c6bde005cf5f33544392 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1204ed7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +deps/ +ebin/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..338f685 --- /dev/null +++ b/Makefile @@ -0,0 +1,52 @@ +REBAR := ./rebar --config rebar_test_build.config + +.PHONY: \ + all \ + clean_all \ + clean_app \ + compile_all \ + compile_app \ + deps \ + deps_get \ + deps_update \ + dialyze \ + dialyzer_blt_build + +all: \ + clean_all \ + deps_get \ + compile_all + +deps_get: + @$(REBAR) get-deps + +deps_update: + @$(REBAR) update-deps + +deps: \ + deps_get \ + deps_update + +compile_all: + $(REBAR) compile skip_deps=false + +compile_app: + $(REBAR) compile skip_deps=true + +clean_all: + $(REBAR) clean skip_deps=false + +clean_app: + $(REBAR) clean skip_deps=true + +dialyze: + @dialyzer deps/*/ebin ebin + +dialyzer_blt_build: + @dialyzer \ + --build_plt \ + --apps $(shell ls $(shell \ + erl -eval 'io:format(code:lib_dir()), init:stop().' -noshell) \ + | grep -v interface \ + | sed -e 's/-[0-9.]*//' \ + ) diff --git a/README.md b/README.md new file mode 100644 index 0000000..12e2e3c --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +beam_stats +========== + +Periodically collects and pushes VM metrics to arbitrary consumers (Graphite +and CSV by default). + +Essentially like `folsomite`, but better. Better in the following ways: + +- More-general: consumers other than graphite can be defined +- More-focused: only concerned with VM metrics, while `folsomite` ships off + _everything_ from `folsom` (in addition to VM metrics) +- Easier-to-reason-about implementation: well-defined metrics-to-binary + conversions, as opposed to the nearly-arbitrary term-to-string conversions + used in `folsomite` +- Spec'd, tested and Dialyzed diff --git a/beam_stats.csv b/beam_stats.csv new file mode 100644 index 0000000..37d2b5d --- /dev/null +++ b/beam_stats.csv @@ -0,0 +1,36 @@ +1439265086|nonode@nohost|total|19511216 +1439265086|nonode@nohost|processes|5198144 +1439265086|nonode@nohost|processes_used|5198144 +1439265086|nonode@nohost|system|14313072 +1439265086|nonode@nohost|atom|202481 +1439265086|nonode@nohost|atom_used|189359 +1439265086|nonode@nohost|binary|431680 +1439265086|nonode@nohost|code|4447323 +1439265086|nonode@nohost|ets|310096 +1439265146|nonode@nohost|total|19636224 +1439265146|nonode@nohost|processes|5256696 +1439265146|nonode@nohost|processes_used|5256696 +1439265146|nonode@nohost|system|14379528 +1439265146|nonode@nohost|atom|202481 +1439265146|nonode@nohost|atom_used|190107 +1439265146|nonode@nohost|binary|445632 +1439265146|nonode@nohost|code|4495278 +1439265146|nonode@nohost|ets|311248 +2015-08-11 00:17:32.260189|nonode@nohost|total|18823432 +2015-08-11 00:17:32.260189|nonode@nohost|processes|4517464 +2015-08-11 00:17:32.260189|nonode@nohost|processes_used|4517464 +2015-08-11 00:17:32.260189|nonode@nohost|system|14305968 +2015-08-11 00:17:32.260189|nonode@nohost|atom|202481 +2015-08-11 00:17:32.260189|nonode@nohost|atom_used|189163 +2015-08-11 00:17:32.260189|nonode@nohost|binary|435592 +2015-08-11 00:17:32.260189|nonode@nohost|code|4439027 +2015-08-11 00:17:32.260189|nonode@nohost|ets|308784 +2015-08-11 00:18:32.261018|nonode@nohost|total|19010544 +2015-08-11 00:18:32.261018|nonode@nohost|processes|4537640 +2015-08-11 00:18:32.261018|nonode@nohost|processes_used|4537640 +2015-08-11 00:18:32.261018|nonode@nohost|system|14472904 +2015-08-11 00:18:32.261018|nonode@nohost|atom|202481 +2015-08-11 00:18:32.261018|nonode@nohost|atom_used|190558 +2015-08-11 00:18:32.261018|nonode@nohost|binary|499792 +2015-08-11 00:18:32.261018|nonode@nohost|code|4530778 +2015-08-11 00:18:32.261018|nonode@nohost|ets|312400 diff --git a/include/beam_stats.hrl b/include/beam_stats.hrl new file mode 100644 index 0000000..6b4df10 --- /dev/null +++ b/include/beam_stats.hrl @@ -0,0 +1,11 @@ +-record(beam_stats, + { timestamp :: erlang:timestamp() + , node_id :: atom() + , memory :: [{erlang:memory_type(), non_neg_integer()}] + %, statistics :: [{atom() , term()}] + %, system :: [{atom() , term()}] + %, process :: [{atom() , term()}] + %, port :: [{atom() , term()}] + %, ets :: [{atom() , term()}] + %, dets :: [{atom() , term()}] + }). diff --git a/rebar b/rebar new file mode 100755 index 0000000..49711dc Binary files /dev/null and b/rebar differ diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..8d9caba --- /dev/null +++ b/rebar.config @@ -0,0 +1,4 @@ +{ deps +, [ {hope, ".*"} + ] +}. diff --git a/rebar_test_build.config b/rebar_test_build.config new file mode 100644 index 0000000..b65a1df --- /dev/null +++ b/rebar_test_build.config @@ -0,0 +1,4 @@ +{ deps +, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.7.0"}}} + ] +}. diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src new file mode 100644 index 0000000..1750455 --- /dev/null +++ b/src/beam_stats.app.src @@ -0,0 +1,31 @@ +{application, beam_stats, + [ + {description, "Periodic VM stats production and consumption."}, + {vsn, "0.0.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + + {mod, { beam_stats_app, []}}, + + {env, + [ {production_interval , 30000} + , {consumers, + [ {beam_stats_consumer_graphite, + [ {consumption_interval , 60000} + , {connect_options, + [ {host , "localhost"} + , {port , 2003} + , {timeout , 5000} + ]} + ]} + , {beam_stats_consumer_csv, + [ {consumption_interval , 60000} + , {path , "beam_stats.csv"} + ]} + ]} + ]} + + ]}. diff --git a/src/beam_stats.erl b/src/beam_stats.erl new file mode 100644 index 0000000..e388396 --- /dev/null +++ b/src/beam_stats.erl @@ -0,0 +1,25 @@ +-module(beam_stats). + +-include("include/beam_stats.hrl"). + +-export_type( + [ t/0 + ]). + +-export( + [ collect/0 + ]). + +-define(T, #?MODULE). + +-type t() :: + ?T{}. + +-spec collect() -> + t(). +collect() -> + ?T + { timestamp = os:timestamp() + , node_id = erlang:node() + , memory = erlang:memory() + }. diff --git a/src/beam_stats.hrl b/src/beam_stats.hrl new file mode 100644 index 0000000..c7df52a --- /dev/null +++ b/src/beam_stats.hrl @@ -0,0 +1,3 @@ +-define(METHOD_SHOULD_NOT_BE_USED(Method, State), + {stop, {method_should_not_be_used, {?MODULE, Method}}, State} +). diff --git a/src/beam_stats_app.erl b/src/beam_stats_app.erl new file mode 100644 index 0000000..c72415c --- /dev/null +++ b/src/beam_stats_app.erl @@ -0,0 +1,16 @@ +-module(beam_stats_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + beam_stats_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/beam_stats_config.erl b/src/beam_stats_config.erl new file mode 100644 index 0000000..68a44ed --- /dev/null +++ b/src/beam_stats_config.erl @@ -0,0 +1,32 @@ +-module(beam_stats_config). + +-export( + [ production_interval/0 + , consumers/0 + ]). + +-define(APP, beam_stats). + +%% ============================================================================ +%% API +%% ============================================================================ + +-spec production_interval() -> + erlang:time(). +production_interval() -> + get_env(production_interval). + +-spec consumers() -> + [{ConsumerModule :: atom(), ConsumerDefinedOptionsData :: term()}]. +consumers() -> + get_env(consumers). + +%% ============================================================================ +%% Internal +%% ============================================================================ + +-spec get_env(atom()) -> + term(). +get_env(Key) -> + {ok, Value} = application:get_env(?APP, Key), + Value. diff --git a/src/beam_stats_consumer.erl b/src/beam_stats_consumer.erl new file mode 100644 index 0000000..8d25fa4 --- /dev/null +++ b/src/beam_stats_consumer.erl @@ -0,0 +1,143 @@ +-module(beam_stats_consumer). + +-include("include/beam_stats.hrl"). +-include( "beam_stats.hrl"). + +-behaviour(gen_server). + +-export_type( + [ queue/0 + ]). + +%% Public API +-export( + [ add/2 + ]). + +%% Internal API +-export( + [ start_link/2 + ]). + +%% gen_server callbacks +-export( + [ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-type queue() :: + queue:queue(beam_stats:t()). + +%% ============================================================================ +%% Consumer interface +%% ============================================================================ + +-callback init(Options :: term()) -> + {ConsumptionInterval :: erlang:time(), State :: term()}. + +-callback consume(queue(), State) -> + State. + +-callback terminate(State :: term()) -> + {}. + +%% ============================================================================ +%% Internal data +%% ============================================================================ + +-define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal). + +-record(state, + { consumer_module :: atom() + , consumer_state :: term() + , consumption_interval :: erlang:time() + , beam_stats_queue :: queue() + }). + +%% ============================================================================ +%% Public API +%% ============================================================================ + +-spec add(atom(), term()) -> + supervisor:startchild_ret(). +add(ConsumerModule, ConsumerOptions) -> + beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions). + +%% ============================================================================ +%% Internal API +%% ============================================================================ + +start_link(ConsumerModule, ConsumerOptions) -> + GenServerModule = ?MODULE, + GenServerOpts = [], + InitArgs = [ConsumerModule, ConsumerOptions], + gen_server:start_link(GenServerModule, InitArgs, GenServerOpts). + +%% ============================================================================ +%% gen_server callbacks (unused) +%% ============================================================================ + +handle_call(_Request, _From, State) -> + ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ============================================================================ +%% gen_server callbacks +%% ============================================================================ + +init([ConsumerModule, ConsumerOptions]) -> + {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions), + State = #state + { consumer_module = ConsumerModule + , consumer_state = ConsumerState + , consumption_interval = ConsumptionInterval + , beam_stats_queue = queue:new() + }, + ok = beam_stats_producer:subscribe(self()), + ok = schedule_first_consumption(), + {ok, State}. + +terminate(_Reason, _State) -> + ok = beam_stats_producer:unsubscribe(self()). + +handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) -> + Q2 = queue:in(BEAMStats, Q1), + {noreply, State#state{beam_stats_queue = Q2}}. + +handle_info( + ?SIGNAL_CONSUMPTION, + #state + { consumer_module = ConsumerModule + , consumer_state = ConsumerState + , consumption_interval = ConsumptionInterval + , beam_stats_queue = Q + }=State1 +) -> + State2 = State1#state + { consumer_state = ConsumerModule:consume(Q, ConsumerState) + , beam_stats_queue = queue:new() + }, + ok = schedule_next_consumption(ConsumptionInterval), + {noreply, State2}. + +%% ============================================================================ +%% Internal +%% ============================================================================ + +-spec schedule_first_consumption() -> + ok. +schedule_first_consumption() -> + _ = self() ! ?SIGNAL_CONSUMPTION, + ok. + +-spec schedule_next_consumption(erlang:time()) -> + ok. +schedule_next_consumption(Time) -> + _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION), + ok. diff --git a/src/beam_stats_consumer_csv.erl b/src/beam_stats_consumer_csv.erl new file mode 100644 index 0000000..c00c28f --- /dev/null +++ b/src/beam_stats_consumer_csv.erl @@ -0,0 +1,155 @@ +-module(beam_stats_consumer_csv). + +-include("include/beam_stats.hrl"). + +-behaviour(beam_stats_consumer). + +-export_type( + [ option/0 + ]). + +-export( + [ init/1 + , consume/2 + , terminate/1 + ]). + +-type option() :: + {path , file:filename()} + | {consumption_interval , erlang:time()} + . + +-record(state, + { path :: file:filename() + , file = none :: hope_option:t(file:io_device()) + }). + +-type state() :: + #state{}. + +-spec init([option()]) -> + {erlang:time(), state()}. +init(Options) -> + ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), + {some, Path} = hope_kv_list:get(Options, path), + State = #state + { path = Path + , file = none + }, + {ConsumptionInterval, State}. + +-spec consume(beam_stats_consumer:queue(), state()) -> + state(). +consume(Q, #state{}=State1) -> + Payload = beam_stats_queue_to_binary(Q), + State2 = try_to_open_if_no_file(State1), + try_to_write(State2, Payload). + +-spec terminate(state()) -> + {}. +terminate(#state{file=FileOpt}) -> + ok = hope_option:iter(FileOpt, fun file:close/1), + {}. + +%% ============================================================================ + +-spec try_to_write(state(), binary()) -> + state(). +try_to_write(#state{file=none, path=Path}=State, _) -> + io:format("error: file closed: ~s~n", [Path]), + State; +try_to_write(#state{file={some, File}}=State, Payload) -> + case file:write(File, Payload) + of ok -> + State + ; {error, _}=Error -> + io:format("error: file:write/2 failed: ~p~n", [Error]), + % TODO: Maybe schedule retry? + ok = file:close(File), + State#state{file=none} + end. + +-spec try_to_open_if_no_file(state()) -> + state(). +try_to_open_if_no_file(#state{file={some, _}}=State) -> + State; +try_to_open_if_no_file(#state{file=none, path=Path}=State) -> + case file:open(Path, [append]) + of {ok, File} -> + State#state{file = {some, File}} + ; {error, _}=Error -> + io:format("error: file:open/2 failed: ~p~n", [Error]), + State#state{file = none} + end. + +-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> + binary(). +beam_stats_queue_to_binary(BEAMStatsQ) -> + Bins = [beam_stats_to_bin(B) || B <- queue:to_list(BEAMStatsQ)], + iolist_to_binary(Bins). + + +-spec beam_stats_to_bin(beam_stats:t()) -> + binary(). +beam_stats_to_bin(#beam_stats + { timestamp = Timestamp + , node_id = NodeID + , memory = Memory + } +) -> + <> = timestamp_to_bin(Timestamp), + <> = node_id_to_bin(NodeID), + PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin), + MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory), + MemoryBins = lists:map(PairToBin, MemoryBinPairs), + AllBins = + [ MemoryBins + ], + iolist_to_binary(AllBins). + +-spec timestamp_to_bin(erlang:timestamp()) -> + binary(). +timestamp_to_bin(Timestamp) -> + TimestampFloat = timestamp_to_float(Timestamp), + {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(Timestamp), + SecondsFloat = Sec + (TimestampFloat - trunc(TimestampFloat)), + Fmt2Digits = "~2.10.0b", + FmtDate = string:join(["~b" , Fmt2Digits, Fmt2Digits], "-"), + FmtTime = string:join([Fmt2Digits, Fmt2Digits, "~9..0f" ], ":"), + Separator = " ", + Fmt = FmtDate ++ Separator ++ FmtTime, + IOList = io_lib:format(Fmt, [Year, Month, Day, Hour, Min, SecondsFloat]), + iolist_to_binary(IOList). + +-spec timestamp_to_float(erlang:timestamp()) -> + float(). +timestamp_to_float({ComponentMega, ComponentWhole, ComponentMicro}) -> + OneMillion = 1000000, + TotalWholeSeconds = ComponentMega * OneMillion + ComponentWhole, + TotalMicroSeconds = (TotalWholeSeconds * OneMillion) + ComponentMicro, + TotalMicroSeconds / OneMillion. + +-spec make_pair_to_bin(binary(), binary()) -> + fun(({binary(), binary()}) -> binary()). +make_pair_to_bin(<>, <>) -> + fun ({<>, <>}) -> + << TimestampBin/binary + , "|" + , NodeID/binary + , "|" + , K/binary + , "|" + , V/binary + , "\n" + >> + end. + +-spec node_id_to_bin(node()) -> + binary(). +node_id_to_bin(NodeID) -> + atom_to_binary(NodeID, utf8). + +-spec atom_int_to_bin_bin({atom(), integer()}) -> + {binary(), binary()}. +atom_int_to_bin_bin({K, V}) -> + {atom_to_binary(K, latin1), integer_to_binary(V)}. diff --git a/src/beam_stats_consumer_graphite.erl b/src/beam_stats_consumer_graphite.erl new file mode 100644 index 0000000..c60c6e8 --- /dev/null +++ b/src/beam_stats_consumer_graphite.erl @@ -0,0 +1,157 @@ +-module(beam_stats_consumer_graphite). + +-include("include/beam_stats.hrl"). + +-behaviour(beam_stats_consumer). + +-export_type( + [ option/0 + , connect_option/0 + ]). + +-export( + [ init/1 + , consume/2 + , terminate/1 + ]). + +-type connect_option() :: + {host , inet:ip_address() | inet:hostname()} + | {port , inet:port_number()} + | {timeout , timeout()} + . + +-type option() :: + {connect_options , [connect_option()]} + | {consumption_interval , erlang:time()} + . + +-record(state, + { connect_options = [] :: [connect_option()] + , sock = none :: hope_option:t(gen_tcp:socket()) + }). + +-type state() :: + #state{}. + +-define(GRAPHITE_PATH_PREFIX, "beam_stats"). + +-spec init([option()]) -> + {erlang:time(), state()}. +init(Options) -> + ConnectOptions = hope_kv_list:get(Options, connect_options , []), + ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000), + State = #state + { connect_options = ConnectOptions + , sock = none + }, + {ConsumptionInterval, State}. + +-spec consume(beam_stats_consumer:queue(), state()) -> + state(). +consume(Q, #state{}=State1) -> + Payload = beam_stats_queue_to_binary(Q), + State2 = try_to_connect_if_no_socket(State1), + try_to_send(State2, Payload). + +-spec terminate(state()) -> + {}. +terminate(#state{sock=SockOpt}) -> + ok = hope_option:iter(SockOpt, fun gen_tcp:close/1), + {}. + +%% ============================================================================ + +-spec try_to_send(state(), binary()) -> + state(). +try_to_send(#state{sock=none}=State, _) -> + io:format("error: socket closed~n"), + % TODO: Maybe schedule retry? + State; +try_to_send(#state{sock={some, Sock}}=State, Payload) -> + case gen_tcp:send(Sock, Payload) + of ok -> + State + ; {error, _}=Error -> + io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]), + % TODO: Maybe schedule retry? + ok = gen_tcp:close(Sock), + State#state{sock=none} + end. + +-spec try_to_connect_if_no_socket(state()) -> + state(). +try_to_connect_if_no_socket(#state{sock={some, _}}=State) -> + State; +try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) -> + DefaultHost = "localhost", + DefaultPort = 2003, + DefaultTimeout = 5000, + Host = hope_kv_list:get(Options, host , DefaultHost), + Port = hope_kv_list:get(Options, port , DefaultPort), + Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout), + case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout) + of {ok, Sock} -> + State#state{sock = {some, Sock}} + ; {error, _}=Error -> + io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]), + State#state{sock = none} + end. + +-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) -> + binary(). +beam_stats_queue_to_binary(Q) -> + Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)], + iolist_to_binary(Bins). + +-spec beam_stats_to_bin(beam_stats:t()) -> + binary(). +beam_stats_to_bin(#beam_stats + { timestamp = Timestamp + , node_id = NodeID + , memory = Memory + } +) -> + TimestampInt = timestamp_to_integer(Timestamp), + TimestampBin = integer_to_binary(TimestampInt), + <> = node_id_to_bin(NodeID), + PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin), + MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory), + MemoryBins = lists:map(PairToBin, MemoryBinPairs), + AllBins = + [ MemoryBins + ], + iolist_to_binary(AllBins). + +-spec timestamp_to_integer(erlang:timestamp()) -> + non_neg_integer(). +timestamp_to_integer({Megaseconds, Seconds, _}) -> + Megaseconds * 1000000 + Seconds. + +-spec make_pair_to_bin(binary(), binary()) -> + fun(({binary(), binary()}) -> binary()). +make_pair_to_bin(<>, <>) -> + fun ({<>, <>}) -> + << ?GRAPHITE_PATH_PREFIX + , "." + , NodeID/binary + , "." + , K/binary + , " " + , V/binary + , " " + , TimestampBin/binary + , "\n" + >> + end. + +-spec node_id_to_bin(node()) -> + binary(). +node_id_to_bin(NodeID) -> + NodeIDBin = atom_to_binary(NodeID, utf8), + re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]). + +-spec atom_int_to_bin_bin({atom(), integer()}) -> + {binary(), binary()}. +atom_int_to_bin_bin({K, V}) -> + {atom_to_binary(K, latin1), integer_to_binary(V)}. diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl new file mode 100644 index 0000000..5c47951 --- /dev/null +++ b/src/beam_stats_producer.erl @@ -0,0 +1,109 @@ +-module(beam_stats_producer). + +-include("beam_stats.hrl"). + +-behaviour(gen_server). + +%% API +-export( + [ start_link/0 + , subscribe/1 + , unsubscribe/1 + ]). + +%% gen_server callbacks +-export( + [ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +%% ============================================================================ +%% Internal data +%% ============================================================================ + +-define(SIGNAL_PRODUCTION , beam_stats_production_signal). + +-record(state, + { consumers = ordsets:new() :: ordsets:ordset(pid()) + }). + +%% ============================================================================ +%% API +%% ============================================================================ + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec subscribe(pid()) -> + ok. +subscribe(PID) -> + gen_server:cast(?MODULE, {subscribe, PID}). + +-spec unsubscribe(pid()) -> + ok. +unsubscribe(PID) -> + gen_server:cast(?MODULE, {unsubscribe, PID}). + +%% ============================================================================ +%% gen_server callbacks (unused) +%% ============================================================================ + +handle_call(_Request, _From, State) -> + ?METHOD_SHOULD_NOT_BE_USED(handle_call, State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +%% ============================================================================ +%% gen_server callbacks +%% ============================================================================ + +init([]) -> + ok = schedule_first_production(), + Consumers = ordsets:new(), + {ok, #state{consumers=Consumers}}. + +handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) -> + Consumers2 = ordsets:add_element(PID, Consumers1), + {noreply, State#state{consumers=Consumers2}}; + +handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) -> + Consumers2 = ordsets:del_element(PID, Consumers1), + {noreply, State#state{consumers=Consumers2}}. + +handle_info(?SIGNAL_PRODUCTION, #state{consumers=ConsumersSet}=State) -> + ConsumersList = ordsets:to_list(ConsumersSet), + ok = collect_and_push_to_consumers(ConsumersList), + ok = schedule_next_production(), + {noreply, State}. + +%% ============================================================================ +%% Private +%% ============================================================================ + +-spec schedule_first_production() -> + ok. +schedule_first_production() -> + _ = self() ! ?SIGNAL_PRODUCTION, + ok. + +-spec schedule_next_production() -> + ok. +schedule_next_production() -> + ProductionInterval = beam_stats_config:production_interval(), + _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION), + ok. + +-spec collect_and_push_to_consumers([pid()]) -> + ok. +collect_and_push_to_consumers(Consumers) -> + BEAMStats = beam_stats:collect(), + Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end, + lists:foreach(Push, Consumers). diff --git a/src/beam_stats_sup.erl b/src/beam_stats_sup.erl new file mode 100644 index 0000000..7cfec55 --- /dev/null +++ b/src/beam_stats_sup.erl @@ -0,0 +1,32 @@ +-module(beam_stats_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Type, Module), + {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + Children = + [ ?CHILD(worker , beam_stats_producer) + , ?CHILD(supervisor , beam_stats_sup_consumers) + ], + SupFlags = {one_for_one, 5, 10}, + {ok, {SupFlags, Children}}. diff --git a/src/beam_stats_sup_consumers.erl b/src/beam_stats_sup_consumers.erl new file mode 100644 index 0000000..727cf3d --- /dev/null +++ b/src/beam_stats_sup_consumers.erl @@ -0,0 +1,41 @@ +-module(beam_stats_sup_consumers). + +-behaviour(supervisor). + +%% API +-export( + [ start_link/0 + , start_child/2 + ]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Type, Module, Args), + {make_ref(), {Module, start_link, Args}, permanent, 5000, Type, [Module]}). + +%% =================================================================== +%% API +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child(Module, Options) -> + Child = ?CHILD(worker, beam_stats_consumer, [Module, Options]), + supervisor:start_child(?MODULE, Child). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + Consumers = beam_stats_config:consumers(), + ConsumerSpecToChild = + fun ({Module, Options}) -> + ?CHILD(worker, beam_stats_consumer, [Module, Options]) + end, + Children = lists:map(ConsumerSpecToChild, Consumers), + SupFlags = {one_for_one, 5, 10}, + {ok, {SupFlags, Children}}.