diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index e04376d113d..a24cd7d6254 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -63,7 +63,8 @@ groups() -> {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()}, {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, {stale_h, [], stale_h_cases()}, - {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()} + {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()}, + {resume_timeout, [parallel], resume_timeout_cases()} ]. ws_tests() -> @@ -72,6 +73,7 @@ ws_tests() -> {group, manual_ack_freq_2}, {group, stale_h}, {group, parallel_unacknowledged_message_hook}, + {group, resume_timeout}, ping_timeout]. tcp_tests() -> @@ -80,6 +82,7 @@ tcp_tests() -> {group, manual_ack_freq_2}, {group, stale_h}, {group, parallel_unacknowledged_message_hook}, + {group, resume_timeout}, ping_timeout]. parallel_cases() -> @@ -116,7 +119,6 @@ parallel_cases() -> parallel_manual_ack_freq_1_cases() -> [client_acks_more_than_sent, too_many_unacked_stanzas, - resend_unacked_after_resume_timeout, resume_session_state_send_message_with_ack, resume_session_state_send_message_without_ack, resume_session_state_stop_c2s, @@ -127,6 +129,10 @@ parallel_manual_ack_freq_1_cases() -> manual_ack_freq_2_cases() -> [server_requests_ack_freq_2]. +resume_timeout_cases() -> + [resend_unacked_after_resume_timeout, + resend_unacked_to_different_res_after_resume_timeout]. + stale_h_cases() -> [resume_expired_session_returns_correct_h, gc_repeat_after_never_means_no_cleaning, @@ -258,6 +264,8 @@ required_sm_opts(group, parallel_unacknowledged_message_hook) -> #{ack_freq => 1}; required_sm_opts(group, manual_ack_freq_long_session_timeout) -> #{ack_freq => 1, buffer_max => 1000}; +required_sm_opts(group, resume_timeout) -> + #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT}; required_sm_opts(testcase, resume_expired_session_returns_correct_h) -> #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT, @@ -545,16 +553,20 @@ resend_unacked_on_reconnection(Config) -> sm_helper:send_messages(Bob, User, Texts), %% User receives the messages. sm_helper:wait_for_messages(User, Texts), - %% User disconnects without acking the messages. + %% User disconnects ending stream gracefully, but without acking the messages. sm_helper:stop_client_and_wait_for_termination(User), %% Messages go to the offline store. %% User receives the messages from the offline store. NewUser = connect_spec(UserSpec, session, manual), send_initial_presence(NewUser), - sm_helper:wait_for_messages(NewUser, Texts), + sm_helper:wait_for_delayed_messages(NewUser, Texts), %% User acks the delayed messages so they won't go again %% to the offline store. - escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)). + escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)), + % user receives initial presence response + P = escalus:wait_for_stanza(NewUser), + escalus:assert(is_presence, P), + escalus_connection:stop(NewUser). %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order %% TODO Test without wait_for_n_offline_messages. It would require changes in SM @@ -630,7 +642,7 @@ resend_unacked_after_resume_timeout(Config) -> User = connect_fresh(Config, ?config(user, Config), sr_presence), UserSpec = sm_helper:client_to_spec(User), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to(User, <<"msg-1">>)), %% kill user connection escalus_connection:kill(User), @@ -638,12 +650,54 @@ resend_unacked_after_resume_timeout(Config) -> C2SPid = mongoose_helper:get_session_pid(User), sm_helper:wait_until_resume_session(C2SPid), - %% user come back and receives unacked message + %% user comes back NewUser = connect_spec(UserSpec, session), send_initial_presence(NewUser), + %% resume timeout passes + timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)), + + %% user receives unacked message and initial presence + UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2), escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], - escalus:wait_for_stanzas(NewUser, 2)), + UnackedStanzas), + [UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas), + sm_helper:assert_delayed(UnackedMsg), + escalus_assert:has_no_stanzas(NewUser), + + escalus_connection:stop(Bob), + escalus_connection:stop(NewUser). + + +resend_unacked_to_different_res_after_resume_timeout(Config) -> + %% connect bob and user + Bob = connect_fresh(Config, bob, presence), + User = connect_fresh(Config, ?config(user, Config), sr_presence), + UserSpec = sm_helper:client_to_spec(User), + + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + %% kill user connection + escalus_connection:kill(User), + + %% ensure there is no session + C2SPid = mongoose_helper:get_session_pid(User), + sm_helper:wait_until_resume_session(C2SPid), + + %% user comes back with different resource + NewUser = connect_spec([{resource, <<"2nd_resource">>} | UserSpec], session), + send_initial_presence(NewUser), + + %% resume timeout passes + timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)), + + %% user receives unacked message and presence, as well as initial presence response + %% the order of the messages may change, especially on CI, so we test all of them + UnackedStanzas = escalus:wait_for_stanzas(NewUser, 3), + escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>), is_presence], UnackedStanzas), + [UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas), + sm_helper:assert_delayed(UnackedMsg), + + escalus_assert:has_no_stanzas(NewUser), escalus_connection:stop(Bob), escalus_connection:stop(NewUser). diff --git a/big_tests/tests/sm_helper.erl b/big_tests/tests/sm_helper.erl index 29edd9bd350..56ef5b2107e 100644 --- a/big_tests/tests/sm_helper.erl +++ b/big_tests/tests/sm_helper.erl @@ -35,6 +35,8 @@ -export([send_initial_presence/1, send_messages/3, wait_for_messages/2, + wait_for_delayed_messages/2, + assert_delayed/1, assert_messages/2, send_and_receive/3, get_ack/1, @@ -293,6 +295,14 @@ wait_for_messages(Alice, Texts) -> Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)), assert_messages(Stanzas, Texts). +wait_for_delayed_messages(Alice, Texts) -> + Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)), + assert_messages(Stanzas, Texts), + [assert_delayed(S) || S <- Stanzas]. + +assert_delayed(Stanza) -> + escalus:assert(has_ns, [<<"urn:xmpp:delay">>], exml_query:subelement(Stanza, <<"delay">>)). + assert_messages(Stanzas, Texts) -> Bodies = lists:map(fun get_body/1, Stanzas), case Bodies of diff --git a/src/mod_presence.erl b/src/mod_presence.erl index 9da95035247..3e3cc1ee696 100644 --- a/src/mod_presence.erl +++ b/src/mod_presence.erl @@ -312,7 +312,7 @@ route_probe(Acc, Presences, FromJid, ToJid) -> Packet0 = Presences#presences_state.pres_last, TS = Presences#presences_state.pres_timestamp, %% To is the one sending the presence (the target of the probe) - Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<>>), + Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<"Delayed presence">>), HostType = mongoose_acc:host_type(Acc), Acc2 = mongoose_hooks:presence_probe(HostType, Acc, FromJid, ToJid, self()), %% Don't route a presence probe to oneself diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index d7ac5c5a1bd..c92e19f1ef6 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -386,7 +386,9 @@ maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, no handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) -> Sid = mongoose_c2s:get_sid(StateData), do_remove_smid(HostType, Sid, H), - reroute_buffer(StateData, SmState), + FromServer = mongoose_c2s:get_lserver(StateData), + NewState = add_delay_elements_to_buffer(SmState, FromServer), + reroute_buffer(StateData, NewState), SmState#sm_state{buffer = [], buffer_size = 0}. reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}) -> @@ -394,6 +396,10 @@ reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _ reroute_buffer(StateData, #sm_state{buffer = Buffer}) -> mongoose_c2s:reroute_buffer(StateData, Buffer). +add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) -> + BufferWithDelays = [maybe_add_timestamp(Acc, FromServer) || Acc <- Buffer], + SmState#sm_state{buffer = BufferWithDelays}. + -spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term(). terminate(Reason, C2SState, StateData) -> ?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason, @@ -615,14 +621,20 @@ get_all_stanzas_to_forward(StateData, SMID) -> LServer = mongoose_c2s:get_lserver(StateData), FromServer = jid:make_noprep(<<>>, LServer, <<>>), ToForward = [ begin - TS = mongoose_acc:timestamp(Acc), - Packet = mongoose_acc:element(Acc), - StanzaName = mongoose_acc:stanza_name(Acc), - StanzaType = mongoose_acc:stanza_type(Acc), - maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer) + AccWithTS = maybe_add_timestamp(Acc, FromServer), + mongoose_acc:element(AccWithTS) end || Acc <- lists:reverse(Buffer)], {Resumed, ToForward}. +maybe_add_timestamp(Acc, FromServer) -> + TS = mongoose_acc:timestamp(Acc), + StanzaName = mongoose_acc:stanza_name(Acc), + StanzaType = mongoose_acc:stanza_type(Acc), + {From, To, El} = mongoose_acc:packet(Acc), + ElWithDelay = maybe_add_timestamp(El, StanzaName, StanzaType, TS, FromServer), + AccParams = #{from_jid => From, to_jid => To, element => ElWithDelay}, + mongoose_acc:update_stanza(AccParams, Acc). + maybe_add_timestamp(Packet, <<"message">>, <<"error">>, _, _) -> Packet; maybe_add_timestamp(Packet, <<"message">>, <<"headline">>, _, _) ->