%% Test cases
-export(
- [ t_send/1
- , t_collect/1
+ [ t_full_cycle/1
]).
-define(GROUP, beam_stats_consumer_statsd).
groups() ->
Tests =
- [ t_send
- , t_collect
+ [ t_full_cycle
],
Properties = [],
[{?GROUP, Properties, Tests}].
%% Test cases
%% ============================================================================
-t_send(_Cfg) ->
- Pid0 = list_to_pid("<0.0.0>"),
- Pid1 = list_to_pid("<0.1.0>"),
- Pid2 = list_to_pid("<0.2.0>"),
- Pid3 = list_to_pid("<0.3.0>"),
- Process1 =
- #beam_stats_process
- { pid = Pid1
- , registered_name = {some, reg_name_foo}
- , ancestry =
- #beam_stats_process_ancestry
- { raw_initial_call = {foo_mod, foo_fun, 2}
- , otp_initial_call = none
- , otp_ancestors = none
- }
- , status = running
- , memory = 15
- , total_heap_size = 25
- , stack_size = 10
- , message_queue_len = 0
- },
- Process2 =
- #beam_stats_process
- { pid = Pid2
- , registered_name = none
- , ancestry =
- #beam_stats_process_ancestry
- { raw_initial_call = {bar_mod, bar_fun, 1}
- , otp_initial_call = none
- , otp_ancestors = none
- }
- , status = running
- , memory = 25
- , total_heap_size = 35
- , stack_size = 40
- , message_queue_len = 5
- },
- Process3 =
- #beam_stats_process
- { pid = Pid3
- , registered_name = none
- , ancestry =
- #beam_stats_process_ancestry
- { raw_initial_call = {baz_mod, baz_fun, 3}
- , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}}
- , otp_ancestors = {some, [Pid0, Pid1]}
- }
- , status = running
- , memory = 25
- , total_heap_size = 35
- , stack_size = 40
- , message_queue_len = 1
- },
- Processes =
- #beam_stats_processes
- { individual_stats =
- [ Process1
- , Process2
- , Process3
- ]
- , count_all = 3
- , count_exiting = 0
- , count_garbage_collecting = 0
- , count_registered = 1
- , count_runnable = 0
- , count_running = 3
- , count_suspended = 0
- , count_waiting = 0
- },
- ETSTableStatsFoo =
- #beam_stats_ets_table
- { id = foo
- , name = foo
- , size = 5
- , memory = 25
- },
- ETSTableStatsBar =
- #beam_stats_ets_table
- { id = 37
- , name = bar
- , size = 8
- , memory = 38
- },
- % TODO: Indent #beam_stats as #beam_stats_ets_table
- BEAMStats = #beam_stats
- { timestamp = {1, 2, 3}
- , node_id = 'node_foo@host_bar'
- , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
- , io_bytes_in = 3
- , io_bytes_out = 7
- , context_switches = 5
- , reductions = 9
- , run_queue = 17
- , ets = [ETSTableStatsFoo, ETSTableStatsBar]
- , processes = Processes
- },
+t_full_cycle(_Cfg) ->
+ meck:new(beam_stats_source),
+ BEAMStatsExpected = meck_expect_beam_stats(),
+ BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
+ ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
+ ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
+ BEAMStatsExpected = BEAMStatsComputed,
+
+ {ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats),
+ ct:log("beam_stats started~n"),
ServerPort = 8125,
{ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]),
- BEAMStatsQ = queue:in(BEAMStats, queue:new()),
- Options = [{dst_port, ServerPort}],
- {_, State1} = beam_stats_consumer_statsd:init(Options),
- State2 = beam_stats_consumer_statsd:consume(BEAMStatsQ, State1),
- {} = beam_stats_consumer_statsd:terminate(State2),
+ ct:log("UDP server started started~n"),
+ {ok, _} = beam_stats_consumer:add(beam_stats_consumer_statsd,
+ [ {consumption_interval , 60000}
+ , {dst_host , "localhost"}
+ , {dst_port , ServerPort}
+ , {src_port , 8124}
+ , {num_msgs_per_packet , 10}
+ ]
+ ),
+ ct:log("consumer added~n"),
+ _ = meck_expect_beam_stats(
+ % Double the original values, so that deltas will equal originals after
+ % 1 update of new beam_stats_state:t()
+ [ {io_bytes_in , 6}
+ , {io_bytes_out , 14}
+ , {context_switches , 10}
+ ]
+ ),
+ ct:log("meck_expect_beam_stats ok~n"),
+ {} = beam_stats_producer:sync_produce_consume(),
+ ct:log("produced and consumed~n"),
+ ok = application:stop(beam_stats),
+ ct:log("beam_stats stopped~n"),
+
ResultOfReceive1 = gen_udp:recv(ServerSocket, 0),
ResultOfReceive2 = gen_udp:recv(ServerSocket, 0),
ResultOfReceive3 = gen_udp:recv(ServerSocket, 0),
, <<"beam_stats.node_foo_host_bar.memory.mem_type_bar:2|g">>
, <<"beam_stats.node_foo_host_bar.memory.mem_type_baz:3|g">>
, <<"beam_stats.node_foo_host_bar.ets_table.size.foo.foo:5|g">>
- , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:25|g">>
+ , <<"beam_stats.node_foo_host_bar.ets_table.memory.foo.foo:40|g">>
, <<"beam_stats.node_foo_host_bar.ets_table.size.bar.37:8|g">>
- , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:38|g">>
+ , <<"beam_stats.node_foo_host_bar.ets_table.memory.bar.37:64|g">>
% Processes totals
, <<"beam_stats.node_foo_host_bar.processes_count_all:3|g">>
true = lists:member(Expected, Received),
Received -- [Expected]
end,
- [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected).
+ [] = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected),
+ meck:unload(beam_stats_source).
+
+meck_expect_beam_stats() ->
+ meck_expect_beam_stats([]).
-t_collect(_Cfg) ->
+meck_expect_beam_stats(Overrides) ->
+ IOBytesIn = hope_kv_list:get(Overrides, io_bytes_in , 3),
+ IOBytesOut = hope_kv_list:get(Overrides, io_bytes_out, 7),
+ ContextSwitches = hope_kv_list:get(Overrides, context_switches, 5),
Pid0 = list_to_pid("<0.0.0>"),
Pid1 = list_to_pid("<0.1.0>"),
Pid2 = list_to_pid("<0.2.0>"),
, size = 8
, memory = 64
},
- meck:new(beam_stats_source),
meck:expect(beam_stats_source, erlang_memory,
fun () -> [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] end),
meck:expect(beam_stats_source, erlang_node,
meck:expect(beam_stats_source, erlang_registered,
fun () -> [reg_name_foo] end),
meck:expect(beam_stats_source, erlang_statistics,
- fun (io ) -> {{input, 3}, {output, 7}}
- ; (context_switches) -> {5, 0}
+ fun (io ) -> {{input, IOBytesIn}, {output, IOBytesOut}}
+ ; (context_switches) -> {ContextSwitches, 0}
; (reductions ) -> {0, 9} % 1st element is unused
; (run_queue ) -> 17
end
end
end
),
- BEAMStatsExpected =
- #beam_stats
- { timestamp = {1, 2, 3}
- , node_id = 'node_foo@host_bar'
- , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
- , io_bytes_in = 3
- , io_bytes_out = 7
- , context_switches = 5
- , reductions = 9
- , run_queue = 17
- , ets = [ETSTableStatsFoo, ETSTableStatsBar]
- , processes = Processes
- },
- BEAMStatsComputed = beam_stats_state:export(beam_stats_state:new()),
- ct:log("BEAMStatsExpected: ~p~n", [BEAMStatsExpected]),
- ct:log("BEAMStatsComputed: ~p~n", [BEAMStatsComputed]),
- BEAMStatsExpected = BEAMStatsComputed,
- meck:unload(beam_stats_source).
+ #beam_stats
+ { timestamp = {1, 2, 3}
+ , node_id = 'node_foo@host_bar'
+ , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}]
+ , io_bytes_in = IOBytesIn
+ , io_bytes_out = IOBytesOut
+ , context_switches = ContextSwitches
+ , reductions = 9
+ , run_queue = 17
+ , ets = [ETSTableStatsFoo, ETSTableStatsBar]
+ , processes = Processes
+ }.