From 062b6cd4ca70e68df5df452fe63abcb6d966f92e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 13:18:32 +0100 Subject: [PATCH 1/4] Fix bug with 'bound_future_token' --- synapse/streams/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 93d5ae1a556..80ddac174e3 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -140,7 +140,7 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken: ].get_max_allocated_token() token = token.copy_and_replace( - key, token.room_key.bound_stream_token(max_token) + key, token_value.bound_stream_token(max_token) ) else: assert isinstance(current_value, int) From 79ef8d5e00bd3a1135bfe4575fe0ef5a09467cd5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 13:23:01 +0100 Subject: [PATCH 2/4] Add logging --- synapse/streams/events.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 80ddac174e3..856f646795c 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -19,6 +19,7 @@ # # +import logging from typing import TYPE_CHECKING, Sequence, Tuple import attr @@ -41,6 +42,9 @@ from synapse.server import HomeServer +logger = logging.getLogger(__name__) + + @attr.s(frozen=True, slots=True, auto_attribs=True) class _EventSourcesInner: room: RoomEventSource @@ -139,9 +143,16 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken: key ].get_max_allocated_token() - token = token.copy_and_replace( - key, token_value.bound_stream_token(max_token) - ) + if max_token < token_value.get_max_stream_pos(): + logger.error( + "Bounding token from the future '%s': token: %s, bound: %s", + key, + token_value, + max_token, + ) + token = token.copy_and_replace( + key, token_value.bound_stream_token(max_token) + ) else: assert isinstance(current_value, int) if current_value < token_value: @@ -149,7 +160,14 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken: key ].get_max_allocated_token() - token = token.copy_and_replace(key, min(token_value, max_token)) + if max_token < token_value: + logger.error( + "Bounding token from the future '%s': token: %s, bound: %s", + key, + token_value, + max_token, + ) + token = token.copy_and_replace(key, max_token) return token From d2240a3043d7dc889fcff2902addde6afb0fbbb2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 13:37:30 +0100 Subject: [PATCH 3/4] Add test --- tests/handlers/test_sync.py | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 5319928c280..674dd4fb54c 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -36,7 +36,14 @@ from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer -from synapse.types import JsonDict, StreamKeyType, UserID, create_requester +from synapse.types import ( + JsonDict, + MultiWriterStreamToken, + RoomStreamToken, + StreamKeyType, + UserID, + create_requester, +) from synapse.util import Clock import tests.unittest @@ -999,7 +1006,13 @@ def test_wait_for_future_sync_token(self) -> None: self.get_success(sync_d, by=1.0) - def test_wait_for_invalid_future_sync_token(self) -> None: + @parameterized.expand( + [(key,) for key in StreamKeyType.__members__.values()], + name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}", + ) + def test_wait_for_invalid_future_sync_token( + self, stream_key: StreamKeyType + ) -> None: """Like the previous test, except we give a token that has a stream position ahead of what is in the DB, i.e. its invalid and we shouldn't wait for the stream to advance (as it may never do so). @@ -1010,11 +1023,23 @@ def test_wait_for_invalid_future_sync_token(self) -> None: """ user = self.register_user("alice", "password") - # Create a token and arbitrarily advance one of the streams. + # Create a token and advance one of the streams. current_token = self.hs.get_event_sources().get_current_token() - since_token = current_token.copy_and_advance( - StreamKeyType.PRESENCE, current_token.presence_key + 1 - ) + token_value = current_token.get_field(stream_key) + + # How we advance the streams depends on the type. + if isinstance(token_value, int): + since_token = current_token.copy_and_advance(stream_key, token_value + 1) + elif isinstance(token_value, MultiWriterStreamToken): + since_token = current_token.copy_and_advance( + stream_key, MultiWriterStreamToken(stream=token_value.stream + 1) + ) + elif isinstance(token_value, RoomStreamToken): + since_token = current_token.copy_and_advance( + stream_key, RoomStreamToken(stream=token_value.stream + 1) + ) + else: + raise Exception("Unreachable") sync_d = defer.ensureDeferred( self.sync_handler.wait_for_sync_for_user( From 8ccd320eb80e96038f6f1664aa357a849084115f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Jul 2024 13:39:16 +0100 Subject: [PATCH 4/4] Newsfile --- changelog.d/17391.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17391.bugfix diff --git a/changelog.d/17391.bugfix b/changelog.d/17391.bugfix new file mode 100644 index 00000000000..9686b5c2768 --- /dev/null +++ b/changelog.d/17391.bugfix @@ -0,0 +1 @@ +Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.