Skip to content

Commit

Permalink
Fix doubled messaged in MAM bug
Browse files Browse the repository at this point in the history
The issue came from the fact that MAM looks for a `mam_id` field in the
accumulator when trying to save a message. This should prevent double messages
in scenarios such as retransmitting a message. However, for some reason, in
mod_stream_management, the accumulator was stripped of its fields. This didn't
make any sense, as the metadata could be useful (as is the case here).

I'm not certain if the retransmission itself is a correct behaviour, and if the
stanza shouldn't contain a "delay" element.
  • Loading branch information
gustawlippa committed Sep 11, 2024
1 parent aafbc75 commit 4348f52
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 4 deletions.
153 changes: 152 additions & 1 deletion big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,12 @@ basic_groups() ->
[
{mam_all, [parallel],
[{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()},
{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()
++ [{sm, [], sm_cases()}]},
{mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features]
++ stanzaid_cases() ++ retract_cases()
++ metadata_cases() ++ fetch_specific_msgs_cases()},
++ metadata_cases() ++ fetch_specific_msgs_cases()
++ [{sm, [], sm_cases()}]},
{nostore, [parallel], nostore_cases()},
{archived, [parallel], archived_cases()},
{configurable_archiveid, [], configurable_archiveid_cases()},
Expand Down Expand Up @@ -513,6 +516,11 @@ impl_specific() ->
pm_sql_query_failed,
async_pools_batch_flush].

sm_cases() ->
[reconnect_ack,
reconnect_no_ack,
reconnect_no_ack_different_resource].

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

Expand Down Expand Up @@ -582,6 +590,11 @@ init_per_group(with_rsm04, Config) ->
[{props, mam04_props()}, {with_rsm, true}|Config];
init_per_group(nostore, Config) ->
Config;
init_per_group(sm, Config) ->
Config1 = dynamic_modules:save_modules(host_type(), Config),
DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management),
dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, DefaultSMConfig}]),
Config1;
init_per_group(archived, Config) ->
Config;
init_per_group(muc04, Config) ->
Expand Down Expand Up @@ -683,6 +696,8 @@ end_per_group(G, Config) when G == drop_msg;
G == muc_drop_msg ->
teardown_meck(),
Config;
end_per_group(sm, Config) ->
dynamic_modules:restore_modules(Config);
end_per_group(muc_configurable_archiveid, Config) ->
dynamic_modules:restore_modules(Config),
Config;
Expand Down Expand Up @@ -3854,6 +3869,142 @@ check_user_exist(Config) ->
%% cleanup
ok = rpc(mim(), ejabberd_auth, remove_user, [JID]).

reconnect_no_ack(Config) ->
%% connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),

sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
% she may get the ack request before the message for some reason
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, session),


% we have to send presence by hand, because the message may be received first
sm_helper:send_initial_presence(NewAlice),
% Current behaviour - unacked stanza is rerouted when a quick reconnection occurs
% there is no delay element, or any indication of retransmission
NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(Presence) -> escalus_pred:is_presence(Presence) end],
NewResp),

AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),

% There is only one message in MAM, even though it was sent twice
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_ack(Config) ->
% connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and acknowledges
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),
escalus_connection:send(Alice, escalus_stanza:sm_ack(2)),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, presence),

% 5. check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% and no new messages in MAM
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_no_ack_different_resource(Config) ->
%% connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}),
Alice = sm_helper:connect_spec(Spec, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),

sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects a different resource
NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual),

% 2nd resource doesn't get the stanza, only the delayed presence.
Presence = escalus:wait_for_stanza(NewAlice),
escalus:assert(is_presence, Presence),

% 5. check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% and no new messages in MAM
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

%% This function supports only one device, one user.
%% We don't send initial presence to avoid presence broadcasts between resources
%% of the same user from different stories.
Expand Down
2 changes: 1 addition & 1 deletion src/mongoose_acc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
% Strip with or without stanza replacement
-export([strip/1, strip/2]).

-ignore_xref([delete/2, ref/1]).
-ignore_xref([delete/2, ref/1, strip/1]).

%% Note about 'undefined' to_jid and from_jid: these are the special cases when JID may be
%% truly unknown: before a client is authorized.
Expand Down
3 changes: 1 addition & 2 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) ->

-spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t().
notify_unacknowledged_msg(Acc, Jid) ->
NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid),
mongoose_acc:strip(NewAcc).
mongoose_hooks:unacknowledged_message(Acc, Jid).

Check warning on line 355 in src/stream_management/mod_stream_management.erl

View check run for this annotation

Codecov / codecov/patch

src/stream_management/mod_stream_management.erl#L355

Added line #L355 was not covered by tests

-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
Expand Down

0 comments on commit 4348f52

Please sign in to comment.