--- /dev/null
+deps/
+ebin/
--- /dev/null
+REBAR := ./rebar --config rebar_test_build.config
+
+.PHONY: \
+ all \
+ clean_all \
+ clean_app \
+ compile_all \
+ compile_app \
+ deps \
+ deps_get \
+ deps_update \
+ dialyze \
+ dialyzer_blt_build
+
+all: \
+ clean_all \
+ deps_get \
+ compile_all
+
+deps_get:
+ @$(REBAR) get-deps
+
+deps_update:
+ @$(REBAR) update-deps
+
+deps: \
+ deps_get \
+ deps_update
+
+compile_all:
+ $(REBAR) compile skip_deps=false
+
+compile_app:
+ $(REBAR) compile skip_deps=true
+
+clean_all:
+ $(REBAR) clean skip_deps=false
+
+clean_app:
+ $(REBAR) clean skip_deps=true
+
+dialyze:
+ @dialyzer deps/*/ebin ebin
+
+dialyzer_blt_build:
+ @dialyzer \
+ --build_plt \
+ --apps $(shell ls $(shell \
+ erl -eval 'io:format(code:lib_dir()), init:stop().' -noshell) \
+ | grep -v interface \
+ | sed -e 's/-[0-9.]*//' \
+ )
--- /dev/null
+beam_stats
+==========
+
+Periodically collects and pushes VM metrics to arbitrary consumers (Graphite
+and CSV by default).
+
+Essentially like `folsomite`, but better. Better in the following ways:
+
+- More-general: consumers other than graphite can be defined
+- More-focused: only concerned with VM metrics, while `folsomite` ships off
+ _everything_ from `folsom` (in addition to VM metrics)
+- Easier-to-reason-about implementation: well-defined metrics-to-binary
+ conversions, as opposed to the nearly-arbitrary term-to-string conversions
+ used in `folsomite`
+- Spec'd, tested and Dialyzed
--- /dev/null
+1439265086|nonode@nohost|total|19511216
+1439265086|nonode@nohost|processes|5198144
+1439265086|nonode@nohost|processes_used|5198144
+1439265086|nonode@nohost|system|14313072
+1439265086|nonode@nohost|atom|202481
+1439265086|nonode@nohost|atom_used|189359
+1439265086|nonode@nohost|binary|431680
+1439265086|nonode@nohost|code|4447323
+1439265086|nonode@nohost|ets|310096
+1439265146|nonode@nohost|total|19636224
+1439265146|nonode@nohost|processes|5256696
+1439265146|nonode@nohost|processes_used|5256696
+1439265146|nonode@nohost|system|14379528
+1439265146|nonode@nohost|atom|202481
+1439265146|nonode@nohost|atom_used|190107
+1439265146|nonode@nohost|binary|445632
+1439265146|nonode@nohost|code|4495278
+1439265146|nonode@nohost|ets|311248
+2015-08-11 00:17:32.260189|nonode@nohost|total|18823432
+2015-08-11 00:17:32.260189|nonode@nohost|processes|4517464
+2015-08-11 00:17:32.260189|nonode@nohost|processes_used|4517464
+2015-08-11 00:17:32.260189|nonode@nohost|system|14305968
+2015-08-11 00:17:32.260189|nonode@nohost|atom|202481
+2015-08-11 00:17:32.260189|nonode@nohost|atom_used|189163
+2015-08-11 00:17:32.260189|nonode@nohost|binary|435592
+2015-08-11 00:17:32.260189|nonode@nohost|code|4439027
+2015-08-11 00:17:32.260189|nonode@nohost|ets|308784
+2015-08-11 00:18:32.261018|nonode@nohost|total|19010544
+2015-08-11 00:18:32.261018|nonode@nohost|processes|4537640
+2015-08-11 00:18:32.261018|nonode@nohost|processes_used|4537640
+2015-08-11 00:18:32.261018|nonode@nohost|system|14472904
+2015-08-11 00:18:32.261018|nonode@nohost|atom|202481
+2015-08-11 00:18:32.261018|nonode@nohost|atom_used|190558
+2015-08-11 00:18:32.261018|nonode@nohost|binary|499792
+2015-08-11 00:18:32.261018|nonode@nohost|code|4530778
+2015-08-11 00:18:32.261018|nonode@nohost|ets|312400
--- /dev/null
+-record(beam_stats,
+ { timestamp :: erlang:timestamp()
+ , node_id :: atom()
+ , memory :: [{erlang:memory_type(), non_neg_integer()}]
+ %, statistics :: [{atom() , term()}]
+ %, system :: [{atom() , term()}]
+ %, process :: [{atom() , term()}]
+ %, port :: [{atom() , term()}]
+ %, ets :: [{atom() , term()}]
+ %, dets :: [{atom() , term()}]
+ }).
--- /dev/null
+{ deps
+, [ {hope, ".*"}
+ ]
+}.
--- /dev/null
+{ deps
+, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.7.0"}}}
+ ]
+}.
--- /dev/null
+{application, beam_stats,
+ [
+ {description, "Periodic VM stats production and consumption."},
+ {vsn, "0.0.0"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+
+ {mod, { beam_stats_app, []}},
+
+ {env,
+ [ {production_interval , 30000}
+ , {consumers,
+ [ {beam_stats_consumer_graphite,
+ [ {consumption_interval , 60000}
+ , {connect_options,
+ [ {host , "localhost"}
+ , {port , 2003}
+ , {timeout , 5000}
+ ]}
+ ]}
+ , {beam_stats_consumer_csv,
+ [ {consumption_interval , 60000}
+ , {path , "beam_stats.csv"}
+ ]}
+ ]}
+ ]}
+
+ ]}.
--- /dev/null
+-module(beam_stats).
+
+-include("include/beam_stats.hrl").
+
+-export_type(
+ [ t/0
+ ]).
+
+-export(
+ [ collect/0
+ ]).
+
+-define(T, #?MODULE).
+
+-type t() ::
+ ?T{}.
+
+-spec collect() ->
+ t().
+collect() ->
+ ?T
+ { timestamp = os:timestamp()
+ , node_id = erlang:node()
+ , memory = erlang:memory()
+ }.
--- /dev/null
+-define(METHOD_SHOULD_NOT_BE_USED(Method, State),
+ {stop, {method_should_not_be_used, {?MODULE, Method}}, State}
+).
--- /dev/null
+-module(beam_stats_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ beam_stats_sup:start_link().
+
+stop(_State) ->
+ ok.
--- /dev/null
+-module(beam_stats_config).
+
+-export(
+ [ production_interval/0
+ , consumers/0
+ ]).
+
+-define(APP, beam_stats).
+
+%% ============================================================================
+%% API
+%% ============================================================================
+
+-spec production_interval() ->
+ erlang:time().
+production_interval() ->
+ get_env(production_interval).
+
+-spec consumers() ->
+ [{ConsumerModule :: atom(), ConsumerDefinedOptionsData :: term()}].
+consumers() ->
+ get_env(consumers).
+
+%% ============================================================================
+%% Internal
+%% ============================================================================
+
+-spec get_env(atom()) ->
+ term().
+get_env(Key) ->
+ {ok, Value} = application:get_env(?APP, Key),
+ Value.
--- /dev/null
+-module(beam_stats_consumer).
+
+-include("include/beam_stats.hrl").
+-include( "beam_stats.hrl").
+
+-behaviour(gen_server).
+
+-export_type(
+ [ queue/0
+ ]).
+
+%% Public API
+-export(
+ [ add/2
+ ]).
+
+%% Internal API
+-export(
+ [ start_link/2
+ ]).
+
+%% gen_server callbacks
+-export(
+ [ init/1
+ , handle_call/3
+ , handle_cast/2
+ , handle_info/2
+ , terminate/2
+ , code_change/3
+ ]).
+
+-type queue() ::
+ queue:queue(beam_stats:t()).
+
+%% ============================================================================
+%% Consumer interface
+%% ============================================================================
+
+-callback init(Options :: term()) ->
+ {ConsumptionInterval :: erlang:time(), State :: term()}.
+
+-callback consume(queue(), State) ->
+ State.
+
+-callback terminate(State :: term()) ->
+ {}.
+
+%% ============================================================================
+%% Internal data
+%% ============================================================================
+
+-define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
+
+-record(state,
+ { consumer_module :: atom()
+ , consumer_state :: term()
+ , consumption_interval :: erlang:time()
+ , beam_stats_queue :: queue()
+ }).
+
+%% ============================================================================
+%% Public API
+%% ============================================================================
+
+-spec add(atom(), term()) ->
+ supervisor:startchild_ret().
+add(ConsumerModule, ConsumerOptions) ->
+ beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
+
+%% ============================================================================
+%% Internal API
+%% ============================================================================
+
+start_link(ConsumerModule, ConsumerOptions) ->
+ GenServerModule = ?MODULE,
+ GenServerOpts = [],
+ InitArgs = [ConsumerModule, ConsumerOptions],
+ gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
+
+%% ============================================================================
+%% gen_server callbacks (unused)
+%% ============================================================================
+
+handle_call(_Request, _From, State) ->
+ ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ============================================================================
+%% gen_server callbacks
+%% ============================================================================
+
+init([ConsumerModule, ConsumerOptions]) ->
+ {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
+ State = #state
+ { consumer_module = ConsumerModule
+ , consumer_state = ConsumerState
+ , consumption_interval = ConsumptionInterval
+ , beam_stats_queue = queue:new()
+ },
+ ok = beam_stats_producer:subscribe(self()),
+ ok = schedule_first_consumption(),
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok = beam_stats_producer:unsubscribe(self()).
+
+handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
+ Q2 = queue:in(BEAMStats, Q1),
+ {noreply, State#state{beam_stats_queue = Q2}}.
+
+handle_info(
+ ?SIGNAL_CONSUMPTION,
+ #state
+ { consumer_module = ConsumerModule
+ , consumer_state = ConsumerState
+ , consumption_interval = ConsumptionInterval
+ , beam_stats_queue = Q
+ }=State1
+) ->
+ State2 = State1#state
+ { consumer_state = ConsumerModule:consume(Q, ConsumerState)
+ , beam_stats_queue = queue:new()
+ },
+ ok = schedule_next_consumption(ConsumptionInterval),
+ {noreply, State2}.
+
+%% ============================================================================
+%% Internal
+%% ============================================================================
+
+-spec schedule_first_consumption() ->
+ ok.
+schedule_first_consumption() ->
+ _ = self() ! ?SIGNAL_CONSUMPTION,
+ ok.
+
+-spec schedule_next_consumption(erlang:time()) ->
+ ok.
+schedule_next_consumption(Time) ->
+ _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
+ ok.
--- /dev/null
+-module(beam_stats_consumer_csv).
+
+-include("include/beam_stats.hrl").
+
+-behaviour(beam_stats_consumer).
+
+-export_type(
+ [ option/0
+ ]).
+
+-export(
+ [ init/1
+ , consume/2
+ , terminate/1
+ ]).
+
+-type option() ::
+ {path , file:filename()}
+ | {consumption_interval , erlang:time()}
+ .
+
+-record(state,
+ { path :: file:filename()
+ , file = none :: hope_option:t(file:io_device())
+ }).
+
+-type state() ::
+ #state{}.
+
+-spec init([option()]) ->
+ {erlang:time(), state()}.
+init(Options) ->
+ ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+ {some, Path} = hope_kv_list:get(Options, path),
+ State = #state
+ { path = Path
+ , file = none
+ },
+ {ConsumptionInterval, State}.
+
+-spec consume(beam_stats_consumer:queue(), state()) ->
+ state().
+consume(Q, #state{}=State1) ->
+ Payload = beam_stats_queue_to_binary(Q),
+ State2 = try_to_open_if_no_file(State1),
+ try_to_write(State2, Payload).
+
+-spec terminate(state()) ->
+ {}.
+terminate(#state{file=FileOpt}) ->
+ ok = hope_option:iter(FileOpt, fun file:close/1),
+ {}.
+
+%% ============================================================================
+
+-spec try_to_write(state(), binary()) ->
+ state().
+try_to_write(#state{file=none, path=Path}=State, _) ->
+ io:format("error: file closed: ~s~n", [Path]),
+ State;
+try_to_write(#state{file={some, File}}=State, Payload) ->
+ case file:write(File, Payload)
+ of ok ->
+ State
+ ; {error, _}=Error ->
+ io:format("error: file:write/2 failed: ~p~n", [Error]),
+ % TODO: Maybe schedule retry?
+ ok = file:close(File),
+ State#state{file=none}
+ end.
+
+-spec try_to_open_if_no_file(state()) ->
+ state().
+try_to_open_if_no_file(#state{file={some, _}}=State) ->
+ State;
+try_to_open_if_no_file(#state{file=none, path=Path}=State) ->
+ case file:open(Path, [append])
+ of {ok, File} ->
+ State#state{file = {some, File}}
+ ; {error, _}=Error ->
+ io:format("error: file:open/2 failed: ~p~n", [Error]),
+ State#state{file = none}
+ end.
+
+-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
+ binary().
+beam_stats_queue_to_binary(BEAMStatsQ) ->
+ Bins = [beam_stats_to_bin(B) || B <- queue:to_list(BEAMStatsQ)],
+ iolist_to_binary(Bins).
+
+
+-spec beam_stats_to_bin(beam_stats:t()) ->
+ binary().
+beam_stats_to_bin(#beam_stats
+ { timestamp = Timestamp
+ , node_id = NodeID
+ , memory = Memory
+ }
+) ->
+ <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
+ <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
+ PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
+ MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
+ MemoryBins = lists:map(PairToBin, MemoryBinPairs),
+ AllBins =
+ [ MemoryBins
+ ],
+ iolist_to_binary(AllBins).
+
+-spec timestamp_to_bin(erlang:timestamp()) ->
+ binary().
+timestamp_to_bin(Timestamp) ->
+ TimestampFloat = timestamp_to_float(Timestamp),
+ {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(Timestamp),
+ SecondsFloat = Sec + (TimestampFloat - trunc(TimestampFloat)),
+ Fmt2Digits = "~2.10.0b",
+ FmtDate = string:join(["~b" , Fmt2Digits, Fmt2Digits], "-"),
+ FmtTime = string:join([Fmt2Digits, Fmt2Digits, "~9..0f" ], ":"),
+ Separator = " ",
+ Fmt = FmtDate ++ Separator ++ FmtTime,
+ IOList = io_lib:format(Fmt, [Year, Month, Day, Hour, Min, SecondsFloat]),
+ iolist_to_binary(IOList).
+
+-spec timestamp_to_float(erlang:timestamp()) ->
+ float().
+timestamp_to_float({ComponentMega, ComponentWhole, ComponentMicro}) ->
+ OneMillion = 1000000,
+ TotalWholeSeconds = ComponentMega * OneMillion + ComponentWhole,
+ TotalMicroSeconds = (TotalWholeSeconds * OneMillion) + ComponentMicro,
+ TotalMicroSeconds / OneMillion.
+
+-spec make_pair_to_bin(binary(), binary()) ->
+ fun(({binary(), binary()}) -> binary()).
+make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
+ fun ({<<K/binary>>, <<V/binary>>}) ->
+ << TimestampBin/binary
+ , "|"
+ , NodeID/binary
+ , "|"
+ , K/binary
+ , "|"
+ , V/binary
+ , "\n"
+ >>
+ end.
+
+-spec node_id_to_bin(node()) ->
+ binary().
+node_id_to_bin(NodeID) ->
+ atom_to_binary(NodeID, utf8).
+
+-spec atom_int_to_bin_bin({atom(), integer()}) ->
+ {binary(), binary()}.
+atom_int_to_bin_bin({K, V}) ->
+ {atom_to_binary(K, latin1), integer_to_binary(V)}.
--- /dev/null
+-module(beam_stats_consumer_graphite).
+
+-include("include/beam_stats.hrl").
+
+-behaviour(beam_stats_consumer).
+
+-export_type(
+ [ option/0
+ , connect_option/0
+ ]).
+
+-export(
+ [ init/1
+ , consume/2
+ , terminate/1
+ ]).
+
+-type connect_option() ::
+ {host , inet:ip_address() | inet:hostname()}
+ | {port , inet:port_number()}
+ | {timeout , timeout()}
+ .
+
+-type option() ::
+ {connect_options , [connect_option()]}
+ | {consumption_interval , erlang:time()}
+ .
+
+-record(state,
+ { connect_options = [] :: [connect_option()]
+ , sock = none :: hope_option:t(gen_tcp:socket())
+ }).
+
+-type state() ::
+ #state{}.
+
+-define(GRAPHITE_PATH_PREFIX, "beam_stats").
+
+-spec init([option()]) ->
+ {erlang:time(), state()}.
+init(Options) ->
+ ConnectOptions = hope_kv_list:get(Options, connect_options , []),
+ ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+ State = #state
+ { connect_options = ConnectOptions
+ , sock = none
+ },
+ {ConsumptionInterval, State}.
+
+-spec consume(beam_stats_consumer:queue(), state()) ->
+ state().
+consume(Q, #state{}=State1) ->
+ Payload = beam_stats_queue_to_binary(Q),
+ State2 = try_to_connect_if_no_socket(State1),
+ try_to_send(State2, Payload).
+
+-spec terminate(state()) ->
+ {}.
+terminate(#state{sock=SockOpt}) ->
+ ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
+ {}.
+
+%% ============================================================================
+
+-spec try_to_send(state(), binary()) ->
+ state().
+try_to_send(#state{sock=none}=State, _) ->
+ io:format("error: socket closed~n"),
+ % TODO: Maybe schedule retry?
+ State;
+try_to_send(#state{sock={some, Sock}}=State, Payload) ->
+ case gen_tcp:send(Sock, Payload)
+ of ok ->
+ State
+ ; {error, _}=Error ->
+ io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
+ % TODO: Maybe schedule retry?
+ ok = gen_tcp:close(Sock),
+ State#state{sock=none}
+ end.
+
+-spec try_to_connect_if_no_socket(state()) ->
+ state().
+try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
+ State;
+try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
+ DefaultHost = "localhost",
+ DefaultPort = 2003,
+ DefaultTimeout = 5000,
+ Host = hope_kv_list:get(Options, host , DefaultHost),
+ Port = hope_kv_list:get(Options, port , DefaultPort),
+ Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
+ case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
+ of {ok, Sock} ->
+ State#state{sock = {some, Sock}}
+ ; {error, _}=Error ->
+ io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
+ State#state{sock = none}
+ end.
+
+-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
+ binary().
+beam_stats_queue_to_binary(Q) ->
+ Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
+ iolist_to_binary(Bins).
+
+-spec beam_stats_to_bin(beam_stats:t()) ->
+ binary().
+beam_stats_to_bin(#beam_stats
+ { timestamp = Timestamp
+ , node_id = NodeID
+ , memory = Memory
+ }
+) ->
+ TimestampInt = timestamp_to_integer(Timestamp),
+ TimestampBin = integer_to_binary(TimestampInt),
+ <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
+ PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
+ MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
+ MemoryBins = lists:map(PairToBin, MemoryBinPairs),
+ AllBins =
+ [ MemoryBins
+ ],
+ iolist_to_binary(AllBins).
+
+-spec timestamp_to_integer(erlang:timestamp()) ->
+ non_neg_integer().
+timestamp_to_integer({Megaseconds, Seconds, _}) ->
+ Megaseconds * 1000000 + Seconds.
+
+-spec make_pair_to_bin(binary(), binary()) ->
+ fun(({binary(), binary()}) -> binary()).
+make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
+ fun ({<<K/binary>>, <<V/binary>>}) ->
+ << ?GRAPHITE_PATH_PREFIX
+ , "."
+ , NodeID/binary
+ , "."
+ , K/binary
+ , " "
+ , V/binary
+ , " "
+ , TimestampBin/binary
+ , "\n"
+ >>
+ end.
+
+-spec node_id_to_bin(node()) ->
+ binary().
+node_id_to_bin(NodeID) ->
+ NodeIDBin = atom_to_binary(NodeID, utf8),
+ re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+
+-spec atom_int_to_bin_bin({atom(), integer()}) ->
+ {binary(), binary()}.
+atom_int_to_bin_bin({K, V}) ->
+ {atom_to_binary(K, latin1), integer_to_binary(V)}.
--- /dev/null
+-module(beam_stats_producer).
+
+-include("beam_stats.hrl").
+
+-behaviour(gen_server).
+
+%% API
+-export(
+ [ start_link/0
+ , subscribe/1
+ , unsubscribe/1
+ ]).
+
+%% gen_server callbacks
+-export(
+ [ init/1
+ , handle_call/3
+ , handle_cast/2
+ , handle_info/2
+ , terminate/2
+ , code_change/3
+ ]).
+
+%% ============================================================================
+%% Internal data
+%% ============================================================================
+
+-define(SIGNAL_PRODUCTION , beam_stats_production_signal).
+
+-record(state,
+ { consumers = ordsets:new() :: ordsets:ordset(pid())
+ }).
+
+%% ============================================================================
+%% API
+%% ============================================================================
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec subscribe(pid()) ->
+ ok.
+subscribe(PID) ->
+ gen_server:cast(?MODULE, {subscribe, PID}).
+
+-spec unsubscribe(pid()) ->
+ ok.
+unsubscribe(PID) ->
+ gen_server:cast(?MODULE, {unsubscribe, PID}).
+
+%% ============================================================================
+%% gen_server callbacks (unused)
+%% ============================================================================
+
+handle_call(_Request, _From, State) ->
+ ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+%% ============================================================================
+%% gen_server callbacks
+%% ============================================================================
+
+init([]) ->
+ ok = schedule_first_production(),
+ Consumers = ordsets:new(),
+ {ok, #state{consumers=Consumers}}.
+
+handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
+ Consumers2 = ordsets:add_element(PID, Consumers1),
+ {noreply, State#state{consumers=Consumers2}};
+
+handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
+ Consumers2 = ordsets:del_element(PID, Consumers1),
+ {noreply, State#state{consumers=Consumers2}}.
+
+handle_info(?SIGNAL_PRODUCTION, #state{consumers=ConsumersSet}=State) ->
+ ConsumersList = ordsets:to_list(ConsumersSet),
+ ok = collect_and_push_to_consumers(ConsumersList),
+ ok = schedule_next_production(),
+ {noreply, State}.
+
+%% ============================================================================
+%% Private
+%% ============================================================================
+
+-spec schedule_first_production() ->
+ ok.
+schedule_first_production() ->
+ _ = self() ! ?SIGNAL_PRODUCTION,
+ ok.
+
+-spec schedule_next_production() ->
+ ok.
+schedule_next_production() ->
+ ProductionInterval = beam_stats_config:production_interval(),
+ _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
+ ok.
+
+-spec collect_and_push_to_consumers([pid()]) ->
+ ok.
+collect_and_push_to_consumers(Consumers) ->
+ BEAMStats = beam_stats:collect(),
+ Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
+ lists:foreach(Push, Consumers).
--- /dev/null
+-module(beam_stats_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(Type, Module),
+ {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ Children =
+ [ ?CHILD(worker , beam_stats_producer)
+ , ?CHILD(supervisor , beam_stats_sup_consumers)
+ ],
+ SupFlags = {one_for_one, 5, 10},
+ {ok, {SupFlags, Children}}.
--- /dev/null
+-module(beam_stats_sup_consumers).
+
+-behaviour(supervisor).
+
+%% API
+-export(
+ [ start_link/0
+ , start_child/2
+ ]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(Type, Module, Args),
+ {make_ref(), {Module, start_link, Args}, permanent, 5000, Type, [Module]}).
+
+%% ===================================================================
+%% API
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(Module, Options) ->
+ Child = ?CHILD(worker, beam_stats_consumer, [Module, Options]),
+ supervisor:start_child(?MODULE, Child).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ Consumers = beam_stats_config:consumers(),
+ ConsumerSpecToChild =
+ fun ({Module, Options}) ->
+ ?CHILD(worker, beam_stats_consumer, [Module, Options])
+ end,
+ Children = lists:map(ConsumerSpecToChild, Consumers),
+ SupFlags = {one_for_one, 5, 10},
+ {ok, {SupFlags, Children}}.