Skip to content

Commit

Permalink
Merge pull request #3149 from esl/updating-mod_stream_management-module
Browse files Browse the repository at this point in the history
updating mod_stream_management module
  • Loading branch information
chrzaszcz authored Jun 8, 2021
2 parents 85e657c + f1cb05c commit 647b6fb
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 162 deletions.
17 changes: 17 additions & 0 deletions big_tests/dynamic_domains.spec
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@
disco_rooms_rsm],
"at the moment mod_disco doesn't support dynamic domains"}.

{suites, "tests", sm_SUITE}.
{skip_cases, "tests", sm_SUITE,
[basic_ack,
h_ok_after_session_enabled_after_session,
subscription_requests_are_buffered_properly],
"at the moment mod_roster doesn't support dynamic domains"}.
{skip_cases, "tests", sm_SUITE,
[resend_unacked_on_reconnection,
session_established,
wait_for_resumption,
resume_session_kills_old_C2S_gracefully,
resend_unacked_after_resume_timeout,
resend_more_offline_messages_than_buffer_size,
resume_expired_session_returns_correct_h,
unacknowledged_message_hook_offline],
"at the moment mod_offline doesn't support dynamic domains"}.

{config, ["dynamic_domains.config", "test.config"]}.

{logdir, "ct_report"}.
Expand Down
19 changes: 16 additions & 3 deletions big_tests/tests/ct_helper.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
-module(ct_helper).
-export([is_ct_running/0,
repeat_all_until_all_ok/1,
repeat_all_until_all_ok/2
]).
repeat_all_until_all_ok/1,
repeat_all_until_all_ok/2,
repeat_all_until_any_fail/1,
repeat_all_until_any_fail/2]).

-type group_name() :: atom().

Expand All @@ -27,6 +28,11 @@ is_ct_running() ->
repeat_all_until_all_ok(GroupDefs) ->
repeat_all_until_all_ok(GroupDefs, 3).

-spec repeat_all_until_any_fail([group_def() | group_def_dirty() | group_def_incomplete()]) ->
[group_def()].
repeat_all_until_any_fail(GroupDefs) ->
repeat_all_until_any_fail(GroupDefs, 100).

%% @doc repeat_all_until_all_ok/2 will rewrite your group definitions so that
%% the `{repeat_until_all_ok, Retries}` property is added to all of them.
%% For example, for the following definitions:
Expand All @@ -51,6 +57,13 @@ repeat_all_until_all_ok(GroupDefs, Retries) ->
[ {Name, maybe_add_repeat_type(repeat_until_all_ok, Retries, Properties), Tests}
|| {Name, Properties, Tests} <- prepare_group_defs(GroupDefs) ].

-spec repeat_all_until_any_fail([group_def() | group_def_dirty() | group_def_incomplete()],
repeat_num()) ->
[group_def()].
repeat_all_until_any_fail(GroupDefs, Retries) ->
[ {Name, maybe_add_repeat_type(repeat_until_any_fail, Retries, Properties), Tests}
|| {Name, Properties, Tests} <- prepare_group_defs(GroupDefs) ].

-spec maybe_add_repeat_type(repeat_type(), repeat_num(), group_props()) -> group_props().
maybe_add_repeat_type(RepeatType, Retries, Properties) ->
case lists:any(fun(El) -> proplists:is_defined(El, Properties) end, all_repeat_modes()) of
Expand Down
113 changes: 63 additions & 50 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

-import(escalus_stanza, [setattr/3]).

-import(domain_helper, [host_type/0]).

-define(BIG_BIG_BIG_TIMEOUT, 3600).
-define(SHORT_RESUME_TIMEOUT, 3).
-define(SMALL_SM_BUFFER, 3).
Expand Down Expand Up @@ -109,9 +111,6 @@ unacknowledged_message_hook() ->
suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

domain() ->
ct:get_config({hosts, mim, domain}).

