Skip to content

Commit

Permalink
Merge 5aca772 into bc5e861
Browse files Browse the repository at this point in the history
  • Loading branch information
fenek authored Mar 19, 2019
2 parents bc5e861 + 5aca772 commit 9c05df1
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 12 deletions.
218 changes: 214 additions & 4 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ parallel_test_cases() ->
resume_session_with_wrong_sid_returns_item_not_found,
resume_session_with_wrong_namespace_is_a_noop,
resume_dead_session_results_in_item_not_found,
aggressively_pipelined_resume
aggressively_pipelined_resume,
replies_are_processed_by_resumed_session,
subscription_requests_are_buffered_properly,
messages_are_properly_flushed_during_resumption
].

parallel_manual_ack_test_cases() ->
Expand All @@ -83,6 +86,7 @@ init_per_suite(Config) ->
NewConfig1 = escalus_ejabberd:setup_option(ack_freq(never), Config),
NewConfig = escalus_ejabberd:setup_option(buffer_max(?SMALL_SM_BUFFER), NewConfig1),
NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true),
mongoose_helper:inject_module(?MODULE),
escalus:init_per_suite(NewConfigWithSM).

end_per_suite(Config) ->
Expand Down Expand Up @@ -111,15 +115,21 @@ end_per_group(parallel_manual_ack, Config) ->
end_per_group(_GroupName, Config) ->
Config.

init_per_testcase(server_requests_ack_freq_2, Config) ->
init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [2]),
Config;
escalus:init_per_testcase(CN, Config);
init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
register_handler(<<"localhost">>),
escalus:init_per_testcase(CN, Config);
init_per_testcase(CaseName, Config) ->
escalus:init_per_testcase(CaseName, Config).

end_per_testcase(server_requests_ack_freq_2, Config) ->
true = rpc(mim(), ?MOD_SM, set_ack_freq, [never]),
Config;
escalus:end_per_testcase(Config);
end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
unregister_handler(<<"localhost">>),
escalus:end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
escalus:end_per_testcase(CaseName, Config).

Expand Down Expand Up @@ -757,6 +767,157 @@ aggressively_pipelined_resume(Config) ->
escalus_connection:stop(Alice)
end).

