Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delays to SM buffer #4407

Merged
merged 7 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ groups() ->
{parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()},
{manual_ack_freq_2, [], manual_ack_freq_2_cases()},
{stale_h, [], stale_h_cases()},
{parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()}
{parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()},
{resume_timeout, [parallel], resume_timeout_cases()}
].

ws_tests() ->
Expand All @@ -72,6 +73,7 @@ ws_tests() ->
{group, manual_ack_freq_2},
{group, stale_h},
{group, parallel_unacknowledged_message_hook},
{group, resume_timeout},
ping_timeout].

tcp_tests() ->
Expand All @@ -80,6 +82,7 @@ tcp_tests() ->
{group, manual_ack_freq_2},
{group, stale_h},
{group, parallel_unacknowledged_message_hook},
{group, resume_timeout},
ping_timeout].

parallel_cases() ->
Expand Down Expand Up @@ -116,7 +119,6 @@ parallel_cases() ->
parallel_manual_ack_freq_1_cases() ->
[client_acks_more_than_sent,
too_many_unacked_stanzas,
resend_unacked_after_resume_timeout,
resume_session_state_send_message_with_ack,
resume_session_state_send_message_without_ack,
resume_session_state_stop_c2s,
Expand All @@ -127,6 +129,10 @@ parallel_manual_ack_freq_1_cases() ->
manual_ack_freq_2_cases() ->
[server_requests_ack_freq_2].

resume_timeout_cases() ->
[resend_unacked_after_resume_timeout,
resend_unacked_to_different_res_after_resume_timeout].

stale_h_cases() ->
[resume_expired_session_returns_correct_h,
gc_repeat_after_never_means_no_cleaning,
Expand Down Expand Up @@ -258,6 +264,8 @@ required_sm_opts(group, parallel_unacknowledged_message_hook) ->
#{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
#{ack_freq => 1, buffer_max => 1000};
required_sm_opts(group, resume_timeout) ->
#{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
#{ack_freq => 1,
resume_timeout => ?SHORT_TIMEOUT,
Expand Down Expand Up @@ -545,16 +553,20 @@ resend_unacked_on_reconnection(Config) ->
sm_helper:send_messages(Bob, User, Texts),
%% User receives the messages.
sm_helper:wait_for_messages(User, Texts),
%% User disconnects without acking the messages.
%% User disconnects ending stream gracefully, but without acking the messages.
sm_helper:stop_client_and_wait_for_termination(User),
%% Messages go to the offline store.
%% User receives the messages from the offline store.
NewUser = connect_spec(UserSpec, session, manual),
send_initial_presence(NewUser),
sm_helper:wait_for_messages(NewUser, Texts),
sm_helper:wait_for_delayed_messages(NewUser, Texts),
%% User acks the delayed messages so they won't go again
%% to the offline store.
escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)).
escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)),
% user receives initial presence response
P = escalus:wait_for_stanza(NewUser),
escalus:assert(is_presence, P),
escalus_connection:stop(NewUser).

%% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
%% TODO Test without wait_for_n_offline_messages. It would require changes in SM
Expand Down Expand Up @@ -630,20 +642,62 @@ resend_unacked_after_resume_timeout(Config) ->
User = connect_fresh(Config, ?config(user, Config), sr_presence),
UserSpec = sm_helper:client_to_spec(User),

escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)),
escalus_connection:send(Bob, escalus_stanza:chat_to(User, <<"msg-1">>)),
%% kill user connection
escalus_connection:kill(User),

%% ensure there is no session
C2SPid = mongoose_helper:get_session_pid(User),
sm_helper:wait_until_resume_session(C2SPid),

%% user come back and receives unacked message
%% user comes back
NewUser = connect_spec(UserSpec, session),
send_initial_presence(NewUser),

%% resume timeout passes
timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)),

%% user receives unacked message and initial presence
UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2),
escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
escalus:wait_for_stanzas(NewUser, 2)),
UnackedStanzas),
[UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas),
sm_helper:assert_delayed(UnackedMsg),
escalus_assert:has_no_stanzas(NewUser),

escalus_connection:stop(Bob),
escalus_connection:stop(NewUser).


resend_unacked_to_different_res_after_resume_timeout(Config) ->
%% connect bob and user
Bob = connect_fresh(Config, bob, presence),
User = connect_fresh(Config, ?config(user, Config), sr_presence),
UserSpec = sm_helper:client_to_spec(User),

escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)),
%% kill user connection
escalus_connection:kill(User),

%% ensure there is no session
C2SPid = mongoose_helper:get_session_pid(User),
sm_helper:wait_until_resume_session(C2SPid),

