From eab09e226286620edd42ff86a854b7bc305e293e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jul 2024 15:01:33 +0100 Subject: [PATCH 1/9] Handle to-device extensions to Sliding Sync --- synapse/handlers/sliding_sync.py | 89 ++++++++++++++++++++++++++- synapse/rest/client/sync.py | 17 ++++- synapse/types/handlers/__init__.py | 23 ++++++- synapse/types/rest/client/__init__.py | 37 +++++++++-- 4 files changed, 156 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index a1ddac903ea..c6bb81a9b59 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -526,11 +526,15 @@ async def current_sync_for_user( rooms[room_id] = room_sync_result + extensions = await self.get_extensions_response( + sync_config=sync_config, to_token=to_token + ) + return SlidingSyncResult( next_pos=to_token, lists=lists, rooms=rooms, - extensions={}, + extensions=extensions, ) async def get_sync_room_ids_for_user( @@ -1367,3 +1371,86 @@ async def get_room_sync_data( notification_count=0, highlight_count=0, ) + + async def get_extensions_response( + self, + sync_config: SlidingSyncConfig, + to_token: StreamToken, + ) -> SlidingSyncResult.Extensions: + """Handle extension requests.""" + + if sync_config.extensions is None: + return SlidingSyncResult.Extensions() + + to_device_response = None + if sync_config.extensions.to_device: + to_device_response = await self.get_to_device_extensions_response( + sync_config=sync_config, + to_device_request=sync_config.extensions.to_device, + to_token=to_token, + ) + + return SlidingSyncResult.Extensions(to_device=to_device_response) + + async def get_to_device_extensions_response( + self, + sync_config: SlidingSyncConfig, + to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension, + to_token: StreamToken, + ) -> SlidingSyncResult.Extensions.ToDeviceExtension: + """Handle to-device extension (MSC3885)""" + + user_id = sync_config.user.to_string() + device_id = sync_config.device_id + + # Check that this request has a valid device ID (not all requests have + # to belong to a device, and so device_id is None), and that the + # extension is enabled. + if device_id is None or not to_device_request.enabled: + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=f"{to_token.to_device_key}", + events=[], + ) + + since_stream_id = 0 + if to_device_request.since: + # We've already validated this is an int. + since_stream_id = int(to_device_request.since) + + if to_token.to_device_key < since_stream_id: + # The since token is ahead of our current token, so we return an + # empty response. + logger.warning( + "Got to-device.since from the future. Next_batch: %r, current: %r", + since_stream_id, + to_token.to_device_key, + ) + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=to_device_request.since, + events=[], + ) + + # Delete everything before the given since token, as we know the + # device must have received them. + deleted = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=since_stream_id, + ) + + logger.debug( + "Deleted %d to-device messages up to %d", deleted, since_stream_id + ) + + messages, stream_id = await self.store.get_messages_for_device( + user_id=user_id, + device_id=device_id, + from_stream_id=since_stream_id, + to_stream_id=to_token.to_device_key, + limit=min(to_device_request.limit, 100), # Limit to at most 100 events + ) + + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=f"{stream_id}", + events=messages, + ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 2a22bc14ecb..46bc7cce148 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -942,7 +942,9 @@ async def encode_response( response["rooms"] = await self.encode_rooms( requester, sliding_sync_result.rooms ) - response["extensions"] = {} # TODO: sliding_sync_result.extensions + response["extensions"] = await self.encode_extensions( + requester, sliding_sync_result.extensions + ) return response @@ -1053,6 +1055,19 @@ async def encode_rooms( return serialized_rooms + async def encode_extensions( + self, requester: Requester, extensions: SlidingSyncResult.Extensions + ) -> JsonDict: + result = {} + + if extensions.to_device is not None: + result["to_device"] = { + "next_batch": extensions.to_device.next_batch, + "events": extensions.to_device.events, + } + + return result + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: SyncRestServlet(hs).register(http_server) diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 3bd3268e595..46fd7bd6a39 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -18,7 +18,7 @@ # # from enum import Enum -from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, Final, List, Optional, Sequence, Tuple import attr from typing_extensions import TypedDict @@ -244,10 +244,27 @@ class Operation: count: int ops: List[Operation] + @attr.s(slots=True, frozen=True, auto_attribs=True) + class Extensions: + @attr.s(slots=True, frozen=True, auto_attribs=True) + class ToDeviceExtension: + """The to-device extension (MSC3885)""" + + next_batch: str + events: Sequence[JsonMapping] + + def __bool__(self) -> bool: + return bool(self.events) + + to_device: Optional[ToDeviceExtension] = None + + def __bool__(self) -> bool: + return bool(self.to_device) + next_pos: StreamToken lists: Dict[str, SlidingWindowList] rooms: Dict[str, RoomResult] - extensions: JsonMapping + extensions: Extensions def __bool__(self) -> bool: """Make the result appear empty if there are no updates. This is used @@ -263,5 +280,5 @@ def empty(next_pos: StreamToken) -> "SlidingSyncResult": next_pos=next_pos, lists={}, rooms={}, - extensions={}, + extensions=SlidingSyncResult.Extensions(), ) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 55f6b440535..cae312753ea 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -276,10 +276,37 @@ class Filters(RequestBodyModel): class RoomSubscription(CommonRoomParameters): pass - class Extension(RequestBodyModel): - enabled: Optional[StrictBool] = False - lists: Optional[List[StrictStr]] = None - rooms: Optional[List[StrictStr]] = None + class Extensions(RequestBodyModel): + """The extensions section of the request.""" + + class ToDeviceExtension(RequestBodyModel): + """The to-device extension (MSC3885) + + Args: + enabled + limit: Maximum number of to-device messages to return + since: The `next_batch` from the previous sync response + """ + + enabled: Optional[StrictBool] = False + limit: StrictInt = 100 + since: Optional[StrictStr] = None + + @validator("since") + def lists_length_check( + cls, value: Optional[StrictStr] + ) -> Optional[StrictStr]: + if value is None: + return value + + try: + int(value) + except ValueError: + raise ValueError("'extensions.to_device.since' is invalid") + + return value + + to_device: Optional[ToDeviceExtension] = None # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: @@ -287,7 +314,7 @@ class Extension(RequestBodyModel): else: lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None # type: ignore[valid-type] room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None - extensions: Optional[Dict[StrictStr, Extension]] = None + extensions: Optional[Extensions] = None @validator("lists") def lists_length_check( From 3bfc7b2e99230d6b596145af0823e97b4ccc76a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jul 2024 15:05:40 +0100 Subject: [PATCH 2/9] Newsfile --- changelog.d/17416.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17416.feature diff --git a/changelog.d/17416.feature b/changelog.d/17416.feature new file mode 100644 index 00000000000..a2431dbff87 --- /dev/null +++ b/changelog.d/17416.feature @@ -0,0 +1 @@ +Add to-device support to experimental sliding sync implementation. From 23a155822a883145d7b72d35bbfd2c710b789a66 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Jul 2024 15:24:54 +0100 Subject: [PATCH 3/9] Add some tests --- tests/rest/client/test_sync.py | 200 ++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 2 deletions(-) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index cb2888409e6..9c9f99954bf 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -38,7 +38,16 @@ ) from synapse.events import EventBase from synapse.handlers.sliding_sync import StateValues -from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync +from synapse.rest.client import ( + devices, + knock, + login, + read_marker, + receipts, + room, + sendtodevice, + sync, +) from synapse.server import HomeServer from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID from synapse.util import Clock @@ -47,7 +56,7 @@ from tests.federation.transport.test_knocking import ( KnockingStrippedStateEventHelperMixin, ) -from tests.server import TimedOutException +from tests.server import FakeChannel, TimedOutException from tests.test_utils.event_injection import mark_event_as_partial_state logger = logging.getLogger(__name__) @@ -3392,3 +3401,190 @@ def test_rooms_required_state_partial_state(self) -> None: ], channel.json_body["lists"]["foo-list"], ) + + +class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase): + """Tests for the to-device sliding sync extension""" + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + sync.register_servlets, + sendtodevice.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.sync_endpoint = ( + "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + ) + + def _assert_to_device_response( + self, channel: FakeChannel, expected_messages: List[JsonDict] + ) -> str: + """Assert the sliding sync response was successful and has the expected + to-device messages. + + Returns the next_batch token from the to-device section. + """ + self.assertEqual(channel.code, 200, channel.json_body) + extensions = channel.json_body["extensions"] + to_device = extensions["to_device"] + self.assertIsInstance(to_device["next_batch"], str) + self.assertEqual(to_device["events"], expected_messages) + + return to_device["next_batch"] + + def test_no_data(self) -> None: + """Test that enabling to-device extension works, even if there is + no-data + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + }, + access_token=user1_tok, + ) + + # We expect no to-device messages + self._assert_to_device_response(channel, []) + + def test_data_initial_sync(self) -> None: + """Test that we get to-device messages when we don't specify a since + token""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass", "d1") + user2_id = self.register_user("u2", "pass") + user2_tok = self.login(user2_id, "pass", "d2") + + # send the message + test_msg = {"foo": "bar"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/1234", + content={"messages": {user1_id: {"d1": test_msg}}}, + access_token=user2_tok, + ) + self.assertEqual(chan.code, 200, chan.result) + + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + }, + access_token=user1_tok, + ) + self._assert_to_device_response( + channel, + [{"content": test_msg, "sender": user2_id, "type": "m.test"}], + ) + + def test_data_incremental_sync(self) -> None: + """Test that we get to-device messages over incremental syncs""" + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass", "d1") + user2_id = self.register_user("u2", "pass") + user2_tok = self.login(user2_id, "pass", "d2") + + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + }, + access_token=user1_tok, + ) + # No to-device messages yet. + next_batch = self._assert_to_device_response(channel, []) + + test_msg = {"foo": "bar"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.test/1234", + content={"messages": {user1_id: {"d1": test_msg}}}, + access_token=user2_tok, + ) + self.assertEqual(chan.code, 200, chan.result) + + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + "since": next_batch, + } + }, + }, + access_token=user1_tok, + ) + next_batch = self._assert_to_device_response( + channel, + [{"content": test_msg, "sender": user2_id, "type": "m.test"}], + ) + + # The next sliding sync request should not include the to-device + # message. + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + "since": next_batch, + } + }, + }, + access_token=user1_tok, + ) + self._assert_to_device_response(channel, []) + + # An initial sliding sync request should not include the to-device + # message, as it should have been deleted + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": {}, + "extensions": { + "to_device": { + "enabled": True, + } + }, + }, + access_token=user1_tok, + ) + self._assert_to_device_response(channel, []) From ac0dfe0a2f73373bcd92873ee097e02477530acb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:17:27 +0100 Subject: [PATCH 4/9] Update synapse/types/rest/client/__init__.py Co-authored-by: Eric Eastwood --- synapse/types/rest/client/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index cae312753ea..9a43ca14f1e 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -293,7 +293,7 @@ class ToDeviceExtension(RequestBodyModel): since: Optional[StrictStr] = None @validator("since") - def lists_length_check( + def since_token_check( cls, value: Optional[StrictStr] ) -> Optional[StrictStr]: if value is None: From f15897199f71c5d2008a62abb325ba4b483314cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:21:33 +0100 Subject: [PATCH 5/9] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c6bb81a9b59..c51976a83f2 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1421,7 +1421,7 @@ async def get_to_device_extensions_response( # The since token is ahead of our current token, so we return an # empty response. logger.warning( - "Got to-device.since from the future. Next_batch: %r, current: %r", + "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r", since_stream_id, to_token.to_device_key, ) From e9082098fc16a830bf276fc2a1537dd23688d475 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:21:46 +0100 Subject: [PATCH 6/9] Update synapse/handlers/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c51976a83f2..b8b09c88baf 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1439,7 +1439,7 @@ async def get_to_device_extensions_response( ) logger.debug( - "Deleted %d to-device messages up to %d", deleted, since_stream_id + "Deleted %d to-device messages up to %d for %s", deleted, since_stream_id, user_id ) messages, stream_id = await self.store.get_messages_for_device( From d2e5e415535b9c660e965eb382422bf2a7cd80b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:23:02 +0100 Subject: [PATCH 7/9] Apply suggestions from code review Co-authored-by: Eric Eastwood --- changelog.d/17416.feature | 2 +- synapse/handlers/sliding_sync.py | 2 +- synapse/types/rest/client/__init__.py | 2 +- tests/rest/client/test_sync.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/changelog.d/17416.feature b/changelog.d/17416.feature index a2431dbff87..1d119cf48fc 100644 --- a/changelog.d/17416.feature +++ b/changelog.d/17416.feature @@ -1 +1 @@ -Add to-device support to experimental sliding sync implementation. +Add to-device extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b8b09c88baf..c64bd47e8ac 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1413,7 +1413,7 @@ async def get_to_device_extensions_response( ) since_stream_id = 0 - if to_device_request.since: + if to_device_request.since is not None: # We've already validated this is an int. since_stream_id = int(to_device_request.since) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 9a43ca14f1e..7e4369ce531 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -282,7 +282,7 @@ class Extensions(RequestBodyModel): class ToDeviceExtension(RequestBodyModel): """The to-device extension (MSC3885) - Args: + Attributes: enabled limit: Maximum number of to-device messages to return since: The `next_batch` from the previous sync response diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 9c9f99954bf..2c299e31c7f 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -3474,7 +3474,7 @@ def test_data_initial_sync(self) -> None: user2_id = self.register_user("u2", "pass") user2_tok = self.login(user2_id, "pass", "d2") - # send the message + # Send the to-device message test_msg = {"foo": "bar"} chan = self.make_request( "PUT", From ecaf1159fe69cb4f17bc09c4db6f50ccc5d2575e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:23:21 +0100 Subject: [PATCH 8/9] Review comments --- synapse/types/rest/client/__init__.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 7e4369ce531..1e8fe76c993 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -277,7 +277,13 @@ class RoomSubscription(CommonRoomParameters): pass class Extensions(RequestBodyModel): - """The extensions section of the request.""" + """The extensions section of the request. + + Extensions MUST have an `enabled` flag which defaults to `false`. If a client + sends an unknown extension name, the server MUST ignore it (or else backwards + compatibility between clients and servers is broken when a newer client tries to + communicate with an older server). + """ class ToDeviceExtension(RequestBodyModel): """The to-device extension (MSC3885) @@ -296,13 +302,18 @@ class ToDeviceExtension(RequestBodyModel): def since_token_check( cls, value: Optional[StrictStr] ) -> Optional[StrictStr]: + # `since` comes in as an opaque string token but we know that it's just + # an integer representing the position in the device inbox stream. We + # want to pre-validate it to make sure it works fine in downstream code. if value is None: return value try: int(value) except ValueError: - raise ValueError("'extensions.to_device.since' is invalid") + raise ValueError( + "'extensions.to_device.since' is invalid (should look like an int)" + ) return value From 052346a789901be80b43ddf263a869291a751e00 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 11:27:18 +0100 Subject: [PATCH 9/9] Review comments --- synapse/handlers/sliding_sync.py | 20 +++++++++++++++++--- synapse/types/handlers/__init__.py | 14 +++++++++++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c64bd47e8ac..eca63e3ed63 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1377,7 +1377,12 @@ async def get_extensions_response( sync_config: SlidingSyncConfig, to_token: StreamToken, ) -> SlidingSyncResult.Extensions: - """Handle extension requests.""" + """Handle extension requests. + + Args: + sync_config: Sync configuration + to_token: The point in the stream to sync up to. + """ if sync_config.extensions is None: return SlidingSyncResult.Extensions() @@ -1398,7 +1403,13 @@ async def get_to_device_extensions_response( to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension, to_token: StreamToken, ) -> SlidingSyncResult.Extensions.ToDeviceExtension: - """Handle to-device extension (MSC3885)""" + """Handle to-device extension (MSC3885) + + Args: + sync_config: Sync configuration + to_device_request: The to-device extension from the request + to_token: The point in the stream to sync up to. + """ user_id = sync_config.user.to_string() device_id = sync_config.device_id @@ -1439,7 +1450,10 @@ async def get_to_device_extensions_response( ) logger.debug( - "Deleted %d to-device messages up to %d for %s", deleted, since_stream_id, user_id + "Deleted %d to-device messages up to %d for %s", + deleted, + since_stream_id, + user_id, ) messages, stream_id = await self.store.get_messages_for_device( diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 46fd7bd6a39..93c09806aa6 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -246,9 +246,21 @@ class Operation: @attr.s(slots=True, frozen=True, auto_attribs=True) class Extensions: + """Responses for extensions + + Attributes: + to_device: The to-device extension (MSC3885) + """ + @attr.s(slots=True, frozen=True, auto_attribs=True) class ToDeviceExtension: - """The to-device extension (MSC3885)""" + """The to-device extension (MSC3885) + + Attributes: + next_batch: The to-device stream token the client should use + to get more results + events: A list of to-device messages for the client + """ next_batch: str events: Sequence[JsonMapping]