%% This is a regression test for a case when a session processes a request, which will
%% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
%% Without proper handling, the reply would be rejected because the resumed session
%% has new SID.
replies_are_processed_by_resumed_session(Config) ->
%% GIVEN a session and registered special IQ handler (added in init_per_testcase),
%% that waits for old session process to terminate (at this point new process
%% has fully taken over) and then actually sends the reply.
AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
Steps = connection_steps_to_enable_stream_resumption(),
{ok, Alice = #client{props = Props}, _} = escalus_connection:start(AliceSpec, Steps),
SMID = proplists:get_value(smid, Props),
SMH = escalus_connection:get_sm_h(Alice),

%% WHEN a client sends IQ request to the special handler...
IQReq = escalus_stanza:iq_get(regression_ns(), []),
escalus:send(Alice, IQReq),

%% ... goes down and session is resumed.
escalus_client:kill_connection(Config, Alice),
Steps2 = connection_steps_to_stream_resumption(SMID, SMH),
{ok, Alice2, _} = escalus_connection:start(AliceSpec, Steps2),

%% THEN the client receives the reply properly.
IQReply = escalus:wait_for_stanza(Alice2),
escalus:assert(is_iq_result, [IQReq], IQReply),
escalus_connection:stop(Alice2).

%% This is a regression test for a bug, which manifested in following scenario
%% (due to improper presence sub requests buffering):
%% 1. A is online, B is offline
%% 2. A subscribes to B's presence;
%% 3. B becomes online
%% 4. A sends a message to B
%% 5. B doesn't SM-ack the request or message, terminates the connection
%% 6. B reconnects but with session replace, not resume
%% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer
%% 8. B doesn't receive the buffered message
subscription_requests_are_buffered_properly(Config) ->
AliceSpec = [{manual_ack, true}
| escalus_fresh:create_fresh_user(Config, alice)],

MsgBody = <<"buffered">>,
SubPredFun = fun(S) ->
escalus_pred:is_presence_with_type(<<"subscribe">>, S)
end,
AvailablePredFun = fun(S) ->
escalus_pred:is_presence_with_type(<<"available">>, S)
end,
MsgPredFun = fun(S) ->
escalus_pred:is_chat_message(MsgBody, S)
end,

escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
% GIVEN Bob's pending subscription to Alice's presence
AliceUser = proplists:get_value(username, AliceSpec),
AliceServer = proplists:get_value(server, AliceSpec),
AliceJid = <<AliceUser/binary, $@, AliceServer/binary>>,
escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
_RosterPushReq = escalus:wait_for_stanza(Bob),

% WHEN Alice becomes online...
Steps = connection_steps_to_enable_stream_resumption(),
{ok, Alice, _} = escalus_connection:start(AliceSpec, Steps),
InitialPresence = escalus_stanza:presence(<<"available">>),
escalus_connection:send(Alice, InitialPresence),
AvailableAndSubPresences = escalus:wait_for_stanzas(Alice, 2),
escalus:assert_many([SubPredFun, AvailablePredFun], AvailableAndSubPresences),

% ...and Bob sends a message to Alice...
escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
MsgStanza = escalus:wait_for_stanza(Alice),
escalus:assert(is_chat_message, [MsgBody], MsgStanza),

% ...and Alice terminates connection without acking anything...
escalus_client:kill_connection(Config, Alice),

% ...and reconnects with session replacement.
{ok, Alice2, _} = escalus_connection:start(AliceSpec),

% THEN Alice receives (without sending initial presence):
% * buffered available presence (because it's addressed to full JID)
% * buffered Bob's message (like above)
% Alice DOESN'T receive:
% * buffered subscription request because it is dropped by ejabberd_sm
% because it's treated like repeated sub request to bare JID, so it's not
% processed by any sub req handler (like mod_roster)
SubReqAndInitialPresence = escalus:wait_for_stanzas(Alice2, 2),
escalus:assert_many([AvailablePredFun, MsgPredFun], SubReqAndInitialPresence),

escalus_connection:stop(Alice2)
end).

%% This is a regression test for a bug, due to which messages sent to old session
%% in a middle of state handover were not appended properly to SM buffer.
%% Scenario to reproduce:
%% 1. Online Bob and Alice
%% 2. Alice kills the connecion
%% 3. Alice's session is suspended
%% 4. Alice resumes session with new connection. At this moment new session is still not
%% present in session table. `resume` request is stuck in old proc mailbox.
%% 5. Bob sends a message to Alice. Only old proc is present in session table so now
%% old session has two messages in mailbox: `resume` and XML from Bob
%% 6. We resume old process and it begins session handover
%% 7. Bob's message is appended to SM buffer in "flush" step
%% 8. With bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
% GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
Steps = connection_steps_to_enable_stream_resumption(),
{ok, Alice = #client{props = Props}, _} = escalus_connection:start(AliceSpec, Steps),
SMID = proplists:get_value(smid, Props),
InitialPresence = escalus_stanza:presence(<<"available">>),
escalus_connection:send(Alice, InitialPresence),
Presence = escalus:wait_for_stanza(Alice),
escalus:assert(is_presence, Presence),
SMH = escalus_connection:get_sm_h(Alice),
escalus_client:kill_connection(Config, Alice),
{ok, C2SPid} = get_session_pid(AliceSpec, server_string("escalus-default-resource")),
ok = rpc(mim(), sys, suspend, [C2SPid]),

% WHEN new session requests resumption
% we wait until that old session has resumption request enqueued;
% we need it to ensure the order of messages: resume first, Bob's chat second.
% Actual wait and message sent by Bob is done in separate process
% because new client start will block until old process is resumed

MsgBody = <<"flush-regression">>,
spawn(fun() ->
wait_for_queue_length(C2SPid, 1),

% Bob sends a message...
escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

% ...we ensure that a message is enqueued in Alice's session...
% (2 messages = resume request + Bob's message)
wait_for_queue_length(C2SPid, 2),

% ...and old process is resumed.
ok = rpc(mim(), sys, resume, [C2SPid])
end),

Steps2 = connection_steps_to_stream_resumption(SMID, SMH),
{ok, Alice2, _} = escalus_connection:start(AliceSpec, Steps2),

% THEN Alice's new session receives Bob's message
RecvMsg = escalus:wait_for_stanza(Alice2),
escalus:assert(is_chat_message, [MsgBody], RecvMsg)
end).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -891,3 +1052,52 @@ given_fresh_user_with_spec(Spec) ->
escalus:wait_for_stanza(User),
JID = common_helper:get_bjid(Props),
{User#client{jid = JID}, Spec}.

wait_for_queue_length(Pid, Length) ->
mongoose_helper:wait_until(
fun() ->
rpc(mim(), erlang, process_info, [Pid, message_queue_len])
end, {message_queue_len, Length}).

%%--------------------------------------------------------------------
%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
%%--------------------------------------------------------------------

regression_ns() ->
<<"regression">>.

register_handler(Host) ->
rpc(mim(), gen_iq_handler, add_iq_handler,
[ejabberd_sm, Host, regression_ns(), ?MODULE, regression_handler, one_queue]).

unregister_handler(Host) ->
rpc(mim(), gen_iq_handler, remove_iq_handler, [ejabberd_sm, Host, regression_ns()]).

regression_handler(_From, _To, Acc, IQ) ->
%% A bit of a hack - will no longer work when the SID format changes
{_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
erlang:monitor(process, Pid),
receive
{'DOWN', _, _, _, _} ->
ok
after
10000 ->
error({c2s_not_stopped_after_timeout, Pid})
end,

%% We avoid another race condition - there is a short window where user session
%% is not registered in ejabberd_sm: between old process termination and the moment
%% when the new process stores new session in memory. It should be fixed separately.
wait_for_session(jid:to_lower(mongoose_acc:get(c2s, origin_jid, undefined, Acc)), 50, 100),

{Acc, jlib:make_result_iq_reply(IQ)}.

wait_for_session({LU, LS, LR} = LJID, Retries, SleepTime) ->
case ejabberd_sm:get_session(LU, LS, LR) of
offline ->
timer:sleep(SleepTime),
wait_for_session(LJID, Retries - 1, SleepTime);
_ ->
ok
end.

1 change: 1 addition & 0 deletions include/ejabberd_c2s.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
stream_mgmt_ack_freq = ?STREAM_MGMT_ACK_FREQ,
stream_mgmt_resume_timeout = ?STREAM_MGMT_RESUME_TIMEOUT,
stream_mgmt_resume_tref,
stream_mgmt_resumed_from,
stream_mgmt_constraint_check_tref,
csi_state = active :: mod_csi:state(),
csi_buffer = [],
Expand Down
29 changes: 23 additions & 6 deletions src/ejabberd_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,8 @@ process_incoming_stanza_with_conflict_check(From, To, Acc, StateName, StateData)
%% "Incoming" means that stanza is coming from ejabberd_router.
-spec check_incoming_accum_for_conflicts(mongoose_acc:t(), state()) ->
unknown_origin | different_origin | same_device | conflict.
check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID}) ->
check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID,
stream_mgmt_resumed_from = OldSID}) ->
OriginSID = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
OriginJID = mongoose_acc:get(c2s, origin_jid, undefined, Acc),
AreDefined = OriginJID =/= undefined andalso OriginSID =/= undefined,
Expand All @@ -1200,7 +1201,10 @@ check_incoming_accum_for_conflicts(Acc, #state{sid = SID, jid = JID}) ->
true ->
SameJID = jid:are_equal(OriginJID, JID),
SameSID = OriginSID =:= SID,
case {SameJID, SameSID} of
% It's possible to receive a response addressed to a process
% which we resumed from - still valid!
SameOldSession = OriginSID =:= OldSID,
case {SameJID, SameSID or SameOldSession} of
{false, _} ->
different_origin;
{_, true} ->
Expand Down Expand Up @@ -2331,9 +2335,13 @@ resend_subscription_requests(Acc, #state{pending_invitations = Pending} = StateD
{NewAcc, NewState} = lists:foldl(
fun(XMLPacket, {A, #state{} = State}) ->
A1 = send_element(A, XMLPacket, State),
% We retrieve From i To from a stanza, because Acc has
% from_jid and to_jid that apply to 'available' stanza sent
% by the client
{value, From} = xml:get_tag_attr(<<"from">>, XMLPacket),
{value, To} = xml:get_tag_attr(<<"to">>, XMLPacket),
BufferedStateData = buffer_out_stanza({From, To, XMLPacket}, State),
PacketTuple = {jid:from_binary(From), jid:from_binary(To), XMLPacket},
BufferedStateData = buffer_out_stanza(PacketTuple, State),
% this one will be next to tackle
A2 = maybe_send_ack_request(A1, BufferedStateData),
{A2, BufferedStateData}
Expand Down Expand Up @@ -2492,13 +2500,19 @@ bounce_messages() ->
end.

%% Return the messages in reverse order than they were received in!
-spec flush_messages() -> {N :: non_neg_integer(), Msgs :: [packet()]}.
flush_messages() ->
flush_messages(0, []).

-spec flush_messages(N :: non_neg_integer(), Msgs :: [packet()]) ->
{N :: non_neg_integer(), Msgs :: [packet()]}.
flush_messages(N, Acc) ->
receive
{route, _, _, _} = Msg ->
flush_messages(N+1, [Msg | Acc])
{route, From, To, #xmlel{} = Packet} ->
flush_messages(N+1, [{From, To, Packet} | Acc]);
{route, From, To, MongooseAcc} ->
% TODO: SM buffer should hold acc, not xmlels
flush_messages(N+1, [{From, To, mongoose_acc:element(MongooseAcc)} | Acc])
after 0 ->
{N, Acc}
end.
Expand Down Expand Up @@ -2851,6 +2865,7 @@ stream_mgmt_ack(NIncoming) ->
attrs = [{<<"xmlns">>, ?NS_STREAM_MGNT_3},
{<<"h">>, integer_to_binary(NIncoming)}]}.

-spec buffer_out_stanza(packet(), state()) -> state().
buffer_out_stanza(_Packet, #state{stream_mgmt = false} = S) ->
S;
buffer_out_stanza(_Packet, #state{stream_mgmt_buffer_max = no_buffer} = S) ->
Expand Down Expand Up @@ -3001,7 +3016,9 @@ do_resume_session(SMID, El, [{_, Pid}], #state{ server = Server } = StateData) -

NSD2 = flush_csi_buffer(NSD),

fsm_next_state(session_established, NSD2)
NSD3 = NSD2#state{ stream_mgmt_resumed_from = OldState#state.sid },

fsm_next_state(session_established, NSD3)
catch
%% errors from send_element
_:_ ->
Expand Down
7 changes: 5 additions & 2 deletions src/jlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@
'false' | non_neg_integer()}.


-spec make_result_iq_reply(exml:element()) -> exml:element().
-spec make_result_iq_reply(exml:element()) -> exml:element();
(iq()) -> iq().
make_result_iq_reply(XE = #xmlel{attrs = Attrs}) ->
NewAttrs = make_result_iq_reply_attrs(Attrs),
XE#xmlel{attrs = NewAttrs}.
XE#xmlel{attrs = NewAttrs};
make_result_iq_reply(IQ = #iq{}) ->
IQ#iq{ type = result }.


-spec make_result_iq_reply_attrs([binary_pair()]) -> [binary_pair(), ...].
Expand Down

0 comments on commit 9c05df1

Please sign in to comment.