
Parallelize async_workers tests
jacekwegr committed Jul 30, 2024
1 parent 3c21c8c commit 72c3de4
Showing 1 changed file with 82 additions and 51 deletions.
133 changes: 82 additions & 51 deletions test/batches_SUITE.erl
@@ -21,7 +21,7 @@ groups() ->
external_stop_does_nothing,
shared_cache_inserts_in_shared_table
]},
{async_workers, [sequence],
{async_workers, [parallel],
[
broadcast_reaches_all_workers,
broadcast_reaches_all_keys,
@@ -42,27 +42,64 @@

init_per_suite(Config) ->
mongoose_config:set_opts(opts()),
async_helper:start(Config, mongoose_instrument, start_link, []).
Config.

end_per_suite(Config) ->
async_helper:stop_all(Config),
end_per_suite(_Config) ->
mongoose_config:erase_opts(),
meck:unload().

init_per_group(_, Config) ->
Config.
init_per_group(cache, Config) ->
async_helper:start(Config, mongoose_instrument, start_link, []);
init_per_group(async_workers, Config) ->
meck:new(logger, [passthrough, unstick, no_link]),
async_helper:start(Config, [{mongoose_instrument, start_link, []},
{mim_ct_sup, start_link, [ejabberd_sup]}]).

end_per_group(_, _Config) ->
ok.
end_per_group(cache, Config) ->
async_helper:stop_all(Config);
end_per_group(async_workers, Config) ->
async_helper:stop_all(Config),
meck:unload(logger).

init_per_testcase(_TestCase, Config) ->
init_per_testcase(TestCase, Config) when TestCase =:= internal_starts_another_cache;
TestCase =:= external_does_not_start_another_cache;
TestCase =:= internal_stop_does_stop_the_cache;
TestCase =:= external_stop_does_nothing;
TestCase =:= shared_cache_inserts_in_shared_table ->
pg:start_link(mim_scope),
mim_ct_sup:start_link(ejabberd_sup),
meck:new(gen_mod, [passthrough]),
Config;
init_per_testcase(broadcast_reaches_all_workers = TestCase, Config) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate,
pool_size => 10},
{ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts),
[{server_pid, Server} | Config];
init_per_testcase(broadcast_reaches_all_keys = TestCase, Config) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Tid = ets:new(table, [public, {read_concurrency, true}]),
Req = create_request_callback(Tid, Server),
WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate,
pool_size => 3,
request_callback => Req},
{ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts),
[{server_pid, Server}, {tid, Tid} | Config];
init_per_testcase(TestCase, Config) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate,
pool_size => 3},
{ok, _} = mongoose_async_pools:start_pool(host_type(), TestCase, WPoolOpts),
Config.


end_per_testcase(TestCase, _Config) when TestCase =:= internal_starts_another_cache;
TestCase =:= external_does_not_start_another_cache;
TestCase =:= internal_stop_does_stop_the_cache;
TestCase =:= external_stop_does_nothing;
TestCase =:= shared_cache_inserts_in_shared_table ->
meck:unload(gen_mod);
end_per_testcase(_TestCase, _Config) ->
meck:unload(gen_mod),
ok.

opts() ->
@@ -77,6 +114,17 @@ cache_config(internal) ->
cache_config(Module) ->
#{module => Module}.

create_request_callback(Tid, Server) ->
fun(Task, _) ->
case ets:member(Tid, continue) of
true ->
gen_server:send_request(Server, Task);
false ->
async_helper:wait_until(fun() -> ets:member(Tid, continue) end, true),
gen_server:send_request(Server, 0)
end
end.

%% Tests
internal_starts_another_cache(_) ->
mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
@@ -121,66 +169,49 @@ aggregation_might_produce_noop_requests(_) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Requestor = fun(1, _) -> timer:sleep(1), gen_server:send_request(Server, 1);
(_, _) -> drop end,
Opts = (default_aggregator_opts(Server))#{pool_id => test_pool,
Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME,
request_callback => Requestor},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
[ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
async_helper:wait_until(
fun() -> gen_server:call(Server, get_acc) end, 1).

broadcast_reaches_all_workers(_) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate,
pool_size => 10},
{ok, _} = mongoose_async_pools:start_pool(host_type(), first_pool, WPoolOpts),
mongoose_async_pools:broadcast_task(host_type(), first_pool, key, 1),
broadcast_reaches_all_workers(Config) ->
Server = proplists:get_value(server_pid, Config),
mongoose_async_pools:broadcast_task(host_type(), ?FUNCTION_NAME, key, 1),
async_helper:wait_until(
fun() -> gen_server:call(Server, get_acc) end, 10).

broadcast_reaches_all_keys(_) ->
broadcast_reaches_all_keys(Config) ->
HostType = host_type(),
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Tid = ets:new(table, [public, {read_concurrency, true}]),
Req = fun(Task, _) ->
case ets:member(Tid, continue) of
true ->
gen_server:send_request(Server, Task);
false ->
async_helper:wait_until(fun() -> ets:member(Tid, continue) end, true),
gen_server:send_request(Server, 0)
end
end,
WPoolOpts = (default_aggregator_opts(Server))#{pool_type => aggregate,
pool_size => 3,
request_callback => Req},
{ok, _} = mongoose_async_pools:start_pool(HostType, test_pool, WPoolOpts),
[ mongoose_async_pools:put_task(HostType, test_pool, N, 1) || N <- lists:seq(0, 1000) ],
mongoose_async_pools:broadcast(HostType, test_pool, -1),
Server = proplists:get_value(server_pid, Config),
Tid = proplists:get_value(tid, Config),
[ mongoose_async_pools:put_task(HostType, ?FUNCTION_NAME, N, 1) || N <- lists:seq(0, 1000) ],
mongoose_async_pools:broadcast(HostType, ?FUNCTION_NAME, -1),
ets:insert(Tid, {continue, true}),
async_helper:wait_until(
fun() -> gen_server:call(Server, get_acc) end, 0).