stream_management_with_stale_h(RepeatAfter, Geriatric) ->
[{mod_stream_management,
[
Expand Down Expand Up @@ -141,45 +140,45 @@ end_per_suite(Config) ->

init_per_group(G, Config) when G =:= unacknowledged_message_hook;
G =:= manual_ack_freq_long_session_timeout ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]),
escalus_users:update_userspec(Config, alice, manual_ack, true);
init_per_group(parallel_manual_ack_freq_1, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]),
rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), ?SHORT_RESUME_TIMEOUT]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]),
rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), ?SHORT_RESUME_TIMEOUT]),
escalus_users:update_userspec(Config, alice, manual_ack, true);
init_per_group(stale_h, Config) ->
escalus_users:update_userspec(Config, alice, manual_ack, true);
init_per_group(stream_mgmt_disabled, Config) ->
Config2 = dynamic_modules:save_modules(domain(), Config),
dynamic_modules:stop(domain(), ?MOD_SM),
Config2 = dynamic_modules:save_modules(host_type(), Config),
dynamic_modules:stop(host_type(), ?MOD_SM),
rpc(mim(), mnesia, delete_table, [sm_session]),
escalus_users:update_userspec(Config2, alice, manual_ack, true);
init_per_group(_GroupName, Config) ->
Config.

end_per_group(stream_mgmt_disabled, Config) ->
dynamic_modules:restore_modules(domain(), Config);
dynamic_modules:restore_modules(host_type(), Config);
end_per_group(G, Config) when G =:= unacknowledged_message_hook;
G =:= manual_ack_freq_long_session_timeout ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]),
Config;
end_per_group(parallel_manual_ack_freq_1, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]),
rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), 600]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]),
rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), 600]),
Config;
end_per_group(_GroupName, Config) ->
Config.


set_gc_parameters(RepeatAfter, Geriatric, Config) ->
Config2 = dynamic_modules:save_modules(domain(), Config),
Config2 = dynamic_modules:save_modules(host_type(), Config),
dynamic_modules:ensure_modules(
domain(), stream_management_with_stale_h(RepeatAfter, Geriatric)),
host_type(), stream_management_with_stale_h(RepeatAfter, Geriatric)),
Config2.

register_smid(IntSmidId) ->
S = {SMID = make_smid(), IntSmidId},
ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [domain(), SMID, IntSmidId]),
ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
S.

register_some_smid_h(Config) ->
Expand All @@ -188,8 +187,8 @@ register_some_smid_h(Config) ->

init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
Config2 = set_gc_parameters(?BIG_BIG_BIG_TIMEOUT, ?BIG_BIG_BIG_TIMEOUT, Config),
rpc(mim(), ?MOD_SM, set_resume_timeout, [domain(), ?SHORT_RESUME_TIMEOUT]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 1]),
rpc(mim(), ?MOD_SM, set_resume_timeout, [host_type(), ?SHORT_RESUME_TIMEOUT]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 1]),
escalus:init_per_testcase(CN, Config2);
init_per_testcase(gc_repeat_after_never_means_no_cleaning = CN, Config) ->
Config2 = set_gc_parameters(?BIG_BIG_BIG_TIMEOUT, ?SHORT_RESUME_TIMEOUT, Config),
Expand All @@ -200,10 +199,10 @@ init_per_testcase(gc_repeat_after_timeout_does_clean = CN, Config) ->
Config3 = register_some_smid_h(Config2),
escalus:init_per_testcase(CN, Config3);
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), 2]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), 2]),
escalus:init_per_testcase(CN, Config);
init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
register_handler(<<"localhost">>),
register_handler(),
escalus:init_per_testcase(CN, Config);
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).
Expand All @@ -212,15 +211,15 @@ end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_
CN =:= gc_repeat_after_never_means_no_cleaning;
CN =:= gc_repeat_after_timeout_does_clean
->
dynamic_modules:stop(domain(), ?MOD_SM),
dynamic_modules:stop(host_type(), ?MOD_SM),
rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]),
dynamic_modules:restore_modules(domain(), Config),
dynamic_modules:restore_modules(host_type(), Config),
escalus:end_per_testcase(CN, Config);
end_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), never]),
true = rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), never]),
escalus:end_per_testcase(CN, Config);
end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
unregister_handler(<<"localhost">>),
unregister_handler(),
escalus:end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).
Expand Down Expand Up @@ -676,15 +675,15 @@ resume_expired_session_returns_correct_h(Config) ->
escalus_connection:stop(NewAlice).

