Test the full produce-consume cycle for StatsD consumer.
authorSiraaj Khandkar <siraaj@khandkar.net>
Sun, 20 Sep 2015 00:37:30 +0000 (20:37 -0400)
committerSiraaj Khandkar <siraaj@khandkar.net>
Sun, 20 Sep 2015 00:37:30 +0000 (20:37 -0400)
test/beam_stats_consumer_statsd_SUITE.erl

index 16cb5fc..33d78c9 100644 (file)
@@ -13,8 +13,7 @@
 
 %% Test cases
 -export(
-    [ t_send/1
-    , t_collect/1
+    [ t_full_cycle/1
     ]).
 
 -define(GROUP, beam_stats_consumer_statsd).
@@ -28,8 +27,7 @@ all() ->
 
 groups() ->
     Tests =
-        [ t_send
-        , t_collect
+        [ t_full_cycle
         ],
     Properties = [],
     [{?GROUP, Properties, Tests}].
@@ -38,109 +36,42 @@ groups() ->
 %%  Test cases
 %% ============================================================================
 
-t_send(_Cfg) ->
-    Pid0 = list_to_pid("<0.0.0>"),
-    Pid1 = list_to_pid("<0.1.0>"),
-    Pid2 = list_to_pid("<0.2.0>"),
-    Pid3 = list_to_pid("<0.3.0>"),
-    Process1 =
-        #beam_stats_process
-        { pid               = Pid1
-        , registered_name   = {some, reg_name_foo}
-        , ancestry =
-              #beam_stats_process_ancestry
-              { raw_initial_call  = {foo_mod, foo_fun, 2}
-              , otp_initial_call  = none
-              , otp_ancestors     = none
-              }
-        , status            = running
-        , memory            = 15
-        , total_heap_size   = 25
-        , stack_size        = 10
-        , message_queue_len = 0
-        },
-    Process2 =
-        #beam_stats_process
-        { pid               = Pid2
-        , registered_name   = none
-        , ancestry =
-              #beam_stats_process_ancestry
-              { raw_initial_call  = {bar_mod, bar_fun, 1}
-              , otp_initial_call  = none
-              , otp_ancestors     = none
-              }
-        , status            = running
-        , memory            = 25
-        , total_heap_size   = 35
-        , stack_size        = 40
-        , message_queue_len = 5
-        },
-    Process3 =
-        #beam_stats_process
-        { pid               = Pid3
-        , registered_name   = none
-        , ancestry =
-              #beam_stats_process_ancestry
-              { raw_initial_call  = {baz_mod, baz_fun, 3}
-              , otp_initial_call  = {some, {baz_otp_mod, baz_otp_fun, 2}}
-              , otp_ancestors     = {some, [Pid0, Pid1]}
-              }
-        , status            = running
-        , memory            = 25
-        , total_heap_size   = 35
-        , stack_size        = 40
-        , message_queue_len = 1
-        },
-    Processes =
-        #beam_stats_processes
-        { individual_stats =
-            [ Process1
-            , Process2
-            , Process3
-            ]
-        , count_all                = 3
-        , count_exiting            = 0
-        , count_garbage_collecting = 0
-        , count_registered         = 1
-        , count_runnable           = 0
-        , count_running            = 3
-        , count_suspended          = 0
-        , count_waiting            = 0
-        },
-    ETSTableStatsFoo =
-        #beam_stats_ets_table
-        { id     = foo
-        , name   = foo
-        , size   = 5
-        , memory = 25
-        },
-    ETSTableStatsBar =
-        #beam_stats_ets_table
-        { id     = 37
-        , name   = bar
-        , size   = 8
-        , memory = 38
-        },
-    % TODO: Indent #beam_stats as #beam_stats_ets_table
-    BEAMStats = #beam_stats
-    { timestamp = {1, 2, 3}
-    , node_id   = 'node_foo@host_bar'
-    , memory    = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
-    , io_bytes_in  = 3
-    , io_bytes_out = 7
-    , context_switches = 5
-    , reductions       = 9
-    , run_queue        = 17
-    , ets              = [ETSTableStatsFoo, ETSTableStatsBar]
-    , processes        = Processes
-    },
+t_full_cycle(_Cfg) ->
+    meck:new(beam_stats_source),
+    BEAMStatsExpected = meck_expect_beam_stats(),
+    BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
+    ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
+    ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
+    BEAMStatsExpected = BEAMStatsComputed,
+
+    {ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats),
+    ct:log("beam_stats started~n"),
     ServerPort = 8125,
     {ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]),
-    BEAMStatsQ = queue:in(BEAMStats, queue:new()),
-    Options = [{dst_port, ServerPort}],
-    {_, State1} = beam_stats_consumer_statsd:init(Options),
-    State2 = beam_stats_consumer_statsd:consume(BEAMStatsQ, State1),
-    {} = beam_stats_consumer_statsd:terminate(State2),
+    ct:log("UDP server started started~n"),
+    {ok, _} = beam_stats_consumer:add(beam_stats_consumer_statsd,
+        [ {consumption_interval , 60000}
+        , {dst_host             , "localhost"}
+        , {dst_port             , ServerPort}
+        , {src_port             , 8124}
+        , {num_msgs_per_packet  , 10}
+        ]
+    ),
+    ct:log("consumer added~n"),
+    _ = meck_expect_beam_stats(
+        % Double the original values, so that deltas will equal originals after
+        % 1 update of new beam_stats_state:t()
+        [ {io_bytes_in      , 6}
+        , {io_bytes_out     , 14}
+        , {context_switches , 10}
+        ]
+    ),
+    ct:log("meck_expect_beam_stats ok~n"),
+    {} = beam_stats_producer:sync_produce_consume(),
+    ct:log("produced and consumed~n"),
+    ok = application:stop(beam_stats),
+    ct:log("beam_stats stopped~n"),
+
     ResultOfReceive1 = gen_udp:recv(ServerSocket, 0),
     ResultOfReceive2 = gen_udp:recv(ServerSocket, 0),
     ResultOfReceive3 = gen_udp:recv(ServerSocket, 0),
@@ -171,9 +102,9 @@ t_send(_Cfg) ->
         , <<"beam_stats.node_foo_host_bar.memory.mem_type_bar:2|g">>
         , <<"beam_stats.node_foo_host_bar.memory.mem_type_baz:3|g">>
         , <<"beam_stats.node_foo_host_bar.ets_table.size.foo.foo:5|g">>
-        , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:25|g">>
+        , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:40|g">>
         , <<"beam_stats.node_foo_host_bar.ets_table.size.bar.37:8|g">>
-        , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:38|g">>
+        , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">>
 
         % Processes totals
         , <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">>
@@ -213,9 +144,16 @@ t_send(_Cfg) ->
             true = lists:member(Expected, Received),
             Received -- [Expected]
         end,
-    [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected).
+    [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+    meck:unload(beam_stats_source).
+
+meck_expect_beam_stats() ->
+    meck_expect_beam_stats([]).
 
-t_collect(_Cfg) ->
+meck_expect_beam_stats(Overrides) ->
+    IOBytesIn       = hope_kv_list:get(Overrides, io_bytes_in , 3),
+    IOBytesOut      = hope_kv_list:get(Overrides, io_bytes_out, 7),
+    ContextSwitches = hope_kv_list:get(Overrides, context_switches, 5),
     Pid0 = list_to_pid("<0.0.0>"),
     Pid1 = list_to_pid("<0.1.0>"),
     Pid2 = list_to_pid("<0.2.0>"),
@@ -298,7 +236,6 @@ t_collect(_Cfg) ->
         , size   = 8
         , memory = 64
         },
-    meck:new(beam_stats_source),
     meck:expect(beam_stats_source, erlang_memory,
         fun () -> [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] end),
     meck:expect(beam_stats_source, erlang_node,
@@ -306,8 +243,8 @@ t_collect(_Cfg) ->
     meck:expect(beam_stats_source, erlang_registered,
         fun () -> [reg_name_foo] end),
     meck:expect(beam_stats_source, erlang_statistics,
-        fun (io              ) -> {{input, 3}, {output, 7}}
-        ;   (context_switches) -> {5, 0}
+        fun (io              ) -> {{input, IOBytesIn}, {output, IOBytesOut}}
+        ;   (context_switches) -> {ContextSwitches, 0}
         ;   (reductions      ) -> {0, 9} % 1st element is unused
         ;   (run_queue       ) -> 17
         end
@@ -369,21 +306,15 @@ t_collect(_Cfg) ->
                 end
         end
     ),
-    BEAMStatsExpected =
-        #beam_stats
-        { timestamp = {1, 2, 3}
-        , node_id   = 'node_foo@host_bar'
-        , memory    = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
-        , io_bytes_in  = 3
-        , io_bytes_out = 7
-        , context_switches = 5
-        , reductions       = 9
-        , run_queue        = 17
-        , ets              = [ETSTableStatsFoo, ETSTableStatsBar]
-        , processes        = Processes
-        },
-    BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
-    ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
-    ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
-    BEAMStatsExpected = BEAMStatsComputed,
-    meck:unload(beam_stats_source).
+    #beam_stats
+    { timestamp = {1, 2, 3}
+    , node_id   = 'node_foo@host_bar'
+    , memory    = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
+    , io_bytes_in  = IOBytesIn
+    , io_bytes_out = IOBytesOut
+    , context_switches = ContextSwitches
+    , reductions       = 9
+    , run_queue        = 17
+    , ets              = [ETSTableStatsFoo, ETSTableStatsBar]
+    , processes        = Processes
+    }.
This page took 0.023091 seconds and 4 git commands to generate.