Skip to content

Commit

Permalink
Merge pull request #4374 from esl/duplicated_mam_id
Browse files Browse the repository at this point in the history
Fix doubled messaged in MAM bug
  • Loading branch information
JanuszJakubiec authored Sep 17, 2024
2 parents 9c1cdf8 + 729c237 commit fc86296
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 7 deletions.
153 changes: 151 additions & 2 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,12 @@ is_skipped(_, _) ->
basic_groups() ->
[
{mam_all, [parallel],
[{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()},
[{mam04, [parallel], mam_cases() ++ [retrieve_form_fields] ++ text_search_cases()
++ [{stream_management, [], stream_management_cases()}]},
{mam06, [parallel], mam_cases() ++ [retrieve_form_fields_extra_features]
++ stanzaid_cases() ++ retract_cases()
++ metadata_cases() ++ fetch_specific_msgs_cases()},
++ metadata_cases() ++ fetch_specific_msgs_cases()
++ [{stream_management, [], stream_management_cases()}]},
{nostore, [parallel], nostore_cases()},
{archived, [parallel], archived_cases()},
{configurable_archiveid, [], configurable_archiveid_cases()},
Expand Down Expand Up @@ -513,6 +515,11 @@ impl_specific() ->
pm_sql_query_failed,
async_pools_batch_flush].

stream_management_cases() ->
[reconnect_ack,
reconnect_no_ack,
reconnect_no_ack_different_resource].

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().

Expand Down Expand Up @@ -582,6 +589,13 @@ init_per_group(with_rsm04, Config) ->
[{props, mam04_props()}, {with_rsm, true}|Config];
init_per_group(nostore, Config) ->
Config;
init_per_group(stream_management, Config) ->
Config1 = dynamic_modules:save_modules(host_type(), Config),
DefaultSMConfig = config_parser_helper:default_mod_config(mod_stream_management),
MnesiaOrCets = ct_helper:get_internal_database(),
SMConfig = DefaultSMConfig#{backend => MnesiaOrCets},
dynamic_modules:ensure_modules(host_type(), [{mod_stream_management, SMConfig}]),
Config1;
init_per_group(archived, Config) ->
Config;
init_per_group(muc04, Config) ->
Expand Down Expand Up @@ -683,6 +697,8 @@ end_per_group(G, Config) when G == drop_msg;
G == muc_drop_msg ->
teardown_meck(),
Config;
end_per_group(stream_management, Config) ->
dynamic_modules:restore_modules(Config);
end_per_group(muc_configurable_archiveid, Config) ->
dynamic_modules:restore_modules(Config),
Config;
Expand Down Expand Up @@ -3854,6 +3870,139 @@ check_user_exist(Config) ->
%% cleanup
ok = rpc(mim(), ejabberd_auth, remove_user, [JID]).

reconnect_no_ack(Config) ->
%% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
% She may get the ack request before the message for some reason
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, session),

% We have to send presence by hand, because the message may be received first
sm_helper:send_initial_presence(NewAlice),
% Current behaviour - unacked stanza is rerouted when a quick reconnection occurs
% there is no delay element, or any indication of retransmission
NewResp = [_, _] = escalus_client:wait_for_stanzas(NewAlice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(Presence) -> escalus_pred:is_presence(Presence) end],
NewResp),

AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),

% There is only one message in MAM, even though it was resent
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_ack(Config) ->
% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Alice = sm_helper:connect_fresh(Config, alice, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and acknowledges
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),
escalus_connection:send(Alice, escalus_stanza:sm_ack(2)),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects
NewAlice = sm_helper:connect_same(Alice, presence),

% 5. Check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% No new messages in MAM as well
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

reconnect_no_ack_different_resource(Config) ->
%% Connect Bob and Alice
Bob = sm_helper:connect_fresh(Config, bob, presence),
Spec = escalus_fresh:create_fresh_user(Config, {alice, 2}),
Alice = sm_helper:connect_spec(Spec, sr_presence, manual),
AliceJid = escalus_client:full_jid(Alice),
BobJid = escalus_client:full_jid(Bob),
sm_helper:ack_initial_presence(Alice),

% 1. Bob sends a msg to Alice
Body = <<"OH, HAI! Msg 1">>,
escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
mam_helper:wait_for_archive_size(Alice, 1),

% 2. Alice receives, and does not acknowledge
Resp = [_, _] = escalus_client:wait_for_stanzas(Alice, 2),
escalus:assert_many([fun(Msg) -> escalus_pred:is_chat_message_from_to(BobJid, AliceJid, Body, Msg) end,
fun(SMRequest) -> escalus_pred:is_sm_ack_request(SMRequest) end],
Resp),