gc_repeat_after_never_means_no_cleaning(Config) ->
true = rpc(mim(), ?MOD_SM, set_stale_h_repeat_after, [domain(), ?BIG_BIG_BIG_TIMEOUT]),
true = rpc(mim(), ?MOD_SM, set_stale_h_repeat_after, [host_type(), ?BIG_BIG_BIG_TIMEOUT]),
[{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
{stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID1]),
{stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID2]),
{stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [domain(), SMID3]).
{stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]),
{stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]),
{stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]).
gc_repeat_after_timeout_does_clean(Config) ->
[{SMID1, _} | _ ] = ?config(smid_test, Config),
mongoose_helper:wait_until(fun() ->
rpc(mim(), ?MOD_SM, get_stale_h, [domain(), SMID1])
rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
end,
{error, smid_not_found},
#{name => smid_garbage_collected}).
Expand Down Expand Up @@ -718,17 +717,22 @@ resume_session_state_send_message(Config) ->
%% send some messages and check if c2s can handle it
escalus_connection:send(Bob, escalus_stanza:chat_to(common_helper:get_bjid(AliceSpec), <<"msg-2">>)),
escalus_connection:send(Bob, escalus_stanza:chat_to(common_helper:get_bjid(AliceSpec), <<"msg-3">>)),
%% suspend the process to ensure that Alice has enough time to reconnect,
%% before resumption timeout occurs.
ok = rpc(mim(), sys, suspend, [C2SPid]),

%% alice comes back and receives unacked message
{ok, NewAlice, _} = escalus_connection:start(AliceSpec, ConnSteps),
escalus_connection:send(NewAlice, escalus_stanza:presence(<<"available">>)),

Stanzas = [escalus_connection:get_stanza(NewAlice, msg) || _ <- lists:seq(1,4) ],
escalus:assert(is_presence, escalus_connection:get_stanza(NewAlice, presence)),
%% now we can resume c2s process of the old connection
%% and let it process session resumption timeout
ok = rpc(mim(), sys, resume, [C2SPid]),
Stanzas = escalus:wait_for_stanzas(NewAlice, 3),

% what about order ?
% alice receive presence from herself and 3 unacked messages from bob
escalus_new_assert:mix_match([is_presence,
is_chat(<<"msg-1">>),
escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
is_chat(<<"msg-2">>),
is_chat(<<"msg-3">>)],
Stanzas),
Expand Down Expand Up @@ -768,14 +772,19 @@ resume_session_state_stop_c2s(Config) ->
1 = length(get_user_alive_resources(AliceSpec)),
rpc(mim(), ejabberd_c2s, stop, [C2SPid] ),
wait_for_c2s_state_change(C2SPid, resume_session),
%% suspend the process to ensure that Alice has enough time to reconnect,
%% before resumption timeout occurs.
ok = rpc(mim(), sys, suspend, [C2SPid]),

%% alice comes back and receives unacked message
{ok, NewAlice, _} = escalus_connection:start(AliceSpec, ConnSteps),
escalus_connection:send(NewAlice, escalus_stanza:presence(<<"available">>)),
escalus:assert(is_presence, escalus_connection:get_stanza(NewAlice, presence)),
%% now we can resume c2s process of the old connection
%% and let it process session resumption timeout
ok = rpc(mim(), sys, resume, [C2SPid]),

Stanzas = [escalus_connection:get_stanza(NewAlice, msg),
escalus_connection:get_stanza(NewAlice, msg)],
escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas),
escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),

