WIP
authorSiraaj Khandkar <siraaj@khandkar.net>
Tue, 11 Aug 2015 05:05:20 +0000 (01:05 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Tue, 11 Aug 2015 05:31:32 +0000 (01:31 -0400)
19 files changed:
.gitignore [new file with mode: 0644]
Makefile [new file with mode: 0644]
README.md [new file with mode: 0644]
beam_stats.csv [new file with mode: 0644]
include/beam_stats.hrl [new file with mode: 0644]
rebar [new file with mode: 0755]
rebar.config [new file with mode: 0644]
rebar_test_build.config [new file with mode: 0644]
src/beam_stats.app.src [new file with mode: 0644]
src/beam_stats.erl [new file with mode: 0644]
src/beam_stats.hrl [new file with mode: 0644]
src/beam_stats_app.erl [new file with mode: 0644]
src/beam_stats_config.erl [new file with mode: 0644]
src/beam_stats_consumer.erl [new file with mode: 0644]
src/beam_stats_consumer_csv.erl [new file with mode: 0644]
src/beam_stats_consumer_graphite.erl [new file with mode: 0644]
src/beam_stats_producer.erl [new file with mode: 0644]
src/beam_stats_sup.erl [new file with mode: 0644]
src/beam_stats_sup_consumers.erl [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..1204ed7
--- /dev/null
@@ -0,0 +1,2 @@
+deps/
+ebin/
diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..338f685
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,52 @@
+REBAR := ./rebar --config rebar_test_build.config
+
+.PHONY: \
+       all \
+       clean_all \
+       clean_app \
+       compile_all \
+       compile_app \
+       deps \
+       deps_get \
+       deps_update \
+       dialyze \
+       dialyzer_blt_build
+
+all: \
+       clean_all \
+       deps_get \
+       compile_all
+
+deps_get:
+       @$(REBAR) get-deps
+
+deps_update:
+       @$(REBAR) update-deps
+
+deps: \
+       deps_get \
+       deps_update
+
+compile_all:
+       $(REBAR) compile skip_deps=false
+
+compile_app:
+       $(REBAR) compile skip_deps=true
+
+clean_all:
+       $(REBAR) clean skip_deps=false
+
+clean_app:
+       $(REBAR) clean skip_deps=true
+
+dialyze:
+       @dialyzer deps/*/ebin ebin
+
+dialyzer_blt_build:
+       @dialyzer \
+               --build_plt \
+               --apps $(shell ls $(shell \
+                       erl -eval 'io:format(code:lib_dir()), init:stop().' -noshell) \
+                       | grep -v interface \
+                       | sed -e 's/-[0-9.]*//' \
+               )
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..12e2e3c
--- /dev/null
+++ b/README.md
@@ -0,0 +1,15 @@
+beam_stats
+==========
+
+Periodically collects and pushes VM metrics to arbitrary consumers (Graphite
+and CSV by default).
+
+Essentially like `folsomite`, but better. Better in the following ways:
+
+- More-general: consumers other than graphite can be defined
+- More-focused: only concerned with VM metrics, while `folsomite` ships off
+  _everything_ from `folsom` (in addition to VM metrics)
+- Easier-to-reason-about implementation: well-defined metrics-to-binary
+  conversions, as opposed to the nearly-arbitrary term-to-string conversions
+  used in `folsomite`
+- Spec'd, tested and Dialyzed
diff --git a/beam_stats.csv b/beam_stats.csv
new file mode 100644 (file)
index 0000000..37d2b5d
--- /dev/null
@@ -0,0 +1,36 @@
+1439265086|nonode@nohost|total|19511216
+1439265086|nonode@nohost|processes|5198144
+1439265086|nonode@nohost|processes_used|5198144
+1439265086|nonode@nohost|system|14313072
+1439265086|nonode@nohost|atom|202481
+1439265086|nonode@nohost|atom_used|189359
+1439265086|nonode@nohost|binary|431680
+1439265086|nonode@nohost|code|4447323
+1439265086|nonode@nohost|ets|310096
+1439265146|nonode@nohost|total|19636224
+1439265146|nonode@nohost|processes|5256696
+1439265146|nonode@nohost|processes_used|5256696
+1439265146|nonode@nohost|system|14379528
+1439265146|nonode@nohost|atom|202481
+1439265146|nonode@nohost|atom_used|190107
+1439265146|nonode@nohost|binary|445632
+1439265146|nonode@nohost|code|4495278
+1439265146|nonode@nohost|ets|311248
+2015-08-11 00:17:32.260189|nonode@nohost|total|18823432
+2015-08-11 00:17:32.260189|nonode@nohost|processes|4517464
+2015-08-11 00:17:32.260189|nonode@nohost|processes_used|4517464
+2015-08-11 00:17:32.260189|nonode@nohost|system|14305968
+2015-08-11 00:17:32.260189|nonode@nohost|atom|202481
+2015-08-11 00:17:32.260189|nonode@nohost|atom_used|189163
+2015-08-11 00:17:32.260189|nonode@nohost|binary|435592
+2015-08-11 00:17:32.260189|nonode@nohost|code|4439027
+2015-08-11 00:17:32.260189|nonode@nohost|ets|308784
+2015-08-11 00:18:32.261018|nonode@nohost|total|19010544
+2015-08-11 00:18:32.261018|nonode@nohost|processes|4537640
+2015-08-11 00:18:32.261018|nonode@nohost|processes_used|4537640
+2015-08-11 00:18:32.261018|nonode@nohost|system|14472904
+2015-08-11 00:18:32.261018|nonode@nohost|atom|202481
+2015-08-11 00:18:32.261018|nonode@nohost|atom_used|190558
+2015-08-11 00:18:32.261018|nonode@nohost|binary|499792
+2015-08-11 00:18:32.261018|nonode@nohost|code|4530778
+2015-08-11 00:18:32.261018|nonode@nohost|ets|312400
diff --git a/include/beam_stats.hrl b/include/beam_stats.hrl
new file mode 100644 (file)
index 0000000..6b4df10
--- /dev/null
@@ -0,0 +1,11 @@
+-record(beam_stats,
+    { timestamp    :: erlang:timestamp()
+    , node_id      :: atom()
+    , memory       :: [{erlang:memory_type(), non_neg_integer()}]
+    %, statistics   :: [{atom()       , term()}]
+    %, system       :: [{atom()       , term()}]
+    %, process      :: [{atom()       , term()}]
+    %, port         :: [{atom()       , term()}]
+    %, ets          :: [{atom()       , term()}]
+    %, dets         :: [{atom()       , term()}]
+    }).
diff --git a/rebar b/rebar
new file mode 100755 (executable)
index 0000000..49711dc
Binary files /dev/null and b/rebar differ
diff --git a/rebar.config b/rebar.config
new file mode 100644 (file)
index 0000000..8d9caba
--- /dev/null
@@ -0,0 +1,4 @@
+{ deps
+, [ {hope, ".*"}
+  ]
+}.
diff --git a/rebar_test_build.config b/rebar_test_build.config
new file mode 100644 (file)
index 0000000..b65a1df
--- /dev/null
@@ -0,0 +1,4 @@
+{ deps
+, [ {hope, ".*", {git, "https://github.com/ibnfirnas/hope.git", {tag, "3.7.0"}}}
+  ]
+}.
diff --git a/src/beam_stats.app.src b/src/beam_stats.app.src
new file mode 100644 (file)
index 0000000..1750455
--- /dev/null
@@ -0,0 +1,31 @@
+{application, beam_stats,
+ [
+  {description, "Periodic VM stats production and consumption."},
+  {vsn, "0.0.0"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib
+                 ]},
+
+  {mod, { beam_stats_app, []}},
+
+  {env,
+    [ {production_interval , 30000}
+    , {consumers,
+        [ {beam_stats_consumer_graphite,
+            [ {consumption_interval , 60000}
+            , {connect_options,
+                [ {host    , "localhost"}
+                , {port    , 2003}
+                , {timeout , 5000}
+                ]}
+            ]}
+        , {beam_stats_consumer_csv,
+            [ {consumption_interval , 60000}
+            , {path                 , "beam_stats.csv"}
+            ]}
+        ]}
+    ]}
+
+ ]}.
diff --git a/src/beam_stats.erl b/src/beam_stats.erl
new file mode 100644 (file)
index 0000000..e388396
--- /dev/null
@@ -0,0 +1,25 @@
+-module(beam_stats).
+
+-include("include/beam_stats.hrl").
+
+-export_type(
+    [ t/0
+    ]).
+
+-export(
+    [ collect/0
+    ]).
+
+-define(T, #?MODULE).
+
+-type t() ::
+    ?T{}.
+
+-spec collect() ->
+    t().
+collect() ->
+    ?T
+    { timestamp = os:timestamp()
+    , node_id   = erlang:node()
+    , memory    = erlang:memory()
+    }.
diff --git a/src/beam_stats.hrl b/src/beam_stats.hrl
new file mode 100644 (file)
index 0000000..c7df52a
--- /dev/null
@@ -0,0 +1,3 @@
+-define(METHOD_SHOULD_NOT_BE_USED(Method, State),
+    {stop, {method_should_not_be_used, {?MODULE, Method}}, State}
+).
diff --git a/src/beam_stats_app.erl b/src/beam_stats_app.erl
new file mode 100644 (file)
index 0000000..c72415c
--- /dev/null
@@ -0,0 +1,16 @@
+-module(beam_stats_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+    beam_stats_sup:start_link().
+
+stop(_State) ->
+    ok.
diff --git a/src/beam_stats_config.erl b/src/beam_stats_config.erl
new file mode 100644 (file)
index 0000000..68a44ed
--- /dev/null
@@ -0,0 +1,32 @@
+-module(beam_stats_config).
+
+-export(
+    [ production_interval/0
+    , consumers/0
+    ]).
+
+-define(APP, beam_stats).
+
+%% ============================================================================
+%%  API
+%% ============================================================================
+
+-spec production_interval() ->
+    erlang:time().
+production_interval() ->
+    get_env(production_interval).
+
+-spec consumers() ->
+    [{ConsumerModule :: atom(), ConsumerDefinedOptionsData :: term()}].
+consumers() ->
+    get_env(consumers).
+
+%% ============================================================================
+%%  Internal
+%% ============================================================================
+
+-spec get_env(atom()) ->
+    term().
+get_env(Key) ->
+    {ok, Value} = application:get_env(?APP, Key),
+    Value.
diff --git a/src/beam_stats_consumer.erl b/src/beam_stats_consumer.erl
new file mode 100644 (file)
index 0000000..8d25fa4
--- /dev/null
@@ -0,0 +1,143 @@
+-module(beam_stats_consumer).
+
+-include("include/beam_stats.hrl").
+-include(        "beam_stats.hrl").
+
+-behaviour(gen_server).
+
+-export_type(
+    [ queue/0
+    ]).
+
+%% Public API
+-export(
+    [ add/2
+    ]).
+
+%% Internal API
+-export(
+    [ start_link/2
+    ]).
+
+%% gen_server callbacks
+-export(
+    [ init/1
+    , handle_call/3
+    , handle_cast/2
+    , handle_info/2
+    , terminate/2
+    , code_change/3
+    ]).
+
+-type queue() ::
+    queue:queue(beam_stats:t()).
+
+%% ============================================================================
+%%  Consumer interface
+%% ============================================================================
+
+-callback init(Options :: term()) ->
+    {ConsumptionInterval :: erlang:time(), State :: term()}.
+
+-callback consume(queue(), State) ->
+    State.
+
+-callback terminate(State :: term()) ->
+    {}.
+
+%% ============================================================================
+%% Internal data
+%% ============================================================================
+
+-define(SIGNAL_CONSUMPTION , beam_stats_consumption_signal).
+
+-record(state,
+    { consumer_module      :: atom()
+    , consumer_state       :: term()
+    , consumption_interval :: erlang:time()
+    , beam_stats_queue     :: queue()
+    }).
+
+%% ============================================================================
+%%  Public API
+%% ============================================================================
+
+-spec add(atom(), term()) ->
+    supervisor:startchild_ret().
+add(ConsumerModule, ConsumerOptions) ->
+    beam_stats_sup_consumers:start_child(ConsumerModule, ConsumerOptions).
+
+%% ============================================================================
+%%  Internal API
+%% ============================================================================
+
+start_link(ConsumerModule, ConsumerOptions) ->
+    GenServerModule = ?MODULE,
+    GenServerOpts   = [],
+    InitArgs        = [ConsumerModule, ConsumerOptions],
+    gen_server:start_link(GenServerModule, InitArgs, GenServerOpts).
+
+%% ============================================================================
+%%  gen_server callbacks (unused)
+%% ============================================================================
+
+handle_call(_Request, _From, State) ->
+    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ============================================================================
+%%  gen_server callbacks
+%% ============================================================================
+
+init([ConsumerModule, ConsumerOptions]) ->
+    {ConsumptionInterval, ConsumerState} = ConsumerModule:init(ConsumerOptions),
+    State = #state
+        { consumer_module      = ConsumerModule
+        , consumer_state       = ConsumerState
+        , consumption_interval = ConsumptionInterval
+        , beam_stats_queue     = queue:new()
+        },
+    ok = beam_stats_producer:subscribe(self()),
+    ok = schedule_first_consumption(),
+    {ok, State}.
+
+terminate(_Reason, _State) ->
+    ok = beam_stats_producer:unsubscribe(self()).
+
+handle_cast(#beam_stats{}=BEAMStats, #state{beam_stats_queue=Q1}=State) ->
+    Q2 = queue:in(BEAMStats, Q1),
+    {noreply, State#state{beam_stats_queue = Q2}}.
+
+handle_info(
+    ?SIGNAL_CONSUMPTION,
+    #state
+    { consumer_module      = ConsumerModule
+    , consumer_state       = ConsumerState
+    , consumption_interval = ConsumptionInterval
+    , beam_stats_queue     = Q
+    }=State1
+) ->
+    State2 = State1#state
+        { consumer_state   = ConsumerModule:consume(Q, ConsumerState)
+        , beam_stats_queue = queue:new()
+        },
+    ok = schedule_next_consumption(ConsumptionInterval),
+    {noreply, State2}.
+
+%% ============================================================================
+%%  Internal
+%% ============================================================================
+
+-spec schedule_first_consumption() ->
+    ok.
+schedule_first_consumption() ->
+    _ = self() ! ?SIGNAL_CONSUMPTION,
+    ok.
+
+-spec schedule_next_consumption(erlang:time()) ->
+    ok.
+schedule_next_consumption(Time) ->
+    _ = erlang:send_after(Time, self(), ?SIGNAL_CONSUMPTION),
+    ok.
diff --git a/src/beam_stats_consumer_csv.erl b/src/beam_stats_consumer_csv.erl
new file mode 100644 (file)
index 0000000..c00c28f
--- /dev/null
@@ -0,0 +1,155 @@
+-module(beam_stats_consumer_csv).
+
+-include("include/beam_stats.hrl").
+
+-behaviour(beam_stats_consumer).
+
+-export_type(
+    [ option/0
+    ]).
+
+-export(
+    [ init/1
+    , consume/2
+    , terminate/1
+    ]).
+
+-type option() ::
+      {path                 , file:filename()}
+    | {consumption_interval , erlang:time()}
+    .
+
+-record(state,
+    { path        :: file:filename()
+    , file = none :: hope_option:t(file:io_device())
+    }).
+
+-type state() ::
+    #state{}.
+
+-spec init([option()]) ->
+    {erlang:time(), state()}.
+init(Options) ->
+    ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+    {some, Path} = hope_kv_list:get(Options, path),
+    State = #state
+        { path = Path
+        , file = none
+        },
+    {ConsumptionInterval, State}.
+
+-spec consume(beam_stats_consumer:queue(), state()) ->
+    state().
+consume(Q, #state{}=State1) ->
+    Payload = beam_stats_queue_to_binary(Q),
+    State2 = try_to_open_if_no_file(State1),
+    try_to_write(State2, Payload).
+
+-spec terminate(state()) ->
+    {}.
+terminate(#state{file=FileOpt}) ->
+    ok = hope_option:iter(FileOpt, fun file:close/1),
+    {}.
+
+%% ============================================================================
+
+-spec try_to_write(state(), binary()) ->
+    state().
+try_to_write(#state{file=none, path=Path}=State, _) ->
+    io:format("error: file closed: ~s~n", [Path]),
+    State;
+try_to_write(#state{file={some, File}}=State, Payload) ->
+    case file:write(File, Payload)
+    of  ok ->
+            State
+    ;   {error, _}=Error ->
+            io:format("error: file:write/2 failed: ~p~n", [Error]),
+            % TODO: Maybe schedule retry?
+            ok = file:close(File),
+            State#state{file=none}
+    end.
+
+-spec try_to_open_if_no_file(state()) ->
+    state().
+try_to_open_if_no_file(#state{file={some, _}}=State) ->
+    State;
+try_to_open_if_no_file(#state{file=none, path=Path}=State) ->
+    case file:open(Path, [append])
+    of  {ok, File} ->
+            State#state{file = {some, File}}
+    ;   {error, _}=Error ->
+            io:format("error: file:open/2 failed: ~p~n", [Error]),
+            State#state{file = none}
+    end.
+
+-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
+    binary().
+beam_stats_queue_to_binary(BEAMStatsQ) ->
+    Bins = [beam_stats_to_bin(B) || B <- queue:to_list(BEAMStatsQ)],
+    iolist_to_binary(Bins).
+
+
+-spec beam_stats_to_bin(beam_stats:t()) ->
+    binary().
+beam_stats_to_bin(#beam_stats
+    { timestamp = Timestamp
+    , node_id   = NodeID
+    , memory    = Memory
+    }
+) ->
+    <<TimestampBin/binary>> = timestamp_to_bin(Timestamp),
+    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
+    PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
+    MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
+    MemoryBins     = lists:map(PairToBin, MemoryBinPairs),
+    AllBins =
+        [ MemoryBins
+        ],
+    iolist_to_binary(AllBins).
+
+-spec timestamp_to_bin(erlang:timestamp()) ->
+    binary().
+timestamp_to_bin(Timestamp) ->
+    TimestampFloat = timestamp_to_float(Timestamp),
+    {{Year, Month, Day}, {Hour, Min, Sec}} = calendar:now_to_local_time(Timestamp),
+    SecondsFloat = Sec + (TimestampFloat - trunc(TimestampFloat)),
+    Fmt2Digits = "~2.10.0b",
+    FmtDate = string:join(["~b"      , Fmt2Digits, Fmt2Digits], "-"),
+    FmtTime = string:join([Fmt2Digits, Fmt2Digits, "~9..0f"  ], ":"),
+    Separator = " ",
+    Fmt = FmtDate ++ Separator ++ FmtTime,
+    IOList = io_lib:format(Fmt, [Year, Month, Day, Hour, Min, SecondsFloat]),
+    iolist_to_binary(IOList).
+
+-spec timestamp_to_float(erlang:timestamp()) ->
+    float().
+timestamp_to_float({ComponentMega, ComponentWhole, ComponentMicro}) ->
+    OneMillion = 1000000,
+    TotalWholeSeconds = ComponentMega * OneMillion + ComponentWhole,
+    TotalMicroSeconds = (TotalWholeSeconds * OneMillion) + ComponentMicro,
+    TotalMicroSeconds / OneMillion.
+
+-spec make_pair_to_bin(binary(), binary()) ->
+    fun(({binary(), binary()}) -> binary()).
+make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
+    fun ({<<K/binary>>, <<V/binary>>}) ->
+        << TimestampBin/binary
+         , "|"
+         , NodeID/binary
+         , "|"
+         , K/binary
+         , "|"
+         , V/binary
+         , "\n"
+        >>
+    end.
+
+-spec node_id_to_bin(node()) ->
+    binary().
+node_id_to_bin(NodeID) ->
+    atom_to_binary(NodeID, utf8).
+
+-spec atom_int_to_bin_bin({atom(), integer()}) ->
+    {binary(), binary()}.
+atom_int_to_bin_bin({K, V}) ->
+    {atom_to_binary(K, latin1), integer_to_binary(V)}.
diff --git a/src/beam_stats_consumer_graphite.erl b/src/beam_stats_consumer_graphite.erl
new file mode 100644 (file)
index 0000000..c60c6e8
--- /dev/null
@@ -0,0 +1,157 @@
+-module(beam_stats_consumer_graphite).
+
+-include("include/beam_stats.hrl").
+
+-behaviour(beam_stats_consumer).
+
+-export_type(
+    [         option/0
+    , connect_option/0
+    ]).
+
+-export(
+    [ init/1
+    , consume/2
+    , terminate/1
+    ]).
+
+-type connect_option() ::
+      {host    , inet:ip_address() | inet:hostname()}
+    | {port    , inet:port_number()}
+    | {timeout , timeout()}
+    .
+
+-type option() ::
+      {connect_options      , [connect_option()]}
+    | {consumption_interval , erlang:time()}
+    .
+
+-record(state,
+    { connect_options = []   :: [connect_option()]
+    , sock            = none :: hope_option:t(gen_tcp:socket())
+    }).
+
+-type state() ::
+    #state{}.
+
+-define(GRAPHITE_PATH_PREFIX, "beam_stats").
+
+-spec init([option()]) ->
+    {erlang:time(), state()}.
+init(Options) ->
+    ConnectOptions      = hope_kv_list:get(Options, connect_options     , []),
+    ConsumptionInterval = hope_kv_list:get(Options, consumption_interval, 60000),
+    State = #state
+        { connect_options = ConnectOptions
+        , sock            = none
+        },
+    {ConsumptionInterval, State}.
+
+-spec consume(beam_stats_consumer:queue(), state()) ->
+    state().
+consume(Q, #state{}=State1) ->
+    Payload = beam_stats_queue_to_binary(Q),
+    State2 = try_to_connect_if_no_socket(State1),
+    try_to_send(State2, Payload).
+
+-spec terminate(state()) ->
+    {}.
+terminate(#state{sock=SockOpt}) ->
+    ok = hope_option:iter(SockOpt, fun gen_tcp:close/1),
+    {}.
+
+%% ============================================================================
+
+-spec try_to_send(state(), binary()) ->
+    state().
+try_to_send(#state{sock=none}=State, _) ->
+    io:format("error: socket closed~n"),
+    % TODO: Maybe schedule retry?
+    State;
+try_to_send(#state{sock={some, Sock}}=State, Payload) ->
+    case gen_tcp:send(Sock, Payload)
+    of  ok ->
+            State
+    ;   {error, _}=Error ->
+            io:format("error: gen_tcp:send/2 failed: ~p~n", [Error]),
+            % TODO: Maybe schedule retry?
+            ok = gen_tcp:close(Sock),
+            State#state{sock=none}
+    end.
+
+-spec try_to_connect_if_no_socket(state()) ->
+    state().
+try_to_connect_if_no_socket(#state{sock={some, _}}=State) ->
+    State;
+try_to_connect_if_no_socket(#state{sock=none, connect_options=Options}=State) ->
+    DefaultHost    = "localhost",
+    DefaultPort    = 2003,
+    DefaultTimeout = 5000,
+    Host    = hope_kv_list:get(Options, host   , DefaultHost),
+    Port    = hope_kv_list:get(Options, port   , DefaultPort),
+    Timeout = hope_kv_list:get(Options, timeout, DefaultTimeout),
+    case gen_tcp:connect(Host, Port, [binary, {active, false}], Timeout)
+    of  {ok, Sock} ->
+            State#state{sock = {some, Sock}}
+    ;   {error, _}=Error ->
+            io:format("error: gen_tcp:connect/4 failed: ~p~n", [Error]),
+            State#state{sock = none}
+    end.
+
+-spec beam_stats_queue_to_binary(beam_stats_consumer:queue()) ->
+    binary().
+beam_stats_queue_to_binary(Q) ->
+    Bins = [beam_stats_to_bin(B) || B <- queue:to_list(Q)],
+    iolist_to_binary(Bins).
+
+-spec beam_stats_to_bin(beam_stats:t()) ->
+    binary().
+beam_stats_to_bin(#beam_stats
+    { timestamp = Timestamp
+    , node_id   = NodeID
+    , memory    = Memory
+    }
+) ->
+    TimestampInt = timestamp_to_integer(Timestamp),
+    TimestampBin = integer_to_binary(TimestampInt),
+    <<NodeIDBin/binary>> = node_id_to_bin(NodeID),
+    PairToBin = make_pair_to_bin(NodeIDBin, TimestampBin),
+    MemoryBinPairs = lists:map(fun atom_int_to_bin_bin/1, Memory),
+    MemoryBins     = lists:map(PairToBin, MemoryBinPairs),
+    AllBins =
+        [ MemoryBins
+        ],
+    iolist_to_binary(AllBins).
+
+-spec timestamp_to_integer(erlang:timestamp()) ->
+    non_neg_integer().
+timestamp_to_integer({Megaseconds, Seconds, _}) ->
+    Megaseconds * 1000000 + Seconds.
+
+-spec make_pair_to_bin(binary(), binary()) ->
+    fun(({binary(), binary()}) -> binary()).
+make_pair_to_bin(<<NodeID/binary>>, <<TimestampBin/binary>>) ->
+    fun ({<<K/binary>>, <<V/binary>>}) ->
+        << ?GRAPHITE_PATH_PREFIX
+         , "."
+         , NodeID/binary
+         , "."
+         , K/binary
+         , " "
+         , V/binary
+         , " "
+         , TimestampBin/binary
+         , "\n"
+        >>
+    end.
+
+-spec node_id_to_bin(node()) ->
+    binary().
+node_id_to_bin(NodeID) ->
+    NodeIDBin = atom_to_binary(NodeID, utf8),
+    re:replace(NodeIDBin, "[\@\.]", "_", [global, {return, binary}]).
+
+-spec atom_int_to_bin_bin({atom(), integer()}) ->
+    {binary(), binary()}.
+atom_int_to_bin_bin({K, V}) ->
+    {atom_to_binary(K, latin1), integer_to_binary(V)}.
diff --git a/src/beam_stats_producer.erl b/src/beam_stats_producer.erl
new file mode 100644 (file)
index 0000000..5c47951
--- /dev/null
@@ -0,0 +1,109 @@
+-module(beam_stats_producer).
+
+-include("beam_stats.hrl").
+
+-behaviour(gen_server).
+
+%% API
+-export(
+    [ start_link/0
+    ,   subscribe/1
+    , unsubscribe/1
+    ]).
+
+%% gen_server callbacks
+-export(
+    [ init/1
+    , handle_call/3
+    , handle_cast/2
+    , handle_info/2
+    , terminate/2
+    , code_change/3
+    ]).
+
+%% ============================================================================
+%% Internal data
+%% ============================================================================
+
+-define(SIGNAL_PRODUCTION , beam_stats_production_signal).
+
+-record(state,
+    { consumers = ordsets:new() :: ordsets:ordset(pid())
+    }).
+
+%% ============================================================================
+%%  API
+%% ============================================================================
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec subscribe(pid()) ->
+    ok.
+subscribe(PID) ->
+    gen_server:cast(?MODULE, {subscribe, PID}).
+
+-spec unsubscribe(pid()) ->
+    ok.
+unsubscribe(PID) ->
+    gen_server:cast(?MODULE, {unsubscribe, PID}).
+
+%% ============================================================================
+%%  gen_server callbacks (unused)
+%% ============================================================================
+
+handle_call(_Request, _From, State) ->
+    ?METHOD_SHOULD_NOT_BE_USED(handle_call, State).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%% ============================================================================
+%%  gen_server callbacks
+%% ============================================================================
+
+init([]) ->
+    ok = schedule_first_production(),
+    Consumers = ordsets:new(),
+    {ok, #state{consumers=Consumers}}.
+
+handle_cast({subscribe, PID}, #state{consumers=Consumers1}=State) ->
+    Consumers2 = ordsets:add_element(PID, Consumers1),
+    {noreply, State#state{consumers=Consumers2}};
+
+handle_cast({unsubscribe, PID}, #state{consumers=Consumers1}=State) ->
+    Consumers2 = ordsets:del_element(PID, Consumers1),
+    {noreply, State#state{consumers=Consumers2}}.
+
+handle_info(?SIGNAL_PRODUCTION, #state{consumers=ConsumersSet}=State) ->
+    ConsumersList = ordsets:to_list(ConsumersSet),
+    ok = collect_and_push_to_consumers(ConsumersList),
+    ok = schedule_next_production(),
+    {noreply, State}.
+
+%% ============================================================================
+%%  Private
+%% ============================================================================
+
+-spec schedule_first_production() ->
+    ok.
+schedule_first_production() ->
+    _ = self() ! ?SIGNAL_PRODUCTION,
+    ok.
+
+-spec schedule_next_production() ->
+    ok.
+schedule_next_production() ->
+    ProductionInterval = beam_stats_config:production_interval(),
+    _ = erlang:send_after(ProductionInterval, self(), ?SIGNAL_PRODUCTION),
+    ok.
+
+-spec collect_and_push_to_consumers([pid()]) ->
+    ok.
+collect_and_push_to_consumers(Consumers) ->
+    BEAMStats = beam_stats:collect(),
+    Push = fun (Consumer) -> gen_server:cast(Consumer, BEAMStats) end,
+    lists:foreach(Push, Consumers).
diff --git a/src/beam_stats_sup.erl b/src/beam_stats_sup.erl
new file mode 100644 (file)
index 0000000..7cfec55
--- /dev/null
@@ -0,0 +1,32 @@
+-module(beam_stats_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(Type, Module),
+    {Module, {Module, start_link, []}, permanent, 5000, Type, [Module]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    Children =
+        [ ?CHILD(worker     , beam_stats_producer)
+        , ?CHILD(supervisor , beam_stats_sup_consumers)
+        ],
+    SupFlags = {one_for_one, 5, 10},
+    {ok, {SupFlags, Children}}.
diff --git a/src/beam_stats_sup_consumers.erl b/src/beam_stats_sup_consumers.erl
new file mode 100644 (file)
index 0000000..727cf3d
--- /dev/null
@@ -0,0 +1,41 @@
+-module(beam_stats_sup_consumers).
+
+-behaviour(supervisor).
+
+%% API
+-export(
+    [ start_link/0
+    , start_child/2
+    ]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(Type, Module, Args),
+    {make_ref(), {Module, start_link, Args}, permanent, 5000, Type, [Module]}).
+
+%% ===================================================================
+%% API
+%% ===================================================================
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_child(Module, Options) ->
+    Child = ?CHILD(worker, beam_stats_consumer, [Module, Options]),
+    supervisor:start_child(?MODULE, Child).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+    Consumers = beam_stats_config:consumers(),
+    ConsumerSpecToChild =
+        fun ({Module, Options}) ->
+            ?CHILD(worker, beam_stats_consumer, [Module, Options])
+        end,
+    Children = lists:map(ConsumerSpecToChild, Consumers),
+    SupFlags = {one_for_one, 5, 10},
+    {ok, {SupFlags, Children}}.
This page took 0.056664 seconds and 4 git commands to generate.