Skip to content

Commit

Permalink
Merge pull request #4498 from esl/add-tests-for-duplicate-sm-buffer
Browse files Browse the repository at this point in the history
Add tests capturing the current state of the duplicate buffer issue
  • Loading branch information
DenysGonchar authored Mar 4, 2025
2 parents ecb02f4 + e5cf259 commit eca6541
Showing 1 changed file with 96 additions and 10 deletions.
106 changes: 96 additions & 10 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ groups() ->
{tcp_tests, [], tcp_tests()},
{parallel, [parallel], parallel_cases() ++ [aggressively_pipelined_resume]},
{parallel_ws, [parallel], parallel_cases() ++ [aggressively_pipelined_resume_ws]},
{parallel_large_buffer, [parallel], parallel_large_buffer_cases()},
{parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()},
{manual_ack_freq_2, [], manual_ack_freq_2_cases()},
{stale_h, [], stale_h_cases()},
Expand All @@ -69,6 +70,7 @@ groups() ->

ws_tests() ->
[{group, parallel_ws},
{group, parallel_large_buffer},
{group, parallel_manual_ack_freq_1},
{group, manual_ack_freq_2},
{group, stale_h},
Expand All @@ -78,6 +80,7 @@ ws_tests() ->

tcp_tests() ->
[{group, parallel},
{group, parallel_large_buffer},
{group, parallel_manual_ack_freq_1},
{group, manual_ack_freq_2},
{group, stale_h},
Expand Down Expand Up @@ -116,6 +119,11 @@ parallel_cases() ->
subscription_requests_are_buffered_properly,
messages_are_properly_flushed_during_resumption].

%% Names of the test cases that make up the `parallel_large_buffer' group.
%% All of them check how unacknowledged stanzas are resent to other sessions.
parallel_large_buffer_cases() ->
    [
        resend_unacked_from_stopped_sessions,
        resend_unacked_from_terminated_sessions,
        resend_unacked_from_replaced_sessions
    ].

parallel_manual_ack_freq_1_cases() ->
[client_acks_more_than_sent,
too_many_unacked_stanzas,
Expand Down Expand Up @@ -237,22 +245,26 @@ required_modules(Scope, Name) ->
stopped -> stopped;
ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
end,
[{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)}] ++
required_mod_offline(Scope, Name) ++ required_mod_ping(Scope, Name).

%% Returns the offline-storage module entry that is appended to the list of
%% required modules for the given scope and name.
%% - `parallel_large_buffer' group: `mod_offline_stub' with an empty options map.
%% - anything else: presumably `mod_offline' configured with the backend returned by
%%   mongoose_helper:mnesia_or_rdbms_backend/0 (see the review note below).
required_mod_offline(group, parallel_large_buffer) ->
[{mod_offline_stub, #{}}];
required_mod_offline(_, _) ->
Backend = mongoose_helper:mnesia_or_rdbms_backend(),
%% NOTE(review): the lines from `BaseModules = [' down to `end.' look like the
%% pre-change body that the diff view renders without +/- markers (the commit
%% summary reports 10 deletions) -- confirm against the repository.
%% They are left untouched here.
BaseModules = [
{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}
],
case Name of
ping_timeout ->
BaseModules ++ [{mod_ping, config_parser_helper:mod_config(mod_ping, mod_ping_opts())}];
_ ->
BaseModules
end.
%% Presumably the actual body of the catch-all clause after the change -- TODO confirm.
[{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}].

%% Returns the `mod_ping' entry for the required modules list.
%% Only the `ping_timeout' test case needs it; every other scope/name
%% combination gets an empty list.
required_mod_ping(testcase, ping_timeout) ->
    PingConfig = config_parser_helper:mod_config(mod_ping, mod_ping_opts()),
    [{mod_ping, PingConfig}];
required_mod_ping(_Scope, _Name) ->
    [].

required_sm_opts(group, parallel) ->
#{ack_freq => never};
required_sm_opts(group, parallel_ws) ->
#{ack_freq => never};
required_sm_opts(group, parallel_large_buffer) ->
#{ack_freq => never, buffer_max => 1000};
required_sm_opts(group, parallel_manual_ack_freq_1) ->
#{ack_freq => 1,
resume_timeout => ?LONG_TIMEOUT};
Expand Down Expand Up @@ -544,6 +556,80 @@ resend_more_offline_messages_than_buffer_size(Config) ->
escalus_connection:stop(User),
escalus_connection:stop(Bob).

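%% Test cases capturing the current state of the duplicate buffer issue
%% (unacked stanzas are rerouted when a session ends, so later sessions get duplicates)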

%% Every session of the user gets the message sent to the bare JID and keeps it
%% unacked. The sessions are then drained and stopped one after another,
%% without reconnecting.
resend_unacked_from_stopped_sessions(Config) ->
    Messages = [<<"msg-1">>],
    {Sender, Specs, Sessions} = connect_initial_users(Messages, Config),

    %% The messages are addressed to the bare JID of the user
    BareJid = escalus_users:get_jid(Config, hd(Specs)),
    sm_helper:send_messages(Sender, BareJid, Messages),
    SessionPids = [mongoose_helper:get_session_pid(Session) || Session <- Sessions],
    ExpectedUnacked = length(Messages),
    lists:foreach(
        fun(Pid) -> sm_helper:wait_for_c2s_unacked_count(Pid, ExpectedUnacked) end,
        SessionPids),

    %% Reconnection is skipped in this test case: each fun simply hands back the
    %% already connected session, which checks its messages and stops,
    %% causing buffer rerouting and message duplication
    AlreadyConnected = lists:map(fun(Session) -> fun() -> Session end end, Sessions),
    reconnect_and_receive_messages(AlreadyConnected, Messages).

%% All sessions of the user are killed first, then a message is sent to the
%% bare JID and ends up unacked in every one of them. Finally each session is
%% replaced by a new connection, one after another.
resend_unacked_from_terminated_sessions(Config) ->
    Messages = [<<"msg-1">>],
    {Sender, Specs, Sessions} = connect_initial_users(Messages, Config),

    %% Abrupt disconnection of every session of the user
    lists:foreach(fun(Session) -> escalus_connection:kill(Session) end, Sessions),
    SessionPids = [mongoose_helper:get_session_pid(Session) || Session <- Sessions],
    lists:foreach(fun(Pid) -> sm_helper:wait_until_resume_session(Pid) end, SessionPids),

    %% The messages are addressed to the bare JID of the user
    BareJid = escalus_users:get_jid(Config, hd(Specs)),
    sm_helper:send_messages(Sender, BareJid, Messages),
    ExpectedUnacked = length(Messages),
    lists:foreach(
        fun(Pid) -> sm_helper:wait_for_c2s_unacked_count(Pid, ExpectedUnacked) end,
        SessionPids),

    %% A new session takes the place of each terminated one,
    %% causing buffer rerouting and message duplication
    Reconnectors = lists:map(
        fun(Spec) -> fun() -> connect_spec(Spec, session) end end,
        Specs),
    reconnect_and_receive_messages(Reconnectors, Messages).

%% Every session of the user gets the message sent to the bare JID and keeps it
%% unacked. Each still-online session is then replaced by a new connection made
%% from the same spec, one after another.
resend_unacked_from_replaced_sessions(Config) ->
    Messages = [<<"msg-1">>],
    {Sender, Specs, Sessions} = connect_initial_users(Messages, Config),

    %% The messages are addressed to the bare JID of the user
    BareJid = escalus_users:get_jid(Config, hd(Specs)),
    sm_helper:send_messages(Sender, BareJid, Messages),
    SessionPids = [mongoose_helper:get_session_pid(Session) || Session <- Sessions],
    ExpectedUnacked = length(Messages),
    lists:foreach(
        fun(Pid) -> sm_helper:wait_for_c2s_unacked_count(Pid, ExpectedUnacked) end,
        SessionPids),

    %% A new session takes the place of each online one,
    %% causing buffer rerouting and message duplication
    Reconnectors = lists:map(
        fun(Spec) -> fun() -> connect_spec(Spec, session) end end,
        Specs),
    reconnect_and_receive_messages(Reconnectors, Messages).

%% Connects Bob and a single fresh user that is logged in with four resources
%% (<<"res-1">> .. <<"res-4">>).
%%
%% The first argument is not used by this function; it is kept (and marked as
%% unused) so that the existing two-argument callers keep working.
%%
%% Returns {Bob, UserSpecs, Users}, where UserSpecs holds one spec per resource
%% and Users holds the clients connected from these specs, in the same order.
connect_initial_users(_Texts, Config) ->
    Resources = [<<"res-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, 4)],
    Bob = connect_fresh(Config, bob, session),
    BasicUserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)),
    %% All specs share the same fresh account and differ only in the resource
    UserSpecs = [[{resource, Res} | BasicUserSpec] || Res <- Resources],
    Users = [connect_spec(Spec, sm_after_session) || Spec <- UserSpecs],
    {Bob, UserSpecs, Users}.

%% Processes the given funs in sequence. Each fun returns a client (either newly
%% reconnected or already connected); the client has to receive exactly the
%% expected messages and nothing more, and is then disconnected cleanly.
%% There is an issue causing each subsequent session to receive twice as many
%% messages as the previous one, i.e. 1, 2, 4, 8, ..., which is why the expected
%% list is doubled before moving on to the next fun.
%% See https://github.com/esl/MongooseIM/pull/4498 for a more detailed description with a diagram
reconnect_and_receive_messages([], _Expected) ->
    ok;
reconnect_and_receive_messages([Connect | Remaining], Expected) ->
    Client = Connect(),
    sm_helper:wait_for_messages(Client, Expected),
    %% Leave a short time window so that any extra messages would show up
    timer:sleep(100),
    escalus_assert:has_no_stanzas(Client),
    escalus_connection:stop(Client),
    %% The next session is expected to get a duplicated buffer
    DoubledExpected = Expected ++ Expected,
    reconnect_and_receive_messages(Remaining, DoubledExpected).

resend_unacked_on_reconnection(Config) ->
Texts = three_texts(),
Bob = connect_fresh(Config, bob, presence),
Expand Down

0 comments on commit eca6541

Please sign in to comment.