| 1 | -module(beam_stats_consumer_statsd_SUITE). |
| 2 | |
| 3 | -include_lib("beam_stats/include/beam_stats.hrl"). |
| 4 | -include_lib("beam_stats/include/beam_stats_ets_table.hrl"). |
| 5 | -include_lib("beam_stats/include/beam_stats_process.hrl"). |
| 6 | -include_lib("beam_stats/include/beam_stats_process_ancestry.hrl"). |
| 7 | -include_lib("beam_stats/include/beam_stats_processes.hrl"). |
| 8 | |
| 9 | -export( |
| 10 | [ all/0 |
| 11 | , groups/0 |
| 12 | ]). |
| 13 | |
| 14 | %% Test cases |
| 15 | -export( |
| 16 | [ t_full_cycle/1 |
| 17 | , t_deltas_gc/1 |
| 18 | ]). |
| 19 | |
| 20 | -define(GROUP, beam_stats_consumer_statsd). |
| 21 | |
| 22 | %% ============================================================================ |
| 23 | %% Common Test callbacks |
| 24 | %% ============================================================================ |
| 25 | |
| 26 | all() -> |
| 27 | [{group, ?GROUP}]. |
| 28 | |
| 29 | groups() -> |
| 30 | Tests = |
| 31 | [ t_full_cycle |
| 32 | , t_deltas_gc |
| 33 | ], |
| 34 | Properties = [], |
| 35 | [{?GROUP, Properties, Tests}]. |
| 36 | |
| 37 | %% ============================================================================ |
| 38 | %% Test cases |
| 39 | %% ============================================================================ |
| 40 | |
| 41 | t_deltas_gc(_Cfg) -> |
| 42 | Pid1 = list_to_pid("<0.101.0>"), |
| 43 | Pid2 = list_to_pid("<0.102.0>"), |
| 44 | Pid3 = list_to_pid("<0.103.0>"), |
| 45 | meck:new(beam_stats_source), |
| 46 | meck:expect(beam_stats_source, erlang_process_info, |
| 47 | fun (P, reductions) when P == Pid1 -> {reductions, 1} |
| 48 | ; (P, reductions) when P == Pid2 -> {reductions, 2} |
| 49 | ; (P, reductions) when P == Pid3 -> {reductions, 3} |
| 50 | end |
| 51 | ), |
| 52 | DeltasServer = beam_stats_delta:start(), |
| 53 | {some, 1} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1), |
| 54 | {some, 2} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2), |
| 55 | {some, 3} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3), |
| 56 | {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1), |
| 57 | {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2), |
| 58 | {some, 0} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3), |
| 59 | meck:expect(beam_stats_source, erlang_is_process_alive, fun (_) -> false end), |
| 60 | {} = beam_stats_delta:gc(DeltasServer), |
| 61 | {some, 1} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid1), |
| 62 | {some, 2} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid2), |
| 63 | {some, 3} = beam_stats_delta:of_process_info_reductions(DeltasServer, Pid3), |
| 64 | {} = beam_stats_delta:stop(DeltasServer), |
| 65 | meck:unload(beam_stats_source). |
| 66 | |
| 67 | t_full_cycle(_Cfg) -> |
| 68 | meck:new(beam_stats_source), |
| 69 | _BEAMStatsExpected = meck_expect_beam_stats(), |
| 70 | |
| 71 | {ok,[hope,beam_stats]} = application:ensure_all_started(beam_stats), |
| 72 | ct:log("beam_stats started~n"), |
| 73 | ServerPort = 8125, |
| 74 | {ok, ServerSocket} = gen_udp:open(ServerPort, [binary, {active, false}]), |
| 75 | ct:log("UDP server started started~n"), |
| 76 | {ok, _} = beam_stats_consumer:add(beam_stats_consumer_statsd, |
| 77 | [ {consumption_interval , 60000} |
| 78 | , {dst_host , "localhost"} |
| 79 | , {dst_port , ServerPort} |
| 80 | , {src_port , 8124} |
| 81 | , {num_msgs_per_packet , 10} |
| 82 | ] |
| 83 | ), |
| 84 | ct:log("consumer added~n"), |
| 85 | _ = meck_expect_beam_stats( |
| 86 | % Double the original values, so that deltas will equal originals after |
| 87 | % 1 update of new beam_stats_state:t() |
| 88 | [ {io_bytes_in , 6} |
| 89 | , {io_bytes_out , 14} |
| 90 | , {context_switches , 10} |
| 91 | , {reductions , 18} |
| 92 | ] |
| 93 | ), |
| 94 | ct:log("meck_expect_beam_stats ok~n"), |
| 95 | {} = beam_stats_producer:sync_produce_consume(), |
| 96 | ct:log("produced and consumed~n"), |
| 97 | ok = application:stop(beam_stats), |
| 98 | ct:log("beam_stats stopped~n"), |
| 99 | |
| 100 | ResultOfReceive1 = gen_udp:recv(ServerSocket, 0), |
| 101 | ResultOfReceive2 = gen_udp:recv(ServerSocket, 0), |
| 102 | ResultOfReceive3 = gen_udp:recv(ServerSocket, 0), |
| 103 | ResultOfReceive4 = gen_udp:recv(ServerSocket, 0), |
| 104 | ok = gen_udp:close(ServerSocket), |
| 105 | {ok, {_, _, PacketReceived1}} = ResultOfReceive1, |
| 106 | {ok, {_, _, PacketReceived2}} = ResultOfReceive2, |
| 107 | {ok, {_, _, PacketReceived3}} = ResultOfReceive3, |
| 108 | {ok, {_, _, PacketReceived4}} = ResultOfReceive4, |
| 109 | ct:log("PacketReceived1: ~n~s~n", [PacketReceived1]), |
| 110 | ct:log("PacketReceived2: ~n~s~n", [PacketReceived2]), |
| 111 | ct:log("PacketReceived3: ~n~s~n", [PacketReceived3]), |
| 112 | ct:log("PacketReceived4: ~n~s~n", [PacketReceived4]), |
| 113 | PacketsCombined = |
| 114 | << PacketReceived1/binary |
| 115 | , PacketReceived2/binary |
| 116 | , PacketReceived3/binary |
| 117 | , PacketReceived4/binary |
| 118 | >>, |
| 119 | ct:log("PacketsCombined: ~n~s~n", [PacketsCombined]), |
| 120 | MsgsExpected = |
| 121 | [ <<"beam_stats_v0.node_foo_host_bar.io.bytes_in:3|g">> |
| 122 | , <<"beam_stats_v0.node_foo_host_bar.io.bytes_out:7|g">> |
| 123 | , <<"beam_stats_v0.node_foo_host_bar.context_switches:5|g">> |
| 124 | , <<"beam_stats_v0.node_foo_host_bar.reductions:9|g">> |
| 125 | , <<"beam_stats_v0.node_foo_host_bar.run_queue:17|g">> |
| 126 | , <<"beam_stats_v0.node_foo_host_bar.memory.mem_type_foo:1|g">> |
| 127 | , <<"beam_stats_v0.node_foo_host_bar.memory.mem_type_bar:2|g">> |
| 128 | , <<"beam_stats_v0.node_foo_host_bar.memory.mem_type_baz:3|g">> |
| 129 | , <<"beam_stats_v0.node_foo_host_bar.ets_table.size.foo.NAMED:5|g">> |
| 130 | , <<"beam_stats_v0.node_foo_host_bar.ets_table.memory.foo.NAMED:40|g">> |
| 131 | , <<"beam_stats_v0.node_foo_host_bar.ets_table.size.bar.TID:16|g">> |
| 132 | , <<"beam_stats_v0.node_foo_host_bar.ets_table.memory.bar.TID:128|g">> |
| 133 | |
| 134 | % Processes totals |
| 135 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_all:4|g">> |
| 136 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_exiting:0|g">> |
| 137 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_garbage_collecting:0|g">> |
| 138 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_registered:1|g">> |
| 139 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_runnable:0|g">> |
| 140 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_running:3|g">> |
| 141 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_suspended:0|g">> |
| 142 | , <<"beam_stats_v0.node_foo_host_bar.processes_count_waiting:1|g">> |
| 143 | |
| 144 | % Process 1 |
| 145 | , <<"beam_stats_v0.node_foo_host_bar.process_memory.named--reg_name_foo:15|g">> |
| 146 | , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.named--reg_name_foo:25|g">> |
| 147 | , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.named--reg_name_foo:10|g">> |
| 148 | , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.named--reg_name_foo:0|g">> |
| 149 | , <<"beam_stats_v0.node_foo_host_bar.process_reductions.named--reg_name_foo:0|g">> |
| 150 | |
| 151 | % Process 2 |
| 152 | , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--bar_mod-bar_fun-1--NONE--NONE:25|g">> |
| 153 | , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:35|g">> |
| 154 | , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--bar_mod-bar_fun-1--NONE--NONE:40|g">> |
| 155 | , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--bar_mod-bar_fun-1--NONE--NONE:5|g">> |
| 156 | , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--bar_mod-bar_fun-1--NONE--NONE:0|g">> |
| 157 | |
| 158 | % Process 3 and 4, aggregated by origin |
| 159 | , <<"beam_stats_v0.node_foo_host_bar.process_memory.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:30|g">> |
| 160 | , <<"beam_stats_v0.node_foo_host_bar.process_total_heap_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:45|g">> |
| 161 | , <<"beam_stats_v0.node_foo_host_bar.process_stack_size.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:55|g">> |
| 162 | , <<"beam_stats_v0.node_foo_host_bar.process_message_queue_len.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:1|g">> |
| 163 | , <<"beam_stats_v0.node_foo_host_bar.process_reductions.spawned-via--baz_mod-baz_fun-3--baz_otp_mod-baz_otp_fun-2--PID-PID:0|g">> |
| 164 | ], |
| 165 | MsgsReceived = binary:split(PacketsCombined, <<"\n">>, [global, trim]), |
| 166 | RemoveExpectedFromReceived = |
| 167 | fun (Expected, Received) -> |
| 168 | ct:log( |
| 169 | "Looking for expected msg ~p in remaining received ~p~n", |
| 170 | [Expected, Received] |
| 171 | ), |
| 172 | true = lists:member(Expected, Received), |
| 173 | Received -- [Expected] |
| 174 | end, |
| 175 | MsgsRemaining = lists:foldl(RemoveExpectedFromReceived, MsgsReceived, MsgsExpected), |
| 176 | ct:log("MsgsRemaining: ~p", [MsgsRemaining]), |
| 177 | [] = MsgsRemaining, |
| 178 | meck:unload(beam_stats_source). |
| 179 | |
| 180 | meck_expect_beam_stats() -> |
| 181 | meck_expect_beam_stats([]). |
| 182 | |
| 183 | meck_expect_beam_stats(Overrides) -> |
| 184 | IOBytesIn = hope_kv_list:get(Overrides, io_bytes_in , 3), |
| 185 | IOBytesOut = hope_kv_list:get(Overrides, io_bytes_out, 7), |
| 186 | ContextSwitches = hope_kv_list:get(Overrides, context_switches, 5), |
| 187 | Reductions = hope_kv_list:get(Overrides, reductions, 9), |
| 188 | Pid0 = list_to_pid("<0.0.0>"), |
| 189 | Pid1 = list_to_pid("<0.1.0>"), |
| 190 | Pid2 = list_to_pid("<0.2.0>"), |
| 191 | Pid3 = list_to_pid("<0.3.0>"), |
| 192 | Pid4 = list_to_pid("<0.4.0>"), |
| 193 | Process1 = |
| 194 | #beam_stats_process |
| 195 | { pid = Pid1 |
| 196 | , registered_name = {some, reg_name_foo} |
| 197 | , ancestry = |
| 198 | #beam_stats_process_ancestry |
| 199 | { raw_initial_call = {foo_mod, foo_fun, 2} |
| 200 | , otp_initial_call = none |
| 201 | , otp_ancestors = none |
| 202 | } |
| 203 | , status = running |
| 204 | , memory = 15 |
| 205 | , total_heap_size = 25 |
| 206 | , stack_size = 10 |
| 207 | , message_queue_len = 0 |
| 208 | }, |
| 209 | Process2 = |
| 210 | #beam_stats_process |
| 211 | { pid = Pid2 |
| 212 | , registered_name = none |
| 213 | , ancestry = |
| 214 | #beam_stats_process_ancestry |
| 215 | { raw_initial_call = {bar_mod, bar_fun, 1} |
| 216 | , otp_initial_call = none |
| 217 | , otp_ancestors = none |
| 218 | } |
| 219 | , status = running |
| 220 | , memory = 25 |
| 221 | , total_heap_size = 35 |
| 222 | , stack_size = 40 |
| 223 | , message_queue_len = 5 |
| 224 | }, |
| 225 | Process3 = |
| 226 | #beam_stats_process |
| 227 | { pid = Pid3 |
| 228 | , registered_name = none |
| 229 | , ancestry = |
| 230 | #beam_stats_process_ancestry |
| 231 | { raw_initial_call = {baz_mod, baz_fun, 3} |
| 232 | , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}} |
| 233 | , otp_ancestors = {some, [Pid0, Pid1]} |
| 234 | } |
| 235 | , status = running |
| 236 | , memory = 25 |
| 237 | , total_heap_size = 35 |
| 238 | , stack_size = 40 |
| 239 | , message_queue_len = 1 |
| 240 | }, |
| 241 | Process4 = |
| 242 | #beam_stats_process |
| 243 | { pid = Pid4 |
| 244 | , registered_name = none |
| 245 | , ancestry = |
| 246 | #beam_stats_process_ancestry |
| 247 | { raw_initial_call = {baz_mod, baz_fun, 3} |
| 248 | , otp_initial_call = {some, {baz_otp_mod, baz_otp_fun, 2}} |
| 249 | , otp_ancestors = {some, [Pid0, Pid1]} |
| 250 | } |
| 251 | , status = waiting |
| 252 | , memory = 5 |
| 253 | , total_heap_size = 10 |
| 254 | , stack_size = 15 |
| 255 | , message_queue_len = 0 |
| 256 | }, |
| 257 | Processes = |
| 258 | #beam_stats_processes |
| 259 | { individual_stats = |
| 260 | [ Process1 |
| 261 | , Process2 |
| 262 | , Process3 |
| 263 | , Process4 |
| 264 | ] |
| 265 | , count_all = 4 |
| 266 | , count_exiting = 0 |
| 267 | , count_garbage_collecting = 0 |
| 268 | , count_registered = 1 |
| 269 | , count_runnable = 0 |
| 270 | , count_running = 3 |
| 271 | , count_suspended = 0 |
| 272 | , count_waiting = 1 |
| 273 | }, |
| 274 | ETSTableStatsFoo = |
| 275 | #beam_stats_ets_table |
| 276 | { id = foo |
| 277 | , name = foo |
| 278 | , size = 5 |
| 279 | , memory = 40 |
| 280 | }, |
| 281 | ETSTableStatsBarA = |
| 282 | #beam_stats_ets_table |
| 283 | { id = 37 |
| 284 | , name = bar |
| 285 | , size = 8 |
| 286 | , memory = 64 |
| 287 | }, |
| 288 | ETSTableStatsBarB = |
| 289 | #beam_stats_ets_table |
| 290 | { id = 38 |
| 291 | , name = bar |
| 292 | , size = 8 |
| 293 | , memory = 64 |
| 294 | }, |
| 295 | meck:expect(beam_stats_source, erlang_is_process_alive, |
| 296 | fun (_) -> true end), |
| 297 | meck:expect(beam_stats_source, erlang_memory, |
| 298 | fun () -> [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] end), |
| 299 | meck:expect(beam_stats_source, erlang_node, |
| 300 | fun () -> 'node_foo@host_bar' end), |
| 301 | meck:expect(beam_stats_source, erlang_registered, |
| 302 | fun () -> [reg_name_foo] end), |
| 303 | meck:expect(beam_stats_source, erlang_statistics, |
| 304 | fun (io ) -> {{input, IOBytesIn}, {output, IOBytesOut}} |
| 305 | ; (context_switches) -> {ContextSwitches, 0} |
| 306 | ; (reductions ) -> {Reductions, undefined} % 2nd element is unused |
| 307 | ; (run_queue ) -> 17 |
| 308 | end |
| 309 | ), |
| 310 | meck:expect(beam_stats_source, ets_all, |
| 311 | fun () -> [foo, 37, 38] end), |
| 312 | meck:expect(beam_stats_source, erlang_system_info, |
| 313 | fun (wordsize) -> 8 end), |
| 314 | meck:expect(beam_stats_source, ets_info, |
| 315 | fun (foo, memory) -> 5 |
| 316 | ; (foo, name ) -> foo |
| 317 | ; (foo, size ) -> 5 |
| 318 | |
| 319 | ; (37 , memory) -> 8 |
| 320 | ; (37 , name ) -> bar |
| 321 | ; (37 , size ) -> 8 |
| 322 | |
| 323 | ; (38 , memory) -> 8 |
| 324 | ; (38 , name ) -> bar |
| 325 | ; (38 , size ) -> 8 |
| 326 | end |
| 327 | ), |
| 328 | meck:expect(beam_stats_source, erlang_processes, |
| 329 | fun () -> [Pid1, Pid2, Pid3, Pid4] end), |
| 330 | meck:expect(beam_stats_source, os_timestamp, |
| 331 | fun () -> {1, 2, 3} end), |
| 332 | meck:expect(beam_stats_source, erlang_process_info, |
| 333 | fun (P, K) when P == Pid1 -> |
| 334 | case K |
| 335 | of dictionary -> {K, []} |
| 336 | ; initial_call -> {K, {foo_mod, foo_fun, 2}} |
| 337 | ; registered_name -> {K, reg_name_foo} |
| 338 | ; status -> {K, running} |
| 339 | ; memory -> {K, 15} |
| 340 | ; total_heap_size -> {K, 25} |
| 341 | ; stack_size -> {K, 10} |
| 342 | ; message_queue_len -> {K, 0} |
| 343 | ; reductions -> {K, 1} |
| 344 | end |
| 345 | ; (P, K) when P == Pid2 -> |
| 346 | case K |
| 347 | of dictionary -> {K, []} |
| 348 | ; initial_call -> {K, {bar_mod, bar_fun, 1}} |
| 349 | ; registered_name -> [] |
| 350 | ; status -> {K, running} |
| 351 | ; memory -> {K, 25} |
| 352 | ; total_heap_size -> {K, 35} |
| 353 | ; stack_size -> {K, 40} |
| 354 | ; message_queue_len -> {K, 5} |
| 355 | ; reductions -> {K, 2} |
| 356 | end |
| 357 | ; (P, K) when P == Pid3 -> |
| 358 | Dict = |
| 359 | [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}} |
| 360 | , {'$ancestors' , [Pid0, Pid1]} |
| 361 | ], |
| 362 | case K |
| 363 | of dictionary -> {K, Dict} |
| 364 | ; initial_call -> {K, {baz_mod, baz_fun, 3}} |
| 365 | ; registered_name -> [] |
| 366 | ; status -> {K, running} |
| 367 | ; memory -> {K, 25} |
| 368 | ; total_heap_size -> {K, 35} |
| 369 | ; stack_size -> {K, 40} |
| 370 | ; message_queue_len -> {K, 1} |
| 371 | ; reductions -> {K, 3} |
| 372 | end |
| 373 | ; (P, K) when P == Pid4 -> |
| 374 | Dict = |
| 375 | [ {'$initial_call', {baz_otp_mod, baz_otp_fun, 2}} |
| 376 | , {'$ancestors' , [Pid0, Pid1]} |
| 377 | ], |
| 378 | case K |
| 379 | of dictionary -> {K, Dict} |
| 380 | ; initial_call -> {K, {baz_mod, baz_fun, 3}} |
| 381 | ; registered_name -> [] |
| 382 | ; status -> {K, waiting} |
| 383 | ; memory -> {K, 5} |
| 384 | ; total_heap_size -> {K, 10} |
| 385 | ; stack_size -> {K, 15} |
| 386 | ; message_queue_len -> {K, 0} |
| 387 | ; reductions -> {K, 4} |
| 388 | end |
| 389 | end |
| 390 | ), |
| 391 | #beam_stats |
| 392 | { timestamp = {1, 2, 3} |
| 393 | , node_id = 'node_foo@host_bar' |
| 394 | , memory = [{mem_type_foo, 1}, {mem_type_bar, 2}, {mem_type_baz, 3}] |
| 395 | , io_bytes_in = IOBytesIn |
| 396 | , io_bytes_out = IOBytesOut |
| 397 | , context_switches = ContextSwitches |
| 398 | , reductions = 9 |
| 399 | , run_queue = 17 |
| 400 | , ets = [ETSTableStatsFoo, ETSTableStatsBarA, ETSTableStatsBarB] |
| 401 | , processes = Processes |
| 402 | }. |