%% user comes back with different resource
NewUser = connect_spec([{resource, <<"2nd_resource">>} | UserSpec], session),
send_initial_presence(NewUser),

%% resume timeout passes
timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)),

%% user receives unacked message and presence, as well as initial presence response
%% the order of the messages may change, especially on CI, so we test all of them
UnackedStanzas = escalus:wait_for_stanzas(NewUser, 3),
escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>), is_presence], UnackedStanzas),
[UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas),
sm_helper:assert_delayed(UnackedMsg),

escalus_assert:has_no_stanzas(NewUser),

escalus_connection:stop(Bob),
escalus_connection:stop(NewUser).
Expand Down
10 changes: 10 additions & 0 deletions big_tests/tests/sm_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
-export([send_initial_presence/1,
send_messages/3,
wait_for_messages/2,
wait_for_delayed_messages/2,
assert_delayed/1,
assert_messages/2,
send_and_receive/3,
get_ack/1,
Expand Down Expand Up @@ -293,6 +295,14 @@ wait_for_messages(Alice, Texts) ->
Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)),
assert_messages(Stanzas, Texts).

wait_for_delayed_messages(Alice, Texts) ->
Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)),
assert_messages(Stanzas, Texts),
[assert_delayed(S) || S <- Stanzas].

assert_delayed(Stanza) ->
escalus:assert(has_ns, [<<"urn:xmpp:delay">>], exml_query:subelement(Stanza, <<"delay">>)).

assert_messages(Stanzas, Texts) ->
Bodies = lists:map(fun get_body/1, Stanzas),
case Bodies of
Expand Down
2 changes: 1 addition & 1 deletion src/mod_presence.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ route_probe(Acc, Presences, FromJid, ToJid) ->
Packet0 = Presences#presences_state.pres_last,
TS = Presences#presences_state.pres_timestamp,
%% To is the one sending the presence (the target of the probe)
Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<>>),
Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<"Delayed presence">>),
HostType = mongoose_acc:host_type(Acc),
Acc2 = mongoose_hooks:presence_probe(HostType, Acc, FromJid, ToJid, self()),
%% Don't route a presence probe to oneself
Expand Down
24 changes: 18 additions & 6 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,20 @@
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) ->
Sid = mongoose_c2s:get_sid(StateData),
do_remove_smid(HostType, Sid, H),
reroute_buffer(StateData, SmState),
FromServer = mongoose_c2s:get_lserver(StateData),
NewState = add_delay_elements_to_buffer(SmState, FromServer),
reroute_buffer(StateData, NewState),
SmState#sm_state{buffer = [], buffer_size = 0}.

reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}) ->
mongoose_c2s:reroute_buffer_to_pid(StateData, Pid, Buffer);
reroute_buffer(StateData, #sm_state{buffer = Buffer}) ->
mongoose_c2s:reroute_buffer(StateData, Buffer).

add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) ->
BufferWithDelays = [maybe_add_timestamp(Acc, FromServer) || Acc <- Buffer],
SmState#sm_state{buffer = BufferWithDelays}.

-spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term().
terminate(Reason, C2SState, StateData) ->
?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason,
Expand Down Expand Up @@ -615,14 +621,20 @@
LServer = mongoose_c2s:get_lserver(StateData),
FromServer = jid:make_noprep(<<>>, LServer, <<>>),
ToForward = [ begin
TS = mongoose_acc:timestamp(Acc),
Packet = mongoose_acc:element(Acc),
StanzaName = mongoose_acc:stanza_name(Acc),
StanzaType = mongoose_acc:stanza_type(Acc),
maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer)
AccWithTS = maybe_add_timestamp(Acc, FromServer),
mongoose_acc:element(AccWithTS)

Check warning on line 625 in src/stream_management/mod_stream_management.erl

View check run for this annotation

Codecov / codecov/patch

src/stream_management/mod_stream_management.erl#L624-L625

Added lines #L624 - L625 were not covered by tests
end || Acc <- lists:reverse(Buffer)],
{Resumed, ToForward}.

maybe_add_timestamp(Acc, FromServer) ->
TS = mongoose_acc:timestamp(Acc),
StanzaName = mongoose_acc:stanza_name(Acc),
StanzaType = mongoose_acc:stanza_type(Acc),
{From, To, El} = mongoose_acc:packet(Acc),
ElWithDelay = maybe_add_timestamp(El, StanzaName, StanzaType, TS, FromServer),
AccParams = #{from_jid => From, to_jid => To, element => ElWithDelay},
mongoose_acc:update_stanza(AccParams, Acc).

maybe_add_timestamp(Packet, <<"message">>, <<"error">>, _, _) ->
Packet;
maybe_add_timestamp(Packet, <<"message">>, <<"headline">>, _, _) ->
Expand Down