diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index 576c068513e..6fa80169080 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -57,7 +57,10 @@ parallel_test_cases() -> resume_session_with_wrong_sid_returns_item_not_found, resume_session_with_wrong_namespace_is_a_noop, resume_dead_session_results_in_item_not_found, - aggressively_pipelined_resume + aggressively_pipelined_resume, + replies_are_processed_by_resumed_session, + subscription_requests_are_buffered_properly, + messages_are_properly_flushed_during_resumption ]. parallel_manual_ack_test_cases() -> @@ -83,6 +86,7 @@ init_per_suite(Config) -> NewConfig1 = escalus_ejabberd:setup_option(ack_freq(never), Config), NewConfig = escalus_ejabberd:setup_option(buffer_max(?SMALL_SM_BUFFER), NewConfig1), NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), + mongoose_helper:inject_module(?MODULE), escalus:init_per_suite(NewConfigWithSM). end_per_suite(Config) -> @@ -111,15 +115,21 @@ end_per_group(parallel_manual_ack, Config) -> end_per_group(_GroupName, Config) -> Config. -init_per_testcase(server_requests_ack_freq_2, Config) -> +init_per_testcase(server_requests_ack_freq_2 = CN, Config) -> true = rpc(mim(), ?MOD_SM, set_ack_freq, [2]), - Config; + escalus:init_per_testcase(CN, Config); +init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> + register_handler(<<"localhost">>), + escalus:init_per_testcase(CN, Config); init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). end_per_testcase(server_requests_ack_freq_2, Config) -> true = rpc(mim(), ?MOD_SM, set_ack_freq, [never]), - Config; + escalus:end_per_testcase(Config); +end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> + unregister_handler(<<"localhost">>), + escalus:end_per_testcase(CN, Config); end_per_testcase(CaseName, Config) -> escalus:end_per_testcase(CaseName, Config). @@ -757,6 +767,157 @@ aggressively_pipelined_resume(Config) -> escalus_connection:stop(Alice) end). +%% This is a regression test for a case when a session processes a request, which will +%% receive a response from the server, i.e. will have the same origin SID in mongoose_acc. +%% Without proper handling, the reply would be rejected because the resumed session +%% has new SID. +replies_are_processed_by_resumed_session(Config) -> + %% GIVEN a session and registered special IQ handler (added in init_per_testcase), + %% that waits for old session process to terminate (at this point new process + %% has fully taken over) and then actually sends the reply. + AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + Steps = connection_steps_to_enable_stream_resumption(), + {ok, Alice = #client{props = Props}, _} = escalus_connection:start(AliceSpec, Steps), + SMID = proplists:get_value(smid, Props), + SMH = escalus_connection:get_sm_h(Alice), + + %% WHEN a client sends IQ request to the special handler... + IQReq = escalus_stanza:iq_get(regression_ns(), []), + escalus:send(Alice, IQReq), + + %% ... goes down and session is resumed. + escalus_client:kill_connection(Config, Alice), + Steps2 = connection_steps_to_stream_resumption(SMID, SMH), + {ok, Alice2, _} = escalus_connection:start(AliceSpec, Steps2), + + %% THEN the client receives the reply properly. + IQReply = escalus:wait_for_stanza(Alice2), + escalus:assert(is_iq_result, [IQReq], IQReply), + escalus_connection:stop(Alice2). + +%% This is a regression test for a bug, which manifested in following scenario +%% (due to improper presence sub requests buffering): +%% 1. A is online, B is offline +%% 2. A subscribes to B's presence; +%% 3. B becomes online +%% 4. A sends a message to B +%% 5. B doesn't SM-ack the request or message, terminates the connection +%% 6. B reconnects but with session replace, not resume +%% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer +%% 8. B doesn't receive the buffered message +subscription_requests_are_buffered_properly(Config) -> + AliceSpec = [{manual_ack, true} + | escalus_fresh:create_fresh_user(Config, alice)], + + MsgBody = <<"buffered">>, + SubPredFun = fun(S) -> + escalus_pred:is_presence_with_type(<<"subscribe">>, S) + end, + AvailablePredFun = fun(S) -> + escalus_pred:is_presence_with_type(<<"available">>, S) + end, + MsgPredFun = fun(S) -> + escalus_pred:is_chat_message(MsgBody, S) + end, + + escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> + % GIVEN Bob's pending subscription to Alice's presence + AliceUser = proplists:get_value(username, AliceSpec), + AliceServer = proplists:get_value(server, AliceSpec), + AliceJid = <>, + escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)), + _RosterPushReq = escalus:wait_for_stanza(Bob), + + % WHEN Alice becomes online... + Steps = connection_steps_to_enable_stream_resumption(), + {ok, Alice, _} = escalus_connection:start(AliceSpec, Steps), + InitialPresence = escalus_stanza:presence(<<"available">>), + escalus_connection:send(Alice, InitialPresence), + AvailableAndSubPresences = escalus:wait_for_stanzas(Alice, 2), + escalus:assert_many([SubPredFun, AvailablePredFun], AvailableAndSubPresences), + + % ...and Bob sends a message to Alice... + escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), + MsgStanza = escalus:wait_for_stanza(Alice), + escalus:assert(is_chat_message, [MsgBody], MsgStanza), + + % ...and Alice terminates connection without acking anything... + escalus_client:kill_connection(Config, Alice), + + % ...and reconnects with session replacement. + {ok, Alice2, _} = escalus_connection:start(AliceSpec), + + % THEN Alice receives (without sending initial presence): + % * buffered available presence (because it's addressed to full JID) + % * buffered Bob's message (like above) + % Alice DOESN'T receive: + % * buffered subscription request because it is dropped by ejabberd_sm + % because it's treated like repeated sub request to bare JID, so it's not + % processed by any sub req handler (like mod_roster) + SubReqAndInitialPresence = escalus:wait_for_stanzas(Alice2, 2), + escalus:assert_many([AvailablePredFun, MsgPredFun], SubReqAndInitialPresence), + + escalus_connection:stop(Alice2) + end). + +%% This is a regression test for a bug, due to which messages sent to old session +%% in a middle of state handover were not appended properly to SM buffer. +%% Scenario to reproduce: +%% 1. Online Bob and Alice +%% 2. Alice kills the connecion +%% 3. Alice's session is suspended +%% 4. Alice resumes session with new connection. At this moment new session is still not +%% present in session table. `resume` request is stuck in old proc mailbox. +%% 5. Bob sends a message to Alice. Only old proc is present in session table so now +%% old session has two messages in mailbox: `resume` and XML from Bob +%% 6. We resume old process and it begins session handover +%% 7. Bob's message is appended to SM buffer in "flush" step +%% 8. With bug fixed, the message is retransmitted properly +messages_are_properly_flushed_during_resumption(Config) -> + AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> + % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended + Steps = connection_steps_to_enable_stream_resumption(), + {ok, Alice = #client{props = Props}, _} = escalus_connection:start(AliceSpec, Steps), + SMID = proplists:get_value(smid, Props), + InitialPresence = escalus_stanza:presence(<<"available">>), + escalus_connection:send(Alice, InitialPresence), + Presence = escalus:wait_for_stanza(Alice), + escalus:assert(is_presence, Presence), + SMH = escalus_connection:get_sm_h(Alice), + escalus_client:kill_connection(Config, Alice), + {ok, C2SPid} = get_session_pid(AliceSpec, server_string("escalus-default-resource")), + ok = rpc(mim(), sys, suspend, [C2SPid]), + + % WHEN new session requests resumption + % we wait until that old session has resumption request enqueued; + % we need it to ensure the order of messages: resume first, Bob's chat second. + % Actual wait and message sent by Bob is done in separate process + % because new client start will block until old process is resumed + + MsgBody = <<"flush-regression">>, + spawn(fun() -> + wait_for_queue_length(C2SPid, 1), + + % Bob sends a message... + escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), + + % ...we ensure that a message is enqueued in Alice's session... + % (2 messages = resume request + Bob's message) + wait_for_queue_length(C2SPid, 2), + + % ...and old process is resumed. + ok = rpc(mim(), sys, resume, [C2SPid]) + end), + + Steps2 = connection_steps_to_stream_resumption(SMID, SMH), + {ok, Alice2, _} = escalus_connection:start(AliceSpec, Steps2), + + % THEN Alice's new session receives Bob's message + RecvMsg = escalus:wait_for_stanza(Alice2), + escalus:assert(is_chat_message, [MsgBody], RecvMsg) + end). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- @@ -891,3 +1052,52 @@ given_fresh_user_with_spec(Spec) -> escalus:wait_for_stanza(User), JID = common_helper:get_bjid(Props), {User#client{jid = JID}, Spec}. + +wait_for_queue_length(Pid, Length) -> + mongoose_helper:wait_until( + fun() -> + rpc(mim(), erlang, process_info, [Pid, message_queue_len]) + end, {message_queue_len, Length}). + +%%-------------------------------------------------------------------- +%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session" +%%-------------------------------------------------------------------- + +regression_ns() -> + <<"regression">>. + +register_handler(Host) -> + rpc(mim(), gen_iq_handler, add_iq_handler, + [ejabberd_sm, Host, regression_ns(), ?MODULE, regression_handler, one_queue]). + +unregister_handler(Host) -> + rpc(mim(), gen_iq_handler, remove_iq_handler, [ejabberd_sm, Host, regression_ns()]). + +regression_handler(_From, _To, Acc, IQ) -> + %% A bit of a hack - will no longer work when the SID format changes + {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc), + erlang:monitor(process, Pid), + receive + {'DOWN', _, _, _, _} -> + ok + after + 10000 -> + error({c2s_not_stopped_after_timeout, Pid}) + end, + + %% We avoid another race condition - there is a short window where user session + %% is not registered in ejabberd_sm: between old process termination and the moment + %% when the new process stores new session in memory. It should be fixed separately. + wait_for_session(jid:to_lower(mongoose_acc:get(c2s, origin_jid, undefined, Acc)), 50, 100), + + {Acc, jlib:make_result_iq_reply(IQ)}. + +wait_for_session({LU, LS, LR} = LJID, Retries, SleepTime) -> + case ejabberd_sm:get_session(LU, LS, LR) of + offline -> + timer:sleep(SleepTime), + wait_for_session(LJID, Retries - 1, SleepTime); + _ -> + ok + end. + diff --git a/include/ejabberd_c2s.hrl b/include/ejabberd_c2s.hrl index c5682fb4bc1..ade2f90c815 100644 --- a/include/ejabberd_c2s.hrl +++ b/include/ejabberd_c2s.hrl @@ -71,6 +71,7 @@ stream_mgmt_ack_freq = ?STREAM_MGMT_ACK_FREQ, stream_mgmt_resume_timeout = ?STREAM_MGMT_RESUME_TIMEOUT, stream_mgmt_resume_tref, + stream_mgmt_resumed_from, stream_mgmt_constraint_check_tref, csi_state = active :: mod_csi:state(), csi_buffer = [], diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index d042afef463..302bc484a7b 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1190,7 +1190,8 @@ process_incoming_stanza_with_conflict_check(From, To, Acc, StateName, StateData) %% "Incoming" means that stanza is coming from ejabberd_router. -spec check_incoming_accum_for_conflicts(mongoose_acc:t(), state()) -> unknown_origin | different_origin | same_device | conflict. -check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID}) -> +check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID, + stream_mgmt_resumed_from = OldSID}) -> OriginSID = mongoose_acc:get(c2s, origin_sid, undefined, Acc), OriginJID = mongoose_acc:get(c2s, origin_jid, undefined, Acc), AreDefined = OriginJID =/= undefined andalso OriginSID =/= undefined, @@ -1200,7 +1201,10 @@ check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID}) -> true -> SameJID = jid:are_equal(OriginJID, JID), SameSID = OriginSID =:= SID, - case {SameJID, SameSID} of + % It's possible to receive a response addressed to a process + % which we resumed from - still valid! + SameOldSession = OriginSID =:= OldSID, + case {SameJID, SameSID or SameOldSession} of {false, _} -> different_origin; {_, true} -> @@ -2331,9 +2335,13 @@ resend_subscription_requests(Acc, #state{pending_invitations = Pending} = StateD {NewAcc, NewState} = lists:foldl( fun(XMLPacket, {A, #state{} = State}) -> A1 = send_element(A, XMLPacket, State), + % We retrieve From i To from a stanza, because Acc has + % from_jid and to_jid that apply to 'available' stanza sent + % by the client {value, From} = xml:get_tag_attr(<<"from">>, XMLPacket), {value, To} = xml:get_tag_attr(<<"to">>, XMLPacket), - BufferedStateData = buffer_out_stanza({From, To, XMLPacket}, State), + PacketTuple = {jid:from_binary(From), jid:from_binary(To), XMLPacket}, + BufferedStateData = buffer_out_stanza(PacketTuple, State), % this one will be next to tackle A2 = maybe_send_ack_request(A1, BufferedStateData), {A2, BufferedStateData} @@ -2492,13 +2500,19 @@ bounce_messages() -> end. %% Return the messages in reverse order than they were received in! +-spec flush_messages() -> {N :: non_neg_integer(), Msgs :: [packet()]}. flush_messages() -> flush_messages(0, []). +-spec flush_messages(N :: non_neg_integer(), Msgs :: [packet()]) -> + {N :: non_neg_integer(), Msgs :: [packet()]}. flush_messages(N, Acc) -> receive - {route, _, _, _} = Msg -> - flush_messages(N+1, [Msg | Acc]) + {route, From, To, #xmlel{} = Packet} -> + flush_messages(N+1, [{From, To, Packet} | Acc]); + {route, From, To, MongooseAcc} -> + % TODO: SM buffer should hold acc, not xmlels + flush_messages(N+1, [{From, To, mongoose_acc:element(MongooseAcc)} | Acc]) after 0 -> {N, Acc} end. @@ -2851,6 +2865,7 @@ stream_mgmt_ack(NIncoming) -> attrs = [{<<"xmlns">>, ?NS_STREAM_MGNT_3}, {<<"h">>, integer_to_binary(NIncoming)}]}. +-spec buffer_out_stanza(packet(), state()) -> state(). buffer_out_stanza(_Packet, #state{stream_mgmt = false} = S) -> S; buffer_out_stanza(_Packet, #state{stream_mgmt_buffer_max = no_buffer} = S) -> @@ -3001,7 +3016,9 @@ do_resume_session(SMID, El, [{_, Pid}], #state{ server = Server } = StateData) - NSD2 = flush_csi_buffer(NSD), - fsm_next_state(session_established, NSD2) + NSD3 = NSD2#state{ stream_mgmt_resumed_from = OldState#state.sid }, + + fsm_next_state(session_established, NSD3) catch %% errors from send_element _:_ -> diff --git a/src/jlib.erl b/src/jlib.erl index 1484782172b..5c79b778be0 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -109,10 +109,13 @@ 'false' | non_neg_integer()}. --spec make_result_iq_reply(exml:element()) -> exml:element(). +-spec make_result_iq_reply(exml:element()) -> exml:element(); + (iq()) -> iq(). make_result_iq_reply(XE = #xmlel{attrs = Attrs}) -> NewAttrs = make_result_iq_reply_attrs(Attrs), - XE#xmlel{attrs = NewAttrs}. + XE#xmlel{attrs = NewAttrs}; +make_result_iq_reply(IQ = #iq{}) -> + IQ#iq{ type = result }. -spec make_result_iq_reply_attrs([binary_pair()]) -> [binary_pair(), ...].