From f5d86423bccd933890cc693f529afa7fe19626d2 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Thu, 25 Jul 2024 16:02:45 +0200 Subject: [PATCH 1/8] Migrate `async_pools` to `mongoose_instrument` --- src/async_pools/mongoose_aggregator_worker.erl | 3 ++- src/async_pools/mongoose_async_pools.erl | 17 +++++++++++------ src/async_pools/mongoose_batch_worker.erl | 9 ++++++--- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/async_pools/mongoose_aggregator_worker.erl b/src/async_pools/mongoose_aggregator_worker.erl index a6fa07437f0..4c77b709264 100644 --- a/src/async_pools/mongoose_aggregator_worker.erl +++ b/src/async_pools/mongoose_aggregator_worker.erl @@ -229,7 +229,8 @@ make_async_request(Request, #state{host_type = HostType, pool_id = PoolId, drop -> State; ReqId -> - mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, async_request], 1), + mongoose_instrument:execute(async_pool_request, #{host_type => HostType, pool_id => PoolId}, + #{count => 1}), State#state{async_request = {ReqId, Request}} end. diff --git a/src/async_pools/mongoose_async_pools.erl b/src/async_pools/mongoose_async_pools.erl index 6808f5e5855..46b559cbc43 100644 --- a/src/async_pools/mongoose_async_pools.erl +++ b/src/async_pools/mongoose_async_pools.erl @@ -75,11 +75,13 @@ start_pool(HostType, PoolId, PoolOpts) -> start => {?MODULE, start_link, [HostType, PoolId, PoolOpts]}, restart => transient, type => supervisor}, + mongoose_instrument:set_up(instrumentation(HostType, PoolId)), ejabberd_sup:start_child(ChildSpec). -spec stop_pool(mongooseim:host_type(), pool_id()) -> ok. stop_pool(HostType, PoolId) -> ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}), + mongoose_instrument:tear_down(instrumentation(HostType, PoolId)), ejabberd_sup:stop_child(sup_name(HostType, PoolId)). -spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name(). @@ -137,7 +139,7 @@ gen_pool_name(HostType, PoolId) -> -spec process_pool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> [wpool:option()]. process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) -> - WorkerModule = select_worker_module(HostType, PoolId, Opts), + WorkerModule = select_worker_module(Opts), WorkerOpts = make_worker_opts(HostType, PoolId, Opts), Worker = {WorkerModule, WorkerOpts}, [{worker, Worker}, @@ -145,12 +147,9 @@ process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) -> {worker_opt, [{spawn_opt, [{message_queue_data, off_heap}]}]}, {worker_shutdown, 10000}]. -select_worker_module(HostType, PoolId, #{pool_type := batch}) -> - mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, timed_flushes], counter), - mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, batch_flushes], counter), +select_worker_module(#{pool_type := batch}) -> mongoose_batch_worker; -select_worker_module(HostType, PoolId, #{pool_type := aggregate}) -> - mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, async_request], counter), +select_worker_module(#{pool_type := aggregate}) -> mongoose_aggregator_worker. -spec make_worker_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> map(). @@ -172,3 +171,9 @@ maybe_init_handler(HostType, PoolId, Opts = #{init_callback := InitFun}, Extra) Extra#{init_data => InitFun(HostType, PoolId, Opts)}; maybe_init_handler(_, _, _, Extra) -> Extra. 
+ +instrumentation(HostType, PoolId) -> + [{async_pool_flush, #{pool_id => PoolId, host_type => HostType}, + #{metrics => #{timed => counter, batch => counter}}}, + {async_pool_request, #{pool_id => PoolId, host_type => HostType}, + #{metrics => #{count => counter}}}]. diff --git a/src/async_pools/mongoose_batch_worker.erl b/src/async_pools/mongoose_batch_worker.erl index 31180bc87b7..62d6ce4b822 100644 --- a/src/async_pools/mongoose_batch_worker.erl +++ b/src/async_pools/mongoose_batch_worker.erl @@ -61,7 +61,8 @@ init(#{host_type := HostType, -spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}. handle_call(sync, _From, State = #state{host_type = HostType, pool_id = PoolId, flush_queue = [_|_]}) -> - mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1), + mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId}, + #{timed => 1}), {reply, ok, run_flush(State)}; handle_call(sync, _From, State = #state{flush_queue = []}) -> {reply, skipped, State}; @@ -84,7 +85,8 @@ handle_cast(Msg, State) -> handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef, host_type = HostType, pool_id = PoolId}) -> - mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1), + mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId}, + #{timed => 1}), {noreply, run_flush(State)}; handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) -> {noreply, State}; @@ -153,7 +155,8 @@ maybe_run_flush(#state{host_type = HostType, case Length >= MaxSize of false -> State; true -> - mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1), + mongoose_instrument:execute(async_pool_flush, #{host_type => HostType, pool_id => PoolId}, + #{batch => 1}), run_flush(State) end. From 07270d1751e5a7a4f437c4b5cdc1e1191c628022 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Thu, 25 Jul 2024 16:03:17 +0200 Subject: [PATCH 2/8] Test async pool request event --- big_tests/tests/inbox_SUITE.erl | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/big_tests/tests/inbox_SUITE.erl b/big_tests/tests/inbox_SUITE.erl index 0bba92ca988..6b16119ca49 100644 --- a/big_tests/tests/inbox_SUITE.erl +++ b/big_tests/tests/inbox_SUITE.erl @@ -159,10 +159,13 @@ suite() -> init_per_suite(Config) -> mongoose_helper:inject_module(?MODULE), + HostType = domain_helper:host_type(), + instrument_helper:start([{async_pool_request, #{host_type => HostType, pool_id => inbox}}]), escalus:init_per_suite(Config). end_per_suite(Config) -> - escalus:end_per_suite(Config). + escalus:end_per_suite(Config), + instrument_helper:stop(). init_per_group(GroupName, Config) when GroupName =:= regular; GroupName =:= async_pools -> HostType = domain_helper:host_type(), @@ -1348,6 +1351,7 @@ bin_is_not_included_by_default(Config) -> end). rest_api_bin_flush_user(Config) -> + TS = instrument_helper:timestamp(), escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> create_room_and_make_users_leave(Alice, Bob, Kate), %% It is not in his bin anymore after triggering a bin flush @@ -1357,7 +1361,8 @@ rest_api_bin_flush_user(Config) -> {{<<"200">>, <<"OK">>}, NumOfRows} = rest_helper:delete(admin, Path), ?assertEqual(1, NumOfRows), check_inbox(Bob, [], #{box => bin}) - end). + end), + assert_async_request_event(TS, 10). 
rest_api_bin_flush_user_errors(Config) -> Config1 = escalus_fresh:create_users(Config, [{alice, 1}]), @@ -1373,6 +1378,7 @@ rest_api_bin_flush_user_errors(Config) -> rest_helper:delete(admin, <<"/inbox/", Domain/binary, "/baduser/0/bin">>). rest_api_bin_flush_all(Config) -> + TS = instrument_helper:timestamp(), escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> create_room_and_make_users_leave(Alice, Bob, Kate), %% It is not in any bin anymore after triggering a bin flush @@ -1382,7 +1388,8 @@ rest_api_bin_flush_all(Config) -> ?assertEqual(2, NumOfRows), check_inbox(Bob, [], #{box => bin}), check_inbox(Kate, [], #{box => bin}) - end). + end), + assert_async_request_event(TS, 10). rest_api_bin_flush_all_errors(_Config) -> HostTypePath = uri_string:normalize(#{path => domain_helper:host_type()}), @@ -1392,14 +1399,17 @@ rest_api_bin_flush_all_errors(_Config) -> rest_helper:delete(admin, <<"/inbox/bad_host_type/0/bin">>). timeout_cleaner_flush_all(Config) -> + TS = instrument_helper:timestamp(), escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> create_room_and_make_users_leave(Alice, Bob, Kate), %% It is eventually not in any bin thanks to the periodic cleanouts check_inbox(Bob, [], #{box => bin}), check_inbox(Kate, [], #{box => bin}) - end). + end), + assert_async_request_event(TS, 10). xmpp_bin_flush(Config) -> + TS = instrument_helper:timestamp(), escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> create_room_and_make_users_leave(Alice, Bob, Kate), %% It is eventually not in any bin thanks to the periodic cleanouts @@ -1411,7 +1421,8 @@ xmpp_bin_flush(Config) -> escalus:send(Bob, Iq), escalus:assert(is_iq_result, [Iq], escalus:wait_for_stanza(Bob)), check_inbox(Bob, [], #{box => bin}) - end). + end), + assert_async_request_event(TS, 10). %% helpers @@ -1463,3 +1474,9 @@ verify_hook_listener(RoomName) -> after 100 -> ct:pal("OK") end. + +assert_async_request_event(TS, ExpectedCount) -> + instrument_helper:assert(async_pool_request, + #{host_type => domain_helper:host_type(), pool_id => inbox}, + fun(#{count := 1}) -> true end, + #{min_timestamp => TS, expected_count => ExpectedCount}). From 87a21889ff5a494ff58f90bd3952442cf8f25954 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Thu, 25 Jul 2024 16:03:45 +0200 Subject: [PATCH 3/8] Test async pool flush event --- big_tests/tests/mam_SUITE.erl | 60 ++++++++++++++++++++++++++++++---- big_tests/tests/mam_helper.erl | 19 +++++++++++ 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index ac1550cea0c..d6160bb827c 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -113,6 +113,8 @@ assert_archive_message_event/2, assert_lookup_event/2, assert_flushed_event_if_async/2, + assert_async_batch_flush_event/3, + assert_async_timed_flush_event/4, assert_dropped_iq_event/2, assert_event_with_jid/2, assert_no_event_with_jid/2 @@ -430,7 +432,8 @@ muc_light_cases() -> muc_light_chat_markers_are_archived_if_enabled, muc_light_chat_markers_are_not_archived_if_disabled, muc_light_failed_to_decode_message_in_database, - muc_light_sql_query_failed + muc_light_sql_query_failed, + muc_light_async_pools_batch_flush ]. muc_rsm_cases() -> @@ -503,13 +506,18 @@ muc_prefs_cases() -> impl_specific() -> [check_user_exist, pm_failed_to_decode_message_in_database, - pm_sql_query_failed]. + pm_sql_query_failed, + async_pools_batch_flush]. 
suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). init_per_suite(Config) -> - instrument_helper:start(instrument_helper:declared_events(instrumented_modules())), + PoolIds = [pm_mam, muc_mam], + AsyncPoolsEvents = [{async_pool_flush, #{host_type => host_type(), pool_id => PoolId}} + || PoolId <- PoolIds], + instrument_helper:start( + instrument_helper:declared_events(instrumented_modules()) ++ AsyncPoolsEvents), muc_helper:load_muc(), mongoose_helper:inject_module(mim(), ?MODULE, reload), mam_helper:prepare_for_suite( @@ -568,7 +576,6 @@ init_per_group(rsm04_comp, Config) -> [{props, mam04_props()}|Config]; init_per_group(with_rsm04, Config) -> [{props, mam04_props()}, {with_rsm, true}|Config]; - init_per_group(nostore, Config) -> Config; init_per_group(archived, Config) -> @@ -801,6 +808,10 @@ maybe_skip(C, Config) when C =:= easy_text_search_request; C =:= muc_text_search_request -> skip_if(?config(configuration, Config) =:= cassandra, "full text search is not implemented for cassandra backend"); +maybe_skip(C, Config) when C =:= muc_light_async_pools_batch_flush; + C =:= async_pools_batch_flush -> + skip_if(?config(configuration, Config) =/= rdbms_async_pool, + "only for async pool"); maybe_skip(_C, _Config) -> ok. @@ -905,6 +916,11 @@ required_modules(CaseName, Config) when CaseName =:= muc_light_service_discovery Opts = #{pm := PM} = ?config(mam_meta_opts, Config), NewOpts = Opts#{pm := PM#{archive_groupchats => true}}, [{mod_mam, NewOpts}]; +required_modules(CaseName, Config) when CaseName =:= async_pools_batch_flush; + CaseName =:= muc_light_async_pools_batch_flush -> + Opts = #{async_writer := Async} = ?config(mam_meta_opts, Config), + NewOpts = Opts#{async_writer := Async#{batch_size => 3, flush_interval => 5000}}, + [{mod_mam, NewOpts}]; required_modules(muc_light_chat_markers_are_archived_if_enabled, Config) -> Opts = #{muc := MUC} = ?config(mam_meta_opts, Config), NewOpts = Opts#{muc := MUC#{archive_chat_markers => true}}, @@ -1705,6 +1721,7 @@ muc_light_easy(Config) -> end). muc_light_shouldnt_modify_pm_archive(Config) -> + TS = instrument_helper:timestamp(), escalus:story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> Room = muc_helper:fresh_room_name(), given_muc_light_room(Room, Alice, [{Bob, member}]), @@ -1731,7 +1748,9 @@ muc_light_shouldnt_modify_pm_archive(Config) -> then_archive_response_is(Alice, [{message, Alice, <<"private hi!">>}], Config), when_archive_query_is_sent(Bob, undefined, Config), then_archive_response_is(Bob, [{message, Alice, <<"private hi!">>}], Config) - end). + end), + assert_async_timed_flush_event(Config, TS, 2, pm_mam), + assert_async_timed_flush_event(Config, TS, 2, muc_mam). muc_light_stored_in_pm_if_allowed_to(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> @@ -1911,6 +1930,23 @@ muc_light_sql_query_failed(Config) -> escalus:assert(is_error, [<<"wait">>, <<"internal-server-error">>], Error) end). 
+muc_light_async_pools_batch_flush(Config) ->
+    TS = instrument_helper:timestamp(),
+    escalus:story(Config, [{alice, 1}], fun(Alice) ->
+        Room = muc_helper:fresh_room_name(),
+        given_muc_light_room(Room, Alice, []),
+
+        M1 = when_muc_light_message_is_sent(Alice, Room, <<"Msg 1">>, <<"Id1">>),
+        then_muc_light_message_is_received_by([Alice], M1),
+
+        M2 = when_muc_light_message_is_sent(Alice, Room, <<"Msg 2">>, <<"Id2">>),
+        then_muc_light_message_is_received_by([Alice], M2),
+
+        M3 = when_muc_light_message_is_sent(Alice, Room, <<"Msg 3">>, <<"Id3">>),
+        then_muc_light_message_is_received_by([Alice], M3)
+    end),
+    assert_async_batch_flush_event(TS, 1, muc_mam).
+
 pm_failed_to_decode_message_in_database(Config) ->
     escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi">>)),
@@ -1932,6 +1968,16 @@ pm_sql_query_failed(Config) ->
         escalus:assert(is_error, [<<"wait">>, <<"internal-server-error">>], Error)
     end).
 
+async_pools_batch_flush(Config) ->
+    TS = instrument_helper:timestamp(),
+    escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) ->
+        escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Msg 1">>)),
+        escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Msg 2">>)),
+        escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Msg 3">>)),
+        mam_helper:wait_for_archive_size(Alice, 3)
+    end),
+    assert_async_batch_flush_event(TS, 2, pm_mam).
+
 retrieve_form_fields(ConfigIn) ->
     escalus_fresh:story(ConfigIn, [{alice, 1}], fun(Alice) ->
         P = ?config(props, ConfigIn),
@@ -3449,7 +3495,7 @@ muc_prefs_set_request(ConfigIn) ->
     %%
     %%
     %%
-    
+
     Room = ?config(room, Config),
     RoomAddr = room_address(Room),
     escalus:send(Alice, stanza_to_room(stanza_prefs_set_request(<<"roster">>,
@@ -3666,7 +3712,7 @@ muc_run_prefs_cases(DefaultPolicy, ConfigIn) ->
     maybe_wait_for_archive(Config),
 
     %% Get ALL messages using several queries if required
-    
+
     Room = ?config(room, Config),
     IQ = stanza_archive_request([{mam_ns, <<"urn:xmpp:mam:1">>}], <<>>),
     escalus:send(Alice, stanza_to_room(IQ, Room)),
diff --git a/big_tests/tests/mam_helper.erl b/big_tests/tests/mam_helper.erl
index cd2ca1e4de9..b0b4f46e29f 100644
--- a/big_tests/tests/mam_helper.erl
+++ b/big_tests/tests/mam_helper.erl
@@ -1453,6 +1453,25 @@ assert_flushed_event_if_async(EventName, Config) ->
             ok
     end.
 
+assert_async_batch_flush_event(TS, ExpectedCount, PoolId) ->
+    instrument_helper:assert(
+        async_pool_flush,
+        #{host_type => domain_helper:host_type(), pool_id => PoolId},
+        fun(#{batch := 1}) -> true end,
+        #{min_timestamp => TS, expected_count => ExpectedCount}).
+
+assert_async_timed_flush_event(Config, TS, ExpectedCount, PoolId) ->
+    case ?config(configuration, Config) of
+        rdbms_async_pool ->
+            instrument_helper:assert(
+                async_pool_flush,
+                #{host_type => domain_helper:host_type(), pool_id => PoolId},
+                fun(#{timed := 1}) -> true end,
+                #{min_timestamp => TS, expected_count => ExpectedCount});
+        _ ->
+            ok
+    end.
+ assert_dropped_iq_event(Config, BinJid) -> EventName = case ?config(room, Config) of undefined -> mod_mam_pm_dropped_iq; From 60e1b6febacd220e3fd90ebe8d3ea1fcdd318215 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Thu, 25 Jul 2024 16:04:09 +0200 Subject: [PATCH 4/8] Update small tests for async pools --- big_tests/tests/inbox_SUITE.erl | 9 +--- big_tests/tests/inbox_helper.erl | 9 +++- src/instrument/mongoose_instrument.erl | 3 +- test/batches_SUITE.erl | 68 +++++++------------------- 4 files changed, 30 insertions(+), 59 deletions(-) diff --git a/big_tests/tests/inbox_SUITE.erl b/big_tests/tests/inbox_SUITE.erl index 6b16119ca49..a0570115ba2 100644 --- a/big_tests/tests/inbox_SUITE.erl +++ b/big_tests/tests/inbox_SUITE.erl @@ -22,7 +22,8 @@ given_conversations_between/2, assert_invalid_inbox_form_value_error/3, assert_invalid_reset_inbox/4, - extract_user_specs/1 + extract_user_specs/1, + assert_async_request_event/2 ]). -define(ROOM3, <<"testroom3">>). @@ -1474,9 +1475,3 @@ verify_hook_listener(RoomName) -> after 100 -> ct:pal("OK") end. - -assert_async_request_event(TS, ExpectedCount) -> - instrument_helper:assert(async_pool_request, - #{host_type => domain_helper:host_type(), pool_id => inbox}, - fun(#{count := 1}) -> true end, - #{min_timestamp => TS, expected_count => ExpectedCount}). diff --git a/big_tests/tests/inbox_helper.erl b/big_tests/tests/inbox_helper.erl index 3d4d642bd8a..998d1afc372 100644 --- a/big_tests/tests/inbox_helper.erl +++ b/big_tests/tests/inbox_helper.erl @@ -72,7 +72,8 @@ muc_domain/0, domain/0, to_bare_lower/1, - extract_user_specs/1 + extract_user_specs/1, + assert_async_request_event/2 ]). -import(muc_helper, [foreach_recipient/2]). @@ -867,6 +868,12 @@ assert_message_content(Msg, Field, Value) -> ?assertNotEqual(nomatch, binary:match(Msg, Field)), ?assertNotEqual(nomatch, binary:match(Msg, Value)). +assert_async_request_event(TS, ExpectedCount) -> + instrument_helper:assert(async_pool_request, + #{host_type => domain_helper:host_type(), pool_id => inbox}, + fun(#{count := 1}) -> true end, + #{min_timestamp => TS, expected_count => ExpectedCount}). + %% TODO: properly extract the specs from Bob extract_user_specs(User) -> {client,_,_,_,_,UserSpecs} = User, diff --git a/src/instrument/mongoose_instrument.erl b/src/instrument/mongoose_instrument.erl index 0eecdc27d7d..c1b717c0a16 100644 --- a/src/instrument/mongoose_instrument.erl +++ b/src/instrument/mongoose_instrument.erl @@ -25,8 +25,9 @@ -type labels() :: #{host_type => mongooseim:host_type(), function => atom(), cache_name => atom(), + pool_id => atom(), pool_tag => mongoose_wpool:tag()}. % to be extended --type label_key() :: host_type | function | cache_name | pool_tag. % to be extended +-type label_key() :: host_type | function | cache_name | pool_id | pool_tag. % to be extended -type label_value() :: mongooseim:host_type() | atom() | mongoose_wpool:tag(). % to be extended -type metrics() :: #{metric_name() => metric_type()}. -type metric_name() :: atom(). diff --git a/test/batches_SUITE.erl b/test/batches_SUITE.erl index 33465a09727..5c412e77ee4 100644 --- a/test/batches_SUITE.erl +++ b/test/batches_SUITE.erl @@ -25,8 +25,6 @@ groups() -> [ broadcast_reaches_all_workers, broadcast_reaches_all_keys, - filled_batch_raises_batch_metric, - unfilled_batch_raises_flush_metric, timeouts_and_canceled_timers_do_not_need_to_log_messages, prepare_task_works, sync_flushes_down_everything, @@ -43,7 +41,6 @@ groups() -> ]. 
init_per_suite(Config) -> - meck:new(mongoose_metrics, [stub_all, no_link]), mongoose_config:set_opts(opts()), async_helper:start(Config, mongoose_instrument, start_link, []). @@ -124,7 +121,7 @@ aggregation_might_produce_noop_requests(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), Requestor = fun(1, _) -> timer:sleep(1), gen_server:send_request(Server, 1); (_, _) -> drop end, - Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME, + Opts = (default_aggregator_opts(Server))#{pool_id => test_pool, request_callback => Requestor}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], @@ -135,8 +132,8 @@ broadcast_reaches_all_workers(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, pool_size => 10}, - {ok, _} = mongoose_async_pools:start_pool(host_type(), ?FUNCTION_NAME, WPoolOpts), - mongoose_async_pools:broadcast_task(host_type(), ?FUNCTION_NAME, key, 1), + {ok, _} = mongoose_async_pools:start_pool(host_type(), first_pool, WPoolOpts), + mongoose_async_pools:broadcast_task(host_type(), first_pool, key, 1), async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 10). @@ -156,45 +153,19 @@ broadcast_reaches_all_keys(_) -> WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, pool_size => 3, request_callback => Req}, - {ok, _} = mongoose_async_pools:start_pool(HostType, ?FUNCTION_NAME, WPoolOpts), - [ mongoose_async_pools:put_task(HostType, ?FUNCTION_NAME, N, 1) || N <- lists:seq(0, 1000) ], - mongoose_async_pools:broadcast(HostType, ?FUNCTION_NAME, -1), + {ok, _} = mongoose_async_pools:start_pool(HostType, test_pool, WPoolOpts), + [ mongoose_async_pools:put_task(HostType, test_pool, N, 1) || N <- lists:seq(0, 1000) ], + mongoose_async_pools:broadcast(HostType, test_pool, -1), ets:insert(Tid, {continue, true}), async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 0). -filled_batch_raises_batch_metric(_) -> - Opts = #{host_type => host_type(), - pool_id => ?FUNCTION_NAME, - batch_size => 1, - flush_interval => 1000, - flush_callback => fun(_, _) -> ok end, - flush_extra => #{host_type => host_type(), queue_length => 0}}, - {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), - gen_server:cast(Pid, {task, key, ok}), - MetricName = [mongoose_async_pools, '_', batch_flushes], - async_helper:wait_until( - fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true). - -unfilled_batch_raises_flush_metric(_) -> - Opts = #{host_type => host_type(), - pool_id => ?FUNCTION_NAME, - batch_size => 1000, - flush_interval => 5, - flush_callback => fun(_, _) -> ok end, - flush_extra => #{host_type => host_type(), queue_length => 0}}, - {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), - gen_server:cast(Pid, {task, key, ok}), - MetricName = [mongoose_async_pools, '_', timed_flushes], - async_helper:wait_until( - fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true). 
- timeouts_and_canceled_timers_do_not_need_to_log_messages(_) -> Timeout = 10, QueueSize = 2, meck:new(logger, [passthrough, unstick]), Opts = #{host_type => host_type(), - pool_id => ?FUNCTION_NAME, + pool_id => test_pool, batch_size => QueueSize, flush_interval => Timeout, flush_callback => fun(_, _) -> ok end, @@ -203,14 +174,13 @@ timeouts_and_canceled_timers_do_not_need_to_log_messages(_) -> [ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ], ct:sleep(Timeout*2), ?assertEqual(0, meck:num_calls(logger, macro_log, '_')). - prepare_task_works(_) -> Timeout = 1000, QueueSize = 2, T = self(), meck:new(logger, [passthrough, unstick]), Opts = #{host_type => host_type(), - pool_id => ?FUNCTION_NAME, + pool_id => test_pool, batch_size => QueueSize, flush_interval => Timeout, prep_callback => fun(0, _) -> {error, bad}; @@ -230,7 +200,7 @@ prepare_task_works(_) -> sync_flushes_down_everything(_) -> Opts = #{host_type => host_type(), - pool_id => ?FUNCTION_NAME, + pool_id => test_pool, batch_size => 5000, flush_interval => 5000, flush_callback => fun(_, _) -> ok end, @@ -238,13 +208,11 @@ sync_flushes_down_everything(_) -> {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), ?assertEqual(skipped, gen_server:call(Pid, sync)), gen_server:cast(Pid, {task, key, ok}), - ?assertEqual(ok, gen_server:call(Pid, sync)), - MetricName = [mongoose_async_pools, '_', timed_flushes], - ?assert(0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_'])). + ?assertEqual(ok, gen_server:call(Pid, sync)). sync_aggregates_down_everything(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), - Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME}, + Opts = (default_aggregator_opts(Server))#{pool_id => test_pool}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), ?assertEqual(skipped, gen_server:call(Pid, sync)), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], @@ -254,7 +222,7 @@ sync_aggregates_down_everything(_) -> aggregating_error_is_handled_and_can_continue(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), Requestor = fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end, - Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME, + Opts = (default_aggregator_opts(Server))#{pool_id => test_pool, request_callback => Requestor}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 10) ], @@ -268,14 +236,14 @@ aggregating_error_is_handled_and_can_continue(_) -> async_request(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), - Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME}, + Opts = (default_aggregator_opts(Server))#{pool_id => test_pool}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 500500). retry_request(_) -> - Opts = (retry_aggregator_opts())#{pool_id => retry_request}, + Opts = (retry_aggregator_opts())#{pool_id => test_pool}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), receive_task_called(0), @@ -286,7 +254,7 @@ retry_request(_) -> ensure_no_tasks_to_receive(). 
retry_request_cancelled(_) -> - Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled, + Opts = (retry_aggregator_opts())#{pool_id => test_pool, request_callback => fun do_cancel_request/2}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), @@ -301,7 +269,7 @@ retry_request_cancelled(_) -> receive_task_called(0). retry_request_cancelled_in_verify_function(_) -> - Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled_in_verify_function, + Opts = (retry_aggregator_opts())#{pool_id => test_pool, request_callback => fun do_request/2, verify_callback => fun validate_all_fails/3}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), @@ -317,7 +285,7 @@ retry_request_cancelled_in_verify_function(_) -> receive_task_called(0). ignore_msg_when_waiting_for_reply(_) -> - Opts = (retry_aggregator_opts())#{pool_id => ignore_msg_when_waiting_for_reply, + Opts = (retry_aggregator_opts())#{pool_id => test_pool, request_callback => fun do_request_but_ignore_other_messages/2}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), @@ -330,7 +298,7 @@ async_request_fails(_) -> %% Does request that crashes the gen_server, but not the aggregator {ok, Server} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []), Ref = monitor(process, Server), - Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => ?FUNCTION_NAME}, + Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => test_pool}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, {ack_and_die, self()}}), %% Acked and the server dies From bd46278b2f58cdb3b29a732f04cf06e7128308e3 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Thu, 25 Jul 2024 16:52:50 +0200 Subject: [PATCH 5/8] Remove unused `mongoose_metrics` functions --- src/metrics/mongoose_metrics.erl | 40 -------------------------------- 1 file changed, 40 deletions(-) diff --git a/src/metrics/mongoose_metrics.erl b/src/metrics/mongoose_metrics.erl index 803094b55c0..20e18b69c54 100644 --- a/src/metrics/mongoose_metrics.erl +++ b/src/metrics/mongoose_metrics.erl @@ -19,8 +19,6 @@ %% API -export([init/0, - update/3, - ensure_metric/3, get_metric_value/1, get_metric_values/1, get_metric_value/2, @@ -36,10 +34,6 @@ -define(PREFIXES, mongoose_metrics_prefixes). --type metric_name() :: atom() | list(atom() | binary()). --type short_metric_type() :: spiral | histogram | counter | gauge. --type metric_type() :: tuple() | short_metric_type(). - %% --------------------------------------------------------------------- %% API %% --------------------------------------------------------------------- @@ -48,20 +42,6 @@ init() -> prepare_prefixes(). --spec update(HostType :: mongooseim:host_type_or_global(), Name :: term() | list(), - Change :: term()) -> any(). -update(HostType, Name, Change) when is_list(Name) -> - exometer:update(name_by_all_metrics_are_global(HostType, Name), Change); -update(HostType, Name, Change) -> - update(HostType, [Name], Change). - --spec ensure_metric(mongooseim:host_type_or_global(), metric_name(), metric_type()) -> - ok | {ok, already_present} | {error, any()}. -ensure_metric(HostType, Metric, Type) when is_tuple(Type) -> - ensure_metric(HostType, Metric, Type, element(1, Type)); -ensure_metric(HostType, Metric, Type) -> - ensure_metric(HostType, Metric, Type, Type). 
- get_metric_value(HostType, Name) when is_list(Name) -> get_metric_value(name_by_all_metrics_are_global(HostType, Name)); get_metric_value(HostType, Name) -> @@ -142,23 +122,3 @@ name_by_all_metrics_are_global(HostType, Name) -> remove_metric({Name, _, _}) -> exometer_admin:delete_entry(Name). - -ensure_metric(HostType, Metric, Type, ShortType) when is_atom(Metric) -> - ensure_metric(HostType, [Metric], Type, ShortType); - -ensure_metric(HostType, Metric, Type, ShortType) when is_list(Metric) -> - %% the split into ShortType and Type is needed because function metrics are - %% defined as tuples (that is Type), while exometer:info returns only 'function' - PrefixedMetric = name_by_all_metrics_are_global(HostType, Metric), - case exometer:info(PrefixedMetric, type) of - undefined -> - do_create_metric(PrefixedMetric, Type, []); - ShortType -> {ok, already_present} - end. - -do_create_metric(PrefixedMetric, ExometerType, ExometerOpts) -> - case catch exometer:new(PrefixedMetric, ExometerType, ExometerOpts) of - {'EXIT', {exists, _}} -> {ok, already_present}; - ok -> ok; - {'EXIT', Error} -> {error, Error} - end. From 11f0193dedc8aa600cd7e4ebd018187632c6d41a Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Fri, 26 Jul 2024 12:48:44 +0200 Subject: [PATCH 6/8] Change `async_pools` metric type --- src/async_pools/mongoose_async_pools.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/async_pools/mongoose_async_pools.erl b/src/async_pools/mongoose_async_pools.erl index 46b559cbc43..8d7aa3fbc3b 100644 --- a/src/async_pools/mongoose_async_pools.erl +++ b/src/async_pools/mongoose_async_pools.erl @@ -174,6 +174,6 @@ maybe_init_handler(_, _, _, Extra) -> instrumentation(HostType, PoolId) -> [{async_pool_flush, #{pool_id => PoolId, host_type => HostType}, - #{metrics => #{timed => counter, batch => counter}}}, + #{metrics => #{timed => spiral, batch => spiral}}}, {async_pool_request, #{pool_id => PoolId, host_type => HostType}, - #{metrics => #{count => counter}}}]. + #{metrics => #{count => spiral}}}]. From 3c21c8cca34528499f2a8409c3f4840b81b6b583 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Fri, 26 Jul 2024 13:55:02 +0200 Subject: [PATCH 7/8] Fix flaky `async_pools` instrumentation tests --- big_tests/tests/inbox_SUITE.erl | 10 +++++----- big_tests/tests/inbox_helper.erl | 9 +++++---- big_tests/tests/mam_SUITE.erl | 6 +++--- big_tests/tests/mam_helper.erl | 8 ++++---- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/big_tests/tests/inbox_SUITE.erl b/big_tests/tests/inbox_SUITE.erl index a0570115ba2..b922e0b5e67 100644 --- a/big_tests/tests/inbox_SUITE.erl +++ b/big_tests/tests/inbox_SUITE.erl @@ -23,7 +23,7 @@ assert_invalid_inbox_form_value_error/3, assert_invalid_reset_inbox/4, extract_user_specs/1, - assert_async_request_event/2 + assert_async_request_event/1 ]). -define(ROOM3, <<"testroom3">>). @@ -1363,7 +1363,7 @@ rest_api_bin_flush_user(Config) -> ?assertEqual(1, NumOfRows), check_inbox(Bob, [], #{box => bin}) end), - assert_async_request_event(TS, 10). + assert_async_request_event(TS). rest_api_bin_flush_user_errors(Config) -> Config1 = escalus_fresh:create_users(Config, [{alice, 1}]), @@ -1390,7 +1390,7 @@ rest_api_bin_flush_all(Config) -> check_inbox(Bob, [], #{box => bin}), check_inbox(Kate, [], #{box => bin}) end), - assert_async_request_event(TS, 10). + assert_async_request_event(TS). 
rest_api_bin_flush_all_errors(_Config) -> HostTypePath = uri_string:normalize(#{path => domain_helper:host_type()}), @@ -1407,7 +1407,7 @@ timeout_cleaner_flush_all(Config) -> check_inbox(Bob, [], #{box => bin}), check_inbox(Kate, [], #{box => bin}) end), - assert_async_request_event(TS, 10). + assert_async_request_event(TS). xmpp_bin_flush(Config) -> TS = instrument_helper:timestamp(), @@ -1423,7 +1423,7 @@ xmpp_bin_flush(Config) -> escalus:assert(is_iq_result, [Iq], escalus:wait_for_stanza(Bob)), check_inbox(Bob, [], #{box => bin}) end), - assert_async_request_event(TS, 10). + assert_async_request_event(TS). %% helpers diff --git a/big_tests/tests/inbox_helper.erl b/big_tests/tests/inbox_helper.erl index 998d1afc372..449f41191fc 100644 --- a/big_tests/tests/inbox_helper.erl +++ b/big_tests/tests/inbox_helper.erl @@ -73,7 +73,7 @@ domain/0, to_bare_lower/1, extract_user_specs/1, - assert_async_request_event/2 + assert_async_request_event/1 ]). -import(muc_helper, [foreach_recipient/2]). @@ -868,11 +868,12 @@ assert_message_content(Msg, Field, Value) -> ?assertNotEqual(nomatch, binary:match(Msg, Field)), ?assertNotEqual(nomatch, binary:match(Msg, Value)). -assert_async_request_event(TS, ExpectedCount) -> - instrument_helper:assert(async_pool_request, +assert_async_request_event(TS) -> + instrument_helper:assert( + async_pool_request, #{host_type => domain_helper:host_type(), pool_id => inbox}, fun(#{count := 1}) -> true end, - #{min_timestamp => TS, expected_count => ExpectedCount}). + #{min_timestamp => TS}). %% TODO: properly extract the specs from Bob extract_user_specs(User) -> diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index d6160bb827c..9376116b3c8 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -114,7 +114,7 @@ assert_lookup_event/2, assert_flushed_event_if_async/2, assert_async_batch_flush_event/3, - assert_async_timed_flush_event/4, + assert_async_timed_flush_event/3, assert_dropped_iq_event/2, assert_event_with_jid/2, assert_no_event_with_jid/2 @@ -1749,8 +1749,8 @@ muc_light_shouldnt_modify_pm_archive(Config) -> when_archive_query_is_sent(Bob, undefined, Config), then_archive_response_is(Bob, [{message, Alice, <<"private hi!">>}], Config) end), - assert_async_timed_flush_event(Config, TS, 2, pm_mam), - assert_async_timed_flush_event(Config, TS, 2, muc_mam). + assert_async_timed_flush_event(Config, TS, pm_mam), + assert_async_timed_flush_event(Config, TS, muc_mam). muc_light_stored_in_pm_if_allowed_to(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> diff --git a/big_tests/tests/mam_helper.erl b/big_tests/tests/mam_helper.erl index b0b4f46e29f..cd0a6058972 100644 --- a/big_tests/tests/mam_helper.erl +++ b/big_tests/tests/mam_helper.erl @@ -1456,18 +1456,18 @@ assert_flushed_event_if_async(EventName, Config) -> assert_async_batch_flush_event(TS, ExpectedCount, PoolId) -> instrument_helper:assert( async_pool_flush, - #{host_type => domain_helper:host_type(), pool_id => PoolId}, + #{host_type => host_type(), pool_id => PoolId}, fun(#{batch := 1}) -> true end, #{min_timestamp => TS, expected_count => ExpectedCount}). 
-assert_async_timed_flush_event(Config, TS, ExpectedCount, PoolId) -> +assert_async_timed_flush_event(Config, TS, PoolId) -> case ?config(configuration, Config) of rdbms_async_pool -> instrument_helper:assert( async_pool_flush, - #{host_type => domain_helper:host_type(), pool_id => PoolId}, + #{host_type => host_type(), pool_id => PoolId}, fun(#{timed := 1}) -> true end, - #{min_timestamp => TS, expected_count => ExpectedCount}); + #{min_timestamp => TS}); _ -> ok end. From 72c3de42877886bdcedca91770c34f9ae483b2a4 Mon Sep 17 00:00:00 2001 From: jacekwegr Date: Tue, 30 Jul 2024 10:26:54 +0200 Subject: [PATCH 8/8] Parallelize `async_workers` tests --- test/batches_SUITE.erl | 133 +++++++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 51 deletions(-) diff --git a/test/batches_SUITE.erl b/test/batches_SUITE.erl index 5c412e77ee4..97eb4ad5509 100644 --- a/test/batches_SUITE.erl +++ b/test/batches_SUITE.erl @@ -21,7 +21,7 @@ groups() -> external_stop_does_nothing, shared_cache_inserts_in_shared_table ]}, - {async_workers, [sequence], + {async_workers, [parallel], [ broadcast_reaches_all_workers, broadcast_reaches_all_keys, @@ -42,27 +42,64 @@ groups() -> init_per_suite(Config) -> mongoose_config:set_opts(opts()), - async_helper:start(Config, mongoose_instrument, start_link, []). + Config. -end_per_suite(Config) -> - async_helper:stop_all(Config), +end_per_suite(_Config) -> mongoose_config:erase_opts(), meck:unload(). -init_per_group(_, Config) -> - Config. +init_per_group(cache, Config) -> + async_helper:start(Config, mongoose_instrument, start_link, []); +init_per_group(async_workers, Config) -> + meck:new(logger, [passthrough, unstick, no_link]), + async_helper:start(Config, [{mongoose_instrument, start_link, []}, + {mim_ct_sup, start_link, [ejabberd_sup]}]). -end_per_group(_, _Config) -> - ok. +end_per_group(cache, Config) -> + async_helper:stop_all(Config); +end_per_group(async_workers, Config) -> + async_helper:stop_all(Config), + meck:unload(logger). -init_per_testcase(_TestCase, Config) -> +init_per_testcase(TestCase, Config) when TestCase =:= internal_starts_another_cache; + TestCase =:= external_does_not_start_another_cache; + TestCase =:= internal_stop_does_stop_the_cache; + TestCase =:= external_stop_does_nothing; + TestCase =:= shared_cache_inserts_in_shared_table -> pg:start_link(mim_scope), mim_ct_sup:start_link(ejabberd_sup), meck:new(gen_mod, [passthrough]), + Config; +init_per_testcase(broadcast_reaches_all_workers = TestCase, Config) -> + {ok, Server} = gen_server:start_link(?MODULE, [], []), + WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, + pool_size => 10}, + {ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts), + [{server_pid, Server} | Config]; +init_per_testcase(broadcast_reaches_all_keys = TestCase, Config) -> + {ok, Server} = gen_server:start_link(?MODULE, [], []), + Tid = ets:new(table, [public, {read_concurrency, true}]), + Req = create_request_callback(Tid, Server), + WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, + pool_size => 3, + request_callback => Req}, + {ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts), + [{server_pid, Server}, {tid, Tid} | Config]; +init_per_testcase(TestCase, Config) -> + {ok, Server} = gen_server:start_link(?MODULE, [], []), + WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, + pool_size => 3}, + {ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts), Config. 
+ +end_per_testcase(TestCase, _Config) when TestCase =:= internal_starts_another_cache; + TestCase =:= external_does_not_start_another_cache; + TestCase =:= internal_stop_does_stop_the_cache; + TestCase =:= external_stop_does_nothing; + TestCase =:= shared_cache_inserts_in_shared_table -> + meck:unload(gen_mod); end_per_testcase(_TestCase, _Config) -> - meck:unload(gen_mod), ok. opts() -> @@ -77,6 +114,17 @@ cache_config(internal) -> cache_config(Module) -> #{module => Module}. +create_request_callback(Tid, Server) -> + fun(Task, _) -> + case ets:member(Tid, continue) of + true -> + gen_server:send_request(Server, Task); + false -> + async_helper:wait_until(fun() -> ets:member(Tid, continue) end, true), + gen_server:send_request(Server, 0) + end + end. + %% Tests internal_starts_another_cache(_) -> mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), @@ -121,41 +169,25 @@ aggregation_might_produce_noop_requests(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), Requestor = fun(1, _) -> timer:sleep(1), gen_server:send_request(Server, 1); (_, _) -> drop end, - Opts = (default_aggregator_opts(Server))#{pool_id => test_pool, + Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME, request_callback => Requestor}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 1). -broadcast_reaches_all_workers(_) -> - {ok, Server} = gen_server:start_link(?MODULE, [], []), - WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, - pool_size => 10}, - {ok, _} = mongoose_async_pools:start_pool(host_type(), first_pool, WPoolOpts), - mongoose_async_pools:broadcast_task(host_type(), first_pool, key, 1), +broadcast_reaches_all_workers(Config) -> + Server = proplists:get_value(server_pid, Config), + mongoose_async_pools:broadcast_task(host_type(), ?FUNCTION_NAME, key, 1), async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 10). -broadcast_reaches_all_keys(_) -> +broadcast_reaches_all_keys(Config) -> HostType = host_type(), - {ok, Server} = gen_server:start_link(?MODULE, [], []), - Tid = ets:new(table, [public, {read_concurrency, true}]), - Req = fun(Task, _) -> - case ets:member(Tid, continue) of - true -> - gen_server:send_request(Server, Task); - false -> - async_helper:wait_until(fun() -> ets:member(Tid, continue) end, true), - gen_server:send_request(Server, 0) - end - end, - WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate, - pool_size => 3, - request_callback => Req}, - {ok, _} = mongoose_async_pools:start_pool(HostType, test_pool, WPoolOpts), - [ mongoose_async_pools:put_task(HostType, test_pool, N, 1) || N <- lists:seq(0, 1000) ], - mongoose_async_pools:broadcast(HostType, test_pool, -1), + Server = proplists:get_value(server_pid, Config), + Tid = proplists:get_value(tid, Config), + [ mongoose_async_pools:put_task(HostType, ?FUNCTION_NAME, N, 1) || N <- lists:seq(0, 1000) ], + mongoose_async_pools:broadcast(HostType, ?FUNCTION_NAME, -1), ets:insert(Tid, {continue, true}), async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 0). 
@@ -163,9 +195,8 @@ broadcast_reaches_all_keys(_) -> timeouts_and_canceled_timers_do_not_need_to_log_messages(_) -> Timeout = 10, QueueSize = 2, - meck:new(logger, [passthrough, unstick]), Opts = #{host_type => host_type(), - pool_id => test_pool, + pool_id => ?FUNCTION_NAME, batch_size => QueueSize, flush_interval => Timeout, flush_callback => fun(_, _) -> ok end, @@ -173,14 +204,14 @@ timeouts_and_canceled_timers_do_not_need_to_log_messages(_) -> {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), [ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ], ct:sleep(Timeout*2), - ?assertEqual(0, meck:num_calls(logger, macro_log, '_')). + ?assertEqual(0, meck:num_calls(logger, macro_log, ['_', '_', #{pool_id => ?FUNCTION_NAME}])). + prepare_task_works(_) -> Timeout = 1000, QueueSize = 2, T = self(), - meck:new(logger, [passthrough, unstick]), Opts = #{host_type => host_type(), - pool_id => test_pool, + pool_id => ?FUNCTION_NAME, batch_size => QueueSize, flush_interval => Timeout, prep_callback => fun(0, _) -> {error, bad}; @@ -196,11 +227,11 @@ prepare_task_works(_) -> after Timeout*2 -> ct:fail(no_answer_received) end, - ?assert(0 < meck:num_calls(logger, macro_log, '_')). + ?assert(0 < meck:num_calls(logger, macro_log, ['_', '_', #{pool_id => ?FUNCTION_NAME}])). sync_flushes_down_everything(_) -> Opts = #{host_type => host_type(), - pool_id => test_pool, + pool_id => ?FUNCTION_NAME, batch_size => 5000, flush_interval => 5000, flush_callback => fun(_, _) -> ok end, @@ -212,7 +243,7 @@ sync_flushes_down_everything(_) -> sync_aggregates_down_everything(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), - Opts = (default_aggregator_opts(Server))#{pool_id => test_pool}, + Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), ?assertEqual(skipped, gen_server:call(Pid, sync)), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], @@ -222,7 +253,7 @@ sync_aggregates_down_everything(_) -> aggregating_error_is_handled_and_can_continue(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), Requestor = fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end, - Opts = (default_aggregator_opts(Server))#{pool_id => test_pool, + Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME, request_callback => Requestor}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 10) ], @@ -236,14 +267,14 @@ aggregating_error_is_handled_and_can_continue(_) -> async_request(_) -> {ok, Server} = gen_server:start_link(?MODULE, [], []), - Opts = (default_aggregator_opts(Server))#{pool_id => test_pool}, + Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 500500). retry_request(_) -> - Opts = (retry_aggregator_opts())#{pool_id => test_pool}, + Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), receive_task_called(0), @@ -254,7 +285,7 @@ retry_request(_) -> ensure_no_tasks_to_receive(). 
retry_request_cancelled(_) -> - Opts = (retry_aggregator_opts())#{pool_id => test_pool, + Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME, request_callback => fun do_cancel_request/2}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), @@ -269,7 +300,7 @@ retry_request_cancelled(_) -> receive_task_called(0). retry_request_cancelled_in_verify_function(_) -> - Opts = (retry_aggregator_opts())#{pool_id => test_pool, + Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME, request_callback => fun do_request/2, verify_callback => fun validate_all_fails/3}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), @@ -285,7 +316,7 @@ retry_request_cancelled_in_verify_function(_) -> receive_task_called(0). ignore_msg_when_waiting_for_reply(_) -> - Opts = (retry_aggregator_opts())#{pool_id => test_pool, + Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME, request_callback => fun do_request_but_ignore_other_messages/2}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, 1}), @@ -298,7 +329,7 @@ async_request_fails(_) -> %% Does request that crashes the gen_server, but not the aggregator {ok, Server} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []), Ref = monitor(process, Server), - Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => test_pool}, + Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => ?FUNCTION_NAME}, {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), gen_server:cast(Pid, {task, key, {ack_and_die, self()}}), %% Acked and the server dies @@ -413,7 +444,7 @@ handle_call(get_acc, _From, Acc) -> {reply, Acc, Acc}; handle_call(return_error, _From, Acc) -> {reply, {error, return_error}, Acc}; -handle_call({ack_and_die, Pid}, _From, Acc) -> +handle_call({ack_and_die, Pid}, _From, _Acc) -> Pid ! {acked, self()}, error(oops); handle_call({ack, Pid}, _From, Acc) ->