timeouts_and_canceled_timers_do_not_need_to_log_messages(_) ->
Timeout = 10,
QueueSize = 2,
meck:new(logger, [passthrough, unstick]),
Opts = #{host_type => host_type(),
pool_id => test_pool,
pool_id => ?FUNCTION_NAME,
batch_size => QueueSize,
flush_interval => Timeout,
flush_callback => fun(_, _) -> ok end,
flush_extra => #{host_type => host_type(), queue_length => 0}},
{ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
[ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ],
ct:sleep(Timeout*2),
?assertEqual(0, meck:num_calls(logger, macro_log, '_')).
?assertEqual(0, meck:num_calls(logger, macro_log, ['_', '_', #{pool_id => ?FUNCTION_NAME}])).

prepare_task_works(_) ->
Timeout = 1000,
QueueSize = 2,
T = self(),
meck:new(logger, [passthrough, unstick]),
Opts = #{host_type => host_type(),
pool_id => test_pool,
pool_id => ?FUNCTION_NAME,
batch_size => QueueSize,
flush_interval => Timeout,
prep_callback => fun(0, _) -> {error, bad};
@@ -196,11 +227,11 @@ prepare_task_works(_) ->
after
Timeout*2 -> ct:fail(no_answer_received)
end,
?assert(0 < meck:num_calls(logger, macro_log, '_')).
?assert(0 < meck:num_calls(logger, macro_log, ['_', '_', #{pool_id => ?FUNCTION_NAME}])).

sync_flushes_down_everything(_) ->
Opts = #{host_type => host_type(),
pool_id => test_pool,
pool_id => ?FUNCTION_NAME,
batch_size => 5000,
flush_interval => 5000,
flush_callback => fun(_, _) -> ok end,
@@ -212,7 +243,7 @@

sync_aggregates_down_everything(_) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Opts = (default_aggregator_opts(Server))#{pool_id => test_pool},
Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
?assertEqual(skipped, gen_server:call(Pid, sync)),
[ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
@@ -222,7 +253,7 @@ sync_aggregates_down_everything(_) ->
aggregating_error_is_handled_and_can_continue(_) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Requestor = fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end,
Opts = (default_aggregator_opts(Server))#{pool_id => test_pool,
Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME,
request_callback => Requestor},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
[ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 10) ],
@@ -236,14 +267,14 @@ aggregating_error_is_handled_and_can_continue(_) ->

async_request(_) ->
{ok, Server} = gen_server:start_link(?MODULE, [], []),
Opts = (default_aggregator_opts(Server))#{pool_id => test_pool},
Opts = (default_aggregator_opts(Server))#{pool_id => ?FUNCTION_NAME},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
[ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
async_helper:wait_until(
fun() -> gen_server:call(Server, get_acc) end, 500500).

retry_request(_) ->
Opts = (retry_aggregator_opts())#{pool_id => test_pool},
Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
@@ -254,7 +285,7 @@ retry_request(_) ->
ensure_no_tasks_to_receive().

retry_request_cancelled(_) ->
Opts = (retry_aggregator_opts())#{pool_id => test_pool,
Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME,
request_callback => fun do_cancel_request/2},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
@@ -269,7 +300,7 @@ retry_request_cancelled(_) ->
receive_task_called(0).

retry_request_cancelled_in_verify_function(_) ->
Opts = (retry_aggregator_opts())#{pool_id => test_pool,
Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME,
request_callback => fun do_request/2,
verify_callback => fun validate_all_fails/3},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
@@ -285,7 +316,7 @@ retry_request_cancelled_in_verify_function(_) ->
receive_task_called(0).

ignore_msg_when_waiting_for_reply(_) ->
Opts = (retry_aggregator_opts())#{pool_id => test_pool,
Opts = (retry_aggregator_opts())#{pool_id => ?FUNCTION_NAME,
request_callback => fun do_request_but_ignore_other_messages/2},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
@@ -298,7 +329,7 @@ async_request_fails(_) ->
%% Does request that crashes the gen_server, but not the aggregator
{ok, Server} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []),
Ref = monitor(process, Server),
Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => test_pool},
Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => ?FUNCTION_NAME},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, {ack_and_die, self()}}),
%% Acked and the server dies
@@ -413,7 +444,7 @@ handle_call(get_acc, _From, Acc) ->
{reply, Acc, Acc};
handle_call(return_error, _From, Acc) ->
{reply, {error, return_error}, Acc};
handle_call({ack_and_die, Pid}, _From, Acc) ->
handle_call({ack_and_die, Pid}, _From, _Acc) ->
Pid ! {acked, self()},
error(oops);
handle_call({ack, Pid}, _From, Acc) ->