% 3. Alice disconnects abruptly
C2SPid = mongoose_helper:get_session_pid(Alice),
escalus_connection:kill(Alice),
sm_helper:wait_until_resume_session(C2SPid),
sm_helper:assert_alive_resources(Alice, 1),

% 4. Alice reconnects a different resource
NewAlice = sm_helper:connect_spec([{resource, <<"mam_sm_test_2nd_resource">>} | Spec], presence, manual),

% 2nd resource doesn't get the stanza, only the delayed presence.
Presence = escalus:wait_for_stanza(NewAlice),
escalus:assert(is_presence, Presence),

% 5. Check no new messages received
timer:sleep(timer:seconds(1)),
escalus_assert:has_no_stanzas(NewAlice),

% No new messages in MAM as well
AliceUsername = escalus_client:username(NewAlice),
AliceServer = escalus_client:server(NewAlice),
?assertEqual(1, mam_helper:archive_size(AliceServer, AliceUsername)),

escalus_connection:stop(Bob),
escalus_connection:stop(Alice).

%% This function supports only one device, one user.
%% We don't send initial presence to avoid presence broadcasts between resources
%% of the same user from different stories.
Expand Down
20 changes: 18 additions & 2 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
filter_packet/3,
remove_user/3,
determine_amp_strategy/3,
sm_filter_offline_message/3]).
sm_filter_offline_message/3,
filter_unacknowledged_messages/3]).

%% ejabberd handlers
-export([process_mam_iq/5]).
Expand Down Expand Up @@ -273,6 +274,11 @@ get_personal_data(Acc, #{jid := ArcJID}, #{host_type := HostType}) ->
Entries = mongoose_hooks:get_mam_pm_gdpr_data(HostType, ArcJID),
{ok, [{mam_pm, Schema, Entries} | Acc]}.

-spec filter_unacknowledged_messages(Buffer :: [mongoose_acc:t()], Params :: map(), Extra :: map()) ->
{ok, [mongoose_acc:t()]}.
filter_unacknowledged_messages(Buffer, _, _) ->
{ok, [acc_with_perm_mam_id(Acc) || Acc <- Buffer]}.

%% ----------------------------------------------------------------------
%% Internal functions

Expand Down Expand Up @@ -531,6 +537,15 @@ return_acc_with_mam_id_if_configured(ExtMessId, HostType, Acc) ->
true -> mongoose_acc:set_permanent(mam, mam_id, ExtMessId, Acc)
end.

-spec acc_with_perm_mam_id(Acc :: mongoose_acc:t()) -> Acc :: mongoose_acc:t().
acc_with_perm_mam_id(Acc) ->
case mongoose_acc:get(mam, mam_id, undefined, Acc) of
undefined ->
Acc;
MamID ->
mongoose_acc:set_permanent(mam, mam_id, MamID, Acc)
end.

is_interesting(LocJID, RemJID) ->
HostType = jid_to_host_type(LocJID),
ArcID = archive_id_int(HostType, LocJID),
Expand Down Expand Up @@ -732,7 +747,8 @@ hooks(HostType) ->
{anonymous_purge, HostType, fun ?MODULE:remove_user/3, #{}, 50},
{amp_determine_strategy, HostType, fun ?MODULE:determine_amp_strategy/3, #{}, 20},
{sm_filter_offline_message, HostType, fun ?MODULE:sm_filter_offline_message/3, #{}, 50},
{get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50}
{get_personal_data, HostType, fun ?MODULE:get_personal_data/3, #{}, 50},
{filter_unacknowledged_messages, HostType, fun ?MODULE:filter_unacknowledged_messages/3, #{}, 50}
].

add_iq_handlers(HostType, Opts) ->
Expand Down
2 changes: 1 addition & 1 deletion src/mongoose_acc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
% Strip with or without stanza replacement
-export([strip/1, strip/2]).

-ignore_xref([delete/2, ref/1]).
-ignore_xref([delete/2, ref/1, strip/1]).

%% Note about 'undefined' to_jid and from_jid: these are the special cases when JID may be
%% truly unknown: before a client is authorized.
Expand Down
3 changes: 1 addition & 2 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,7 @@ maybe_notify_unacknowledged_msg(Acc, Jid) ->

-spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t().
notify_unacknowledged_msg(Acc, Jid) ->
NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid),
mongoose_acc:strip(NewAcc).
mongoose_hooks:unacknowledged_message(Acc, Jid).

-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
mongoose_c2s_hooks:result().
Expand Down

0 comments on commit fc86296

Please sign in to comment.