From 729c23768109ee6d1ea28e4335c00db449f9b4ad Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Wed, 11 Sep 2024 16:14:13 +0200 Subject: [PATCH] Fix doubled message in MAM bug The doubled message in MAM issue comes from the fact that MAM looks for a `mam_id` field in the accumulator when trying to save a message, and in the re-routing case, it is being stripped just a moment before. This happens because re-routing uses the normal routing procedure, which strips the non-persistent accumulator fields, including the `mam_id`, in `mongoose_local_delivery`. Stripping is done because in the usual routing case the perspective of the message processing changes at that point - sender processing is finished, and receiver processing starts. When retransmitting a message, we would like to process it from the receiver perspective once more, but with accumulator fields saved from the last time it was processed. I decided to use the `filter_unacknowledged_messages` hook in `mod_mam_pm`, which is called before retransmitting messages, to save the `mam_id` as a permanent field in the accumulator. The "filter" name is analogous to the `filter_local_packet` hook, which is also used for processing in a broader sense than only filtering. 
--- big_tests/tests/mam_SUITE.erl | 153 +++++++++++++++++- src/mam/mod_mam_pm.erl | 20 ++- src/mongoose_acc.erl | 2 +- .../mod_stream_management.erl | 3 +- 4 files changed, 171 insertions(+), 7 deletions(-) diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index 78e073d83e7..c05d3c27f9a 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -239,10 +239,12 @@ is_skipped(_, _) -> basic_groups() -> [ {mam_all, [parallel], - [{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()}, + [{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases() + ++ [{stream_management, [], stream_management_cases()}]}, {mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features] ++ stanzaid_cases() ++ retract_cases() - ++ metadata_cases() ++ fetch_specific_msgs_cases()}, + ++ metadata_cases() ++ fetch_specific_msgs_cases() + ++ [{stream_management, [], stream_management_cases()}]}, {nostore, [parallel], nostore_cases()}, {archived, [parallel], archived_cases()}, {configurable_archiveid, [], configurable_archiveid_cases()}, @@ -513,6 +515,11 @@ impl_specific() -> pm_sql_query_failed, async_pools_batch_flush]. +stream_management_cases() -> + [reconnect_ack, + reconnect_no_ack, + reconnect_no_ack_different_resource]. + suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). 
@@ -582,6 +589,13 @@ init_per_group(with_rsm04, Config) -> [{props, mam04_props()}, {with_rsm, true}|Config]; init_per_group(nostore, Config) -> Config; +init_per_group(stream_management, Config) -> + Config1 = dynamic_modules:save_modules(host_type(), Config), + DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management), + MnesiaOrCets = ct_helper:get_internal_database(), + SMConfig = DefaultSMConfig#{backend => MnesiaOrCets}, + dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, SMConfig}]), + Config1; init_per_group(archived, Config) -> Config; init_per_group(muc04, Config) -> @@ -683,6 +697,8 @@ end_per_group(G, Config) when G == drop_msg; G == muc_drop_msg -> teardown_meck(), Config; +end_per_group(stream_management, Config) -> + dynamic_modules:restore_modules(Config); end_per_group(muc_configurable_archiveid, Config) -> dynamic_modules:restore_modules(Config), Config; @@ -3854,6 +3870,139 @@ check_user_exist(Config) -> %% cleanup ok = rpc(mim(), ejabberd_auth, remove_user, [JID]). +reconnect_no_ack(Config) -> + %% Connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + % She may get the ack request before the message for some reason + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. 
Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, session), + + % We have to send presence by hand, because the message may be received first + sm_helper:send_initial_presence(NewAlice), + % Current behaviour - unacked stanza is rerouted when a quick reconnection occurs + % there is no delay element, or any indication of retransmission + NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(Presence) -> escalus_pred:is_presence(Presence) end], + NewResp), + + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + + % There is only one message in MAM, even though it was resent + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_ack(Config) -> + % Connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and acknowledges + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + escalus_connection:send(Alice, escalus_stanza:sm_ack(2)), + + % 3. 
Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects + NewAlice = sm_helper:connect_same(Alice, presence), + + % 5. Check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % No new messages in MAM as well + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + +reconnect_no_ack_different_resource(Config) -> + %% Connect Bob and Alice + Bob = sm_helper:connect_fresh(Config, bob, presence), + Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}), + Alice = sm_helper:connect_spec(Spec, sr_presence, manual), + AliceJid = escalus_client:full_jid(Alice), + BobJid = escalus_client:full_jid(Bob), + sm_helper:ack_initial_presence(Alice), + + % 1. Bob sends a msg to Alice + Body = <<"OH, HAI! Msg 1">>, + escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)), + mam_helper:wait_for_archive_size(Alice, 1), + + % 2. Alice receives, and does not acknowledge + Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2), + escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end, + fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end], + Resp), + + % 3. Alice disconnects abruptly + C2SPid = mongoose_helper:get_session_pid(Alice), + escalus_connection:kill(Alice), + sm_helper:wait_until_resume_session(C2SPid), + sm_helper:assert_alive_resources(Alice, 1), + + % 4. Alice reconnects a different resource + NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual), + + % 2nd resource doesn't get the stanza, only the delayed presence. 
+ Presence = escalus:wait_for_stanza(NewAlice), + escalus:assert(is_presence, Presence), + + % 5. Check no new messages received + timer:sleep(timer:seconds(1)), + escalus_assert:has_no_stanzas(NewAlice), + + % No new messages in MAM as well + AliceUsername = escalus_client:username(NewAlice), + AliceServer = escalus_client:server(NewAlice), + ?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)), + + escalus_connection:stop(Bob), + escalus_connection:stop(Alice). + %% This function supports only one device, one user. %% We don't send initial presence to avoid presence broadcasts between resources %% of the same user from different stories. diff --git a/src/mam/mod_mam_pm.erl b/src/mam/mod_mam_pm.erl index 3eab0fc8c6d..1e8a664aa20 100644 --- a/src/mam/mod_mam_pm.erl +++ b/src/mam/mod_mam_pm.erl @@ -49,7 +49,8 @@ filter_packet/3, remove_user/3, determine_amp_strategy/3, - sm_filter_offline_message/3]). + sm_filter_offline_message/3, + filter_unacknowledged_messages/3]). %% ejabberd handlers -export([process_mam_iq/5]). @@ -273,6 +274,11 @@ get_personal_data(Acc, #{jid := ArcJID}, #{host_type := HostType}) -> Entries = mongoose_hooks:get_mam_pm_gdpr_data(HostType, ArcJID), {ok, [{mam_pm, Schema, Entries} | Acc]}. +-spec filter_unacknowledged_messages(Buffer :: [mongoose_acc:t()], Params :: map(), Extra :: map()) -> + {ok, [mongoose_acc:t()]}. +filter_unacknowledged_messages(Buffer, _, _) -> + {ok, [acc_with_perm_mam_id(Acc) || Acc <- Buffer]}. + %% ---------------------------------------------------------------------- %% Internal functions @@ -531,6 +537,15 @@ return_acc_with_mam_id_if_configured(ExtMessId, HostType, Acc) -> true -> mongoose_acc:set_permanent(mam, mam_id, ExtMessId, Acc) end. +-spec acc_with_perm_mam_id(Acc :: mongoose_acc:t()) -> Acc :: mongoose_acc:t(). +acc_with_perm_mam_id(Acc) -> + case mongoose_acc:get(mam, mam_id, undefined, Acc) of + undefined -> + Acc; + MamID -> + mongoose_acc:set_permanent(mam, mam_id, MamID, Acc) + end. 
+ is_interesting(LocJID, RemJID) -> HostType = jid_to_host_type(LocJID), ArcID = archive_id_int(HostType, LocJID), @@ -732,7 +747,8 @@ hooks(HostType) -> {anonymous_purge, HostType, fun ?MODULE:remove_user/3, #{}, 50}, {amp_determine_strategy, HostType, fun ?MODULE:determine_amp_strategy/3, #{}, 20}, {sm_filter_offline_message, HostType, fun ?MODULE:sm_filter_offline_message/3, #{}, 50}, - {get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50} + {get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50}, + {filter_unacknowledged_messages, HostType, fun ?MODULE:filter_unacknowledged_messages/3, #{}, 50} ]. add_iq_handlers(HostType, Opts) -> diff --git a/src/mongoose_acc.erl b/src/mongoose_acc.erl index d1f94816399..141886c0d95 100644 --- a/src/mongoose_acc.erl +++ b/src/mongoose_acc.erl @@ -56,7 +56,7 @@ % Strip with or without stanza replacement -export([strip/1, strip/2]). --ignore_xref([delete/2, ref/1]). +-ignore_xref([delete/2, ref/1, strip/1]). %% Note about 'undefined' to_jid and from_jid: these are the special cases when JID may be %% truly unknown: before a client is authorized. diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index 28ccba4cf58..d7ac5c5a1bd 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) -> -spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t(). notify_unacknowledged_msg(Acc, Jid) -> - NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid), - mongoose_acc:strip(NewAcc). + mongoose_hooks:unacknowledged_message(Acc, Jid). -spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> mongoose_c2s_hooks:result().