escalus_connection:stop(NewAlice),
escalus_connection:stop(Bob).
Expand Down Expand Up @@ -1312,9 +1321,9 @@ no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption
%%--------------------------------------------------------------------
start_hook_listener(Resource) ->
TestCasePid = self(),
rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource]).
rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]).

rpc_start_hook_handler(TestCasePid, User) ->
rpc_start_hook_handler(TestCasePid, User, HostType) ->
LUser=jid:nodeprep(User),
Handler = fun(Acc, Jid) ->
{U, _S, R} = jid:to_lower(Jid),
Expand All @@ -1327,7 +1336,7 @@ rpc_start_hook_handler(TestCasePid, User) ->
_ -> Acc
end
end,
ejabberd_hooks:add(unacknowledged_message, <<"localhost">>, Handler, 50).
ejabberd_hooks:add(unacknowledged_message, HostType, Handler, 50).

wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
receive
Expand Down Expand Up @@ -1373,26 +1382,26 @@ discard_offline_messages(Config, User, H) ->
buffer_max(BufferMax) ->
{buffer_max,
fun () ->
rpc(mim(), ?MOD_SM, get_buffer_max, [domain(), unset])
rpc(mim(), ?MOD_SM, get_buffer_max, [host_type(), unset])
end,
fun (unset) ->
ct:pal("buffer_max was not set - setting to 'undefined'"),
rpc(mim(), ?MOD_SM, set_buffer_max, [domain(), undefined]);
rpc(mim(), ?MOD_SM, set_buffer_max, [host_type(), undefined]);
(V) ->
rpc(mim(), ?MOD_SM, set_buffer_max, [domain(), V])
rpc(mim(), ?MOD_SM, set_buffer_max, [host_type(), V])
end,
BufferMax}.

ack_freq(AckFreq) ->
{ack_freq,
fun () ->
rpc(mim(), ?MOD_SM, get_ack_freq, [domain(), unset])
rpc(mim(), ?MOD_SM, get_ack_freq, [host_type(), unset])
end,
fun (unset) ->
ct:pal("ack_freq was not set - setting to 'undefined'"),
rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), undefined]);
rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), undefined]);
(V) ->
rpc(mim(), ?MOD_SM, set_ack_freq, [domain(), V])
rpc(mim(), ?MOD_SM, set_ack_freq, [host_type(), V])
end,
AckFreq}.

Expand Down Expand Up @@ -1512,14 +1521,18 @@ wait_for_queue_length(Pid, Length) ->
regression_ns() ->
<<"regression">>.

register_handler(Host) ->
rpc(mim(), gen_iq_handler, add_iq_handler,
[ejabberd_sm, Host, regression_ns(), ?MODULE, regression_handler, one_queue]).
register_handler() ->
HostType = host_type(),
rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
[HostType, regression_ns(), ejabberd_sm,
fun ?MODULE:regression_handler/5, #{}, one_queue]).

unregister_handler(Host) ->
rpc(mim(), gen_iq_handler, remove_iq_handler, [ejabberd_sm, Host, regression_ns()]).
unregister_handler() ->
HostType = host_type(),
rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
[HostType, regression_ns(), ejabberd_sm]).

regression_handler(_From, _To, Acc, IQ) ->
regression_handler(Acc, _From, _To, IQ, _Extra) ->
%% A bit of a hack - will no longer work when the SID format changes
{_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
erlang:monitor(process, Pid),
Expand Down
3 changes: 2 additions & 1 deletion rel/mim1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
auth.methods = [\"dummy\"]
auth.dummy.base_time = 1
auth.dummy.variance = 5
[host_config.modules.mod_carboncopy]"}.
[host_config.modules.mod_carboncopy]
[host_config.modules.mod_stream_management]"}.
{password_format, "password.format = \"scram\"
password.hash = [\"sha256\"]"}.
{scram_iterations, 64}.
Expand Down
Loading

0 comments on commit 647b6fb

Please sign in to comment.