From b6d283a2675d395f5cef78d864e4e1ec9f85f490 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 16:39:57 +0000 Subject: [PATCH 01/19] Allow `AbstractSet` in `StrCollection` Or else frozensets are excluded. This will be useful in an upcoming commit where I plan to change a function that accepts `List[str]` to accept `StrCollection` instead. --- synapse/types/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index c59eca24301e..31edcd736c4f 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -17,6 +17,7 @@ import string from typing import ( TYPE_CHECKING, + AbstractSet, Any, ClassVar, Dict, @@ -79,7 +80,7 @@ # Collection[str] that does not include str itself; str being a Sequence[str] # is very misleading and results in bugs. -StrCollection = Union[Tuple[str, ...], List[str], Set[str]] +StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]] # Note that this seems to require inheriting *directly* from Interface in order From cc87d2cec15aa7caf62615cf74fb719ab2ecfaaa Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 16:23:42 +0000 Subject: [PATCH 02/19] `rooms_to_exclude` -> `rooms_to_exclude_globally` I am about to make use of this exclusion mechanism to exclude rooms for a specific user and a specific sync. This rename helps to clarify the distinction between the global config and the rooms to exclude for a specific sync. --- synapse/handlers/sync.py | 11 +++++++---- tests/rest/client/test_sync.py | 4 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 78d488f2b1cb..b61da7d3d698 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -290,7 +290,7 @@ def __init__(self, hs: "HomeServer"): expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, ) - self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync + self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync async def wait_for_sync_for_user( self, @@ -1340,7 +1340,10 @@ async def generate_sync_result( membership_change_events = [] if since_token: membership_change_events = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude + user_id, + since_token.room_key, + now_token.room_key, + self.rooms_to_exclude_globally, ) mem_last_change_by_room_id: Dict[str, EventBase] = {} @@ -1380,7 +1383,7 @@ async def generate_sync_result( ( room_id for room_id in mutable_joined_room_ids - if room_id not in self.rooms_to_exclude + if room_id not in self.rooms_to_exclude_globally ) ) @@ -2178,7 +2181,7 @@ async def _get_all_rooms( room_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude, + excluded_rooms=self.rooms_to_exclude_globally, ) room_entries = [] diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 0af643ecd97b..c9afa0f3dd05 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -913,7 +913,9 @@ def prepare( # We need to manually append the room ID, because we can't know the ID before # creating the room, and we can't set the config after starting the homeserver. - self.hs.get_sync_handler().rooms_to_exclude.append(self.excluded_room_id) + self.hs.get_sync_handler().rooms_to_exclude_globally.append( + self.excluded_room_id + ) def test_join_leave(self) -> None: """Tests that rooms are correctly excluded from the 'join' and 'leave' sections of From beeee9a94745bfe2a25dccb776821aaa8fd4d0a9 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 11:27:44 +0000 Subject: [PATCH 03/19] Better function names for internal sync methods --- synapse/handlers/sync.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b61da7d3d698..bc873209dca7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1837,14 +1837,16 @@ async def _generate_sync_entry_for_rooms( # 3. Work out which rooms need reporting in the sync response. ignored_users = await self.store.ignored_users(user_id) if since_token: - room_changes = await self._get_rooms_changed( + room_changes = await self._get_room_changes_for_incremental_sync( sync_result_builder, ignored_users ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) + room_changes = await self._get_room_changes_for_initial_sync( + sync_result_builder, ignored_users + ) tags_by_room = await self.store.get_tags_for_user(user_id) log_kv({"rooms_changed": len(room_changes.room_entries)}) @@ -1912,7 +1914,7 @@ async def _have_rooms_changed( return True return False - async def _get_rooms_changed( + async def _get_room_changes_for_incremental_sync( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], @@ -2156,7 +2158,7 @@ async def _get_rooms_changed( newly_left_rooms, ) - async def _get_all_rooms( + async def _get_room_changes_for_initial_sync( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], From 9cf40f72cb25dfaaf08bfdb34d534074797bab5d Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 16:26:51 +0000 Subject: [PATCH 04/19] Track a list of excluded rooms on SyncResultBuilder I plan to feed a list of partially stated rooms for this sync to ignore --- synapse/handlers/sync.py | 5 ++++- synapse/storage/databases/main/roommember.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bc873209dca7..d2a1ab40645c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1400,6 +1400,7 @@ async def generate_sync_result( since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, + excluded_room_ids=frozenset(self.rooms_to_exclude_globally), membership_change_events=membership_change_events, ) @@ -2183,7 +2184,7 @@ async def _get_room_changes_for_initial_sync( room_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude_globally, + excluded_rooms=sync_result_builder.excluded_room_ids, ) room_entries = [] @@ -2554,6 +2555,7 @@ class SyncResultBuilder: since_token: The token supplied by user, or None. now_token: The token to sync up to. joined_room_ids: List of rooms the user is joined to + excluded_room_ids: Set of room ids we should omit from the /sync response. # The following mirror the fields in a sync response presence @@ -2570,6 +2572,7 @@ class SyncResultBuilder: since_token: Optional[StreamToken] now_token: StreamToken joined_room_ids: FrozenSet[str] + excluded_room_ids: FrozenSet[str] membership_change_events: List[EventBase] presence: List[UserPresenceState] = attr.Factory(list) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index f02c1d7ea7aa..f99e1a616e89 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -47,7 +47,13 @@ ProfileInfo, RoomsForUser, ) -from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain_from_id +from synapse.types import ( + JsonDict, + PersistedEventPosition, + StateMap, + StrCollection, + get_domain_from_id, +) from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -385,7 +391,7 @@ async def get_rooms_for_local_user_where_membership_is( self, user_id: str, membership_list: Collection[str], - excluded_rooms: Optional[List[str]] = None, + excluded_rooms: StrCollection = (), ) -> List[RoomsForUser]: """Get all the rooms for this *local* user where the membership for this user matches one in the membership list. From 12c11ae16d99762133698c6ac1969695ea15ecb5 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 16:55:37 +0000 Subject: [PATCH 05/19] Exclude partial state rooms during eager sync using the mechanism established in the previous commit --- synapse/handlers/sync.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d2a1ab40645c..041c62a0868a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1378,12 +1378,19 @@ async def generate_sync_result( else: mutable_joined_room_ids.discard(room_id) + mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally) + if not sync_config.filter_collection.lazy_load_members(): + # Exclude all partially stated rooms from this sync. + for room_id in mutable_joined_room_ids: + if await self.store.is_partial_state_room(room_id): + mutable_rooms_to_exclude.add(room_id) + # Now we have our list of joined room IDs, exclude as configured and freeze joined_room_ids = frozenset( ( room_id for room_id in mutable_joined_room_ids - if room_id not in self.rooms_to_exclude_globally + if room_id not in mutable_rooms_to_exclude ) ) @@ -1400,7 +1407,7 @@ async def generate_sync_result( since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, - excluded_room_ids=frozenset(self.rooms_to_exclude_globally), + excluded_room_ids=frozenset(mutable_rooms_to_exclude), membership_change_events=membership_change_events, ) From 538e3998f4856addf1da3015dc73744c2212461c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 17:20:02 +0000 Subject: [PATCH 06/19] Track un-partial-state stream in sync tokens So that we can work out which rooms have become fully-stated during a given sync period. --- synapse/storage/databases/main/relations.py | 1 + synapse/streams/events.py | 3 +++ synapse/types/__init__.py | 12 ++++++++---- tests/rest/admin/test_room.py | 4 ++-- tests/rest/client/test_rooms.py | 10 +++++----- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index aea96e9d2478..84f844b79e7f 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -292,6 +292,7 @@ def _get_recent_references_for_event_txn( to_device_key=0, device_list_key=0, groups_key=0, + un_partial_stated_rooms_key=0, ) return events[:limit], next_token diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 619eb7f601de..584ad1004b96 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -58,6 +58,7 @@ def get_current_token(self) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() + un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token() token = StreamToken( room_key=self.sources.room.get_current_key(), @@ -70,6 +71,7 @@ def get_current_token(self) -> StreamToken: device_list_key=device_list_key, # Groups key is unused. groups_key=0, + un_partial_stated_rooms_key=un_partial_stated_rooms_key, ) return token @@ -107,5 +109,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken: to_device_key=0, device_list_key=0, groups_key=0, + un_partial_stated_rooms_key=0, ) return token diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 31edcd736c4f..f82d1cfc298b 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -634,6 +634,7 @@ class StreamKeyType: PUSH_RULES: Final = "push_rules_key" TO_DEVICE: Final = "to_device_key" DEVICE_LIST: Final = "device_list_key" + UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -641,7 +642,7 @@ class StreamToken: """A collection of keys joined together by underscores in the following order and which represent the position in their respective streams. - ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1` + ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379` 1. `room_key`: `s2633508` which is a `RoomStreamToken` - `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59` - See the docstring for `RoomStreamToken` for more details. @@ -653,12 +654,13 @@ class StreamToken: 7. `to_device_key`: `274711` 8. `device_list_key`: `265584` 9. `groups_key`: `1` (note that this key is now unused) + 10. `un_partial_stated_rooms_key`: `379` You can see how many of these keys correspond to the various fields in a "/sync" response: ```json { - "next_batch": "s12_4_0_1_1_1_1_4_1", + "next_batch": "s12_4_0_1_1_1_1_4_1_1", "presence": { "events": [] }, @@ -670,7 +672,7 @@ class StreamToken: "!QrZlfIDQLNLdZHqTnt:hs1": { "timeline": { "events": [], - "prev_batch": "s10_4_0_1_1_1_1_4_1", + "prev_batch": "s10_4_0_1_1_1_1_4_1_1", "limited": false }, "state": { @@ -706,6 +708,7 @@ class StreamToken: device_list_key: int # Note that the groups key is no longer used and may have bogus values. groups_key: int + un_partial_stated_rooms_key: int _SEPARATOR = "_" START: ClassVar["StreamToken"] @@ -744,6 +747,7 @@ async def to_string(self, store: "DataStore") -> str: # serialized so that there will not be confusion in the future # if additional tokens are added. str(self.groups_key), + str(self.un_partial_stated_rooms_key), ] ) @@ -776,7 +780,7 @@ def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken": return attr.evolve(self, **{key: new_value}) -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) +StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index e0f5d54abab0..453a6e979c02 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -1831,7 +1831,7 @@ def test_timestamp_to_event(self) -> None: def test_topo_token_is_accepted(self) -> None: """Test Topo Token is accepted.""" - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), @@ -1845,7 +1845,7 @@ def test_topo_token_is_accepted(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: """Test that stream token is accepted for forward pagination.""" - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index b4daace55617..9222cab19801 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -1987,7 +1987,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.room_id = self.helper.create_room_as(self.user_id) def test_topo_token_is_accepted(self) -> None: - token = "t1-0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -1998,7 +1998,7 @@ def test_topo_token_is_accepted(self) -> None: self.assertTrue("end" in channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -2728,7 +2728,7 @@ def test_messages_filter_labels(self) -> None: """Test that we can filter by a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2745,7 +2745,7 @@ def test_messages_filter_not_labels(self) -> None: """Test that we can filter by the absence of a label on a /messages request.""" self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" @@ -2768,7 +2768,7 @@ def test_messages_filter_labels_not_labels(self) -> None: """ self._send_labelled_messages_in_room() - token = "s0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=%s&from=%s&filter=%s" From 32b473054fc6073c4d5486df027169c8bd58b573 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 21:04:11 +0000 Subject: [PATCH 07/19] Fix mutation of `@cached` return value This was fouling up a complement test added alongside this PR. Excluding a room would mean the set of forgotten rooms in the cache would be extended. This means that room could be erroneously considered forgotten in the future. Introduced in #12310, Synapse 1.57.0. I don't think this had any user-visible side effects (until now). --- synapse/storage/databases/main/roommember.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index f99e1a616e89..8e2ba7b7b470 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -15,6 +15,7 @@ import logging from typing import ( TYPE_CHECKING, + AbstractSet, Collection, Dict, FrozenSet, @@ -418,10 +419,12 @@ async def get_rooms_for_local_user_where_membership_is( ) # Now we filter out forgotten and excluded rooms - rooms_to_exclude: Set[str] = await self.get_forgotten_rooms_for_user(user_id) + rooms_to_exclude = await self.get_forgotten_rooms_for_user(user_id) if excluded_rooms is not None: - rooms_to_exclude.update(set(excluded_rooms)) + # Take a copy to avoid mutating the in-cache set + rooms_to_exclude = set(rooms_to_exclude) + rooms_to_exclude.update(excluded_rooms) return [room for room in rooms if room.room_id not in rooms_to_exclude] @@ -1175,7 +1178,7 @@ def f(txn: LoggingTransaction) -> int: return count == 0 @cached() - async def get_forgotten_rooms_for_user(self, user_id: str) -> Set[str]: + async def get_forgotten_rooms_for_user(self, user_id: str) -> AbstractSet[str]: """Gets all rooms the user has forgotten. Args: From 372b3cc9c4dd4a17b2e444b09daa76455233adf1 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 11:58:23 +0000 Subject: [PATCH 08/19] SyncResultBuilder: track rooms to force as newly joined Similar plan as before. We've omitted rooms from certain sync responses; now we establish the mechanism to reintroduce them into future syncs. --- synapse/handlers/sync.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 041c62a0868a..8d2841d0fb7c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1408,6 +1408,7 @@ async def generate_sync_result( now_token=now_token, joined_room_ids=joined_room_ids, excluded_room_ids=frozenset(mutable_rooms_to_exclude), + forced_newly_joined_room_ids=frozenset(), membership_change_events=membership_change_events, ) @@ -2563,6 +2564,12 @@ class SyncResultBuilder: now_token: The token to sync up to. joined_room_ids: List of rooms the user is joined to excluded_room_ids: Set of room ids we should omit from the /sync response. + forced_newly_jined_room_ids: + Rooms that should be presented in the /sync response as if they were + newly joined during the sync period, even if that's not the case. + (This is useful if the room was previously excluded from a /sync response, + and now the client should be made aware of it.) + Only used by incremental syncs. # The following mirror the fields in a sync response presence @@ -2580,6 +2587,7 @@ class SyncResultBuilder: now_token: StreamToken joined_room_ids: FrozenSet[str] excluded_room_ids: FrozenSet[str] + forced_newly_joined_room_ids: FrozenSet[str] membership_change_events: List[EventBase] presence: List[UserPresenceState] = attr.Factory(list) From fd44b78a2c14fba14e49c75e9c774c15cbe1859f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 11:59:48 +0000 Subject: [PATCH 09/19] Read new field, to present rooms as newly joined --- synapse/handlers/sync.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8d2841d0fb7c..56d4f9e7670f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1914,7 +1914,7 @@ async def _have_rooms_changed( assert since_token - if membership_change_events: + if membership_change_events or sync_result_builder.retransmit_room_ids: return True stream_id = since_token.room_key.stream @@ -1961,7 +1961,7 @@ async def _get_room_changes_for_incremental_sync( for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms: List[str] = [] + newly_joined_rooms: List[str] = list(sync_result_builder.retransmit_room_ids) newly_left_rooms: List[str] = [] room_entries: List[RoomSyncResultBuilder] = [] invited: List[InvitedSyncResult] = [] From 5cb32d11621b61bbcc99dfa9405d02c8c137a0a2 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 20 Jan 2023 12:25:14 +0000 Subject: [PATCH 10/19] Force un-partial-stated rooms to be newly-joined for eager incremental syncs only, provided they're still fully stated --- synapse/handlers/sync.py | 28 ++++++++++++++++--- synapse/storage/databases/main/room.py | 37 +++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 56d4f9e7670f..d7bd345741ed 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1378,13 +1378,33 @@ async def generate_sync_result( else: mutable_joined_room_ids.discard(room_id) + # Tweak the set of rooms to return to the client for eager (non-lazy) syncs. mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally) if not sync_config.filter_collection.lazy_load_members(): + # Non-lazy syncs should never include partially stated rooms. # Exclude all partially stated rooms from this sync. for room_id in mutable_joined_room_ids: if await self.store.is_partial_state_room(room_id): mutable_rooms_to_exclude.add(room_id) + # Incremental eager syncs should additionally include rooms that + # - we are joined to + # - are full-stated + # - became fully-stated at some point during the sync period + # (These rooms will have been omitted during a previous eager sync.) + forced_newly_joined_room_ids = set() + if since_token and not sync_config.filter_collection.lazy_load_members(): + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + since_token.un_partial_stated_rooms_key, + now_token.un_partial_stated_rooms_key, + mutable_joined_room_ids, + ) + ) + for room_id in un_partial_stated_rooms: + if not await self.store.is_partial_state_room(room_id): + forced_newly_joined_room_ids.add(room_id) + # Now we have our list of joined room IDs, exclude as configured and freeze joined_room_ids = frozenset( ( @@ -1408,7 +1428,7 @@ async def generate_sync_result( now_token=now_token, joined_room_ids=joined_room_ids, excluded_room_ids=frozenset(mutable_rooms_to_exclude), - forced_newly_joined_room_ids=frozenset(), + forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids), membership_change_events=membership_change_events, ) @@ -1914,7 +1934,7 @@ async def _have_rooms_changed( assert since_token - if membership_change_events or sync_result_builder.retransmit_room_ids: + if membership_change_events or sync_result_builder.forced_newly_joined_room_ids: return True stream_id = since_token.room_key.stream @@ -1961,7 +1981,9 @@ async def _get_room_changes_for_incremental_sync( for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms: List[str] = list(sync_result_builder.retransmit_room_ids) + newly_joined_rooms: List[str] = list( + sync_result_builder.forced_newly_joined_room_ids + ) newly_left_rooms: List[str] = [] room_entries: List[RoomSyncResultBuilder] = [] invited: List[InvitedSyncResult] = [] diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 78906a5e1d9e..7a16d2762711 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -26,6 +26,7 @@ Mapping, Optional, Sequence, + Set, Tuple, Union, cast, @@ -1285,10 +1286,44 @@ def get_un_partial_stated_rooms_token(self) -> int: # explanation.) return self._un_partial_stated_rooms_stream_id_gen.get_current_token() + async def get_un_partial_stated_rooms_between( + self, last_id: int, current_id: int, room_ids: Collection[str] + ) -> Set[str]: + """Get all rooms that got un partial stated between `last_id` exclusive and + `current_id` inclusive. + + Returns: + The list of room ids. + """ + + if last_id == current_id: + return set() + + def _get_un_partial_stated_rooms_between_txn( + txn: LoggingTransaction, + ) -> Set[str]: + sql = """ + SELECT DISTINCT room_id FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + + txn.execute(sql + clause, [last_id, current_id] + list(args)) + + return {r[0] for r in txn} + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_between", + _get_un_partial_stated_rooms_between_txn, + ) + async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: - """Get updates for caches replication stream. + """Get updates for un partial stated rooms replication stream. Args: instance_name: The writer we want to fetch updates from. Unused From 41c3b33c7a37d69633671a1f28e2517ae2a12a76 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 18 Jan 2023 23:54:24 +0100 Subject: [PATCH 11/19] Notify user stream listeners to wake up long polling syncs --- synapse/handlers/federation.py | 11 ++++++----- synapse/notifier.py | 26 ++++++++++++++++++++++++++ synapse/storage/databases/main/room.py | 10 +++++----- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index eca75f1108d1..3fcd5f3db47e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1726,15 +1726,16 @@ async def _sync_partial_state_room( await self._device_handler.handle_room_un_partial_stated(room_id) logger.info("Clearing partial-state flag for %s", room_id) - success = await self.store.clear_partial_state_room(room_id) - if success: + new_stream_id = await self.store.clear_partial_state_room(room_id) + if new_stream_id is not None: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() + + await self._notifier.on_un_partial_stated_room( + room_id, new_stream_id + ) # TODO(faster_joins) update room stats and user directory? # https://github.com/matrix-org/synapse/issues/12814 diff --git a/synapse/notifier.py b/synapse/notifier.py index 26b97cf766c3..258e60367ef9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -315,6 +315,32 @@ async def on_new_room_events( event_entries.append((entry, event.event_id)) await self.notify_new_room_events(event_entries, max_room_stream_token) + async def on_un_partial_stated_room( + self, + room_id: str, + new_token: int, + ) -> None: + """Used by the resync background processes to wake up all listeners + of this room that it just got un-partial-stated. + + It will also notify replication listeners of the change in stream. + """ + + # Wake up all related user stream notifiers + user_streams = self.room_to_user_streams.get(room_id, set()) + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify( + StreamKeyType.UN_PARTIAL_STATED_ROOMS, new_token, time_now_ms + ) + except Exception: + logger.exception("Failed to notify listener") + + # Poke the replication so that other workers also see the write to + # the un-partial-stated rooms stream. + self.notify_replication() + async def notify_new_room_events( self, event_entries: List[Tuple[_PendingRoomEventEntry, str]], diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7a16d2762711..09ba372f7a7f 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2330,16 +2330,16 @@ async def unblock_room(self, room_id: str) -> None: (room_id,), ) - async def clear_partial_state_room(self, room_id: str) -> bool: + async def clear_partial_state_room(self, room_id: str) -> Optional[int]: """Clears the partial state flag for a room. Args: room_id: The room whose partial state flag is to be cleared. Returns: - `True` if the partial state flag has been cleared successfully. + The corresponding stream id for the un-partial-stated rooms stream. - `False` if the partial state flag could not be cleared because the room + `None` if the partial state flag could not be cleared because the room still contains events with partial state. """ try: @@ -2350,7 +2350,7 @@ async def clear_partial_state_room(self, room_id: str) -> bool: room_id, un_partial_state_room_stream_id, ) - return True + return un_partial_state_room_stream_id except self.db_pool.engine.module.IntegrityError as e: # Assume that any `IntegrityError`s are due to partial state events. logger.info( @@ -2358,7 +2358,7 @@ async def clear_partial_state_room(self, room_id: str) -> bool: room_id, e, ) - return False + return None def _clear_partial_state_room_txn( self, From fbed1a7e0e80e2302e32c642622043f3afcb54a2 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 19 Jan 2023 15:34:50 +0000 Subject: [PATCH 12/19] Changelog --- changelog.d/14870.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14870.feature diff --git a/changelog.d/14870.feature b/changelog.d/14870.feature new file mode 100644 index 000000000000..44f701d1c996 --- /dev/null +++ b/changelog.d/14870.feature @@ -0,0 +1 @@ +Faster joins: allow non-lazy-loading ("eager") syncs to complete after a partial join by omitting partial state rooms until they become fully stated. \ No newline at end of file From f46590a3d369d55542724ae1fb2b2bc1f8fde520 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 21:32:00 +0000 Subject: [PATCH 13/19] Typo fix Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d7bd345741ed..051269e9b191 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2586,7 +2586,7 @@ class SyncResultBuilder: now_token: The token to sync up to. joined_room_ids: List of rooms the user is joined to excluded_room_ids: Set of room ids we should omit from the /sync response. - forced_newly_jined_room_ids: + forced_newly_joined_room_ids: Rooms that should be presented in the /sync response as if they were newly joined during the sync period, even if that's not the case. (This is useful if the room was previously excluded from a /sync response, From adbf474afae3facba9aa573e235efc836d25f279 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 21:33:01 +0000 Subject: [PATCH 14/19] Unnecessary list cast Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/storage/databases/main/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 54e5e951d1d6..3aa7b945606e 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1320,7 +1320,7 @@ def _get_un_partial_stated_rooms_between_txn( self.database_engine, "room_id", room_ids ) - txn.execute(sql + clause, [last_id, current_id] + list(args)) + txn.execute(sql + clause, [last_id, current_id] + args) return {r[0] for r in txn} From 237b63d2df64ed3cf3ece868e4a26bc520e15711 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 21:35:15 +0000 Subject: [PATCH 15/19] Rephrase comment Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 5a79f4fed059..2b0e52f23c33 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -320,7 +320,7 @@ async def on_un_partial_stated_room( new_token: int, ) -> None: """Used by the resync background processes to wake up all listeners - of this room that it just got un-partial-stated. + of this room when it is un-partial-stated. It will also notify replication listeners of the change in stream. """ From 2bb13df298b1ccdee4136f4e0d984625462b4b67 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 21:35:46 +0000 Subject: [PATCH 16/19] Another comment Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 051269e9b191..ee117645673f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1391,7 +1391,7 @@ async def generate_sync_result( # - we are joined to # - are full-stated # - became fully-stated at some point during the sync period - # (These rooms will have been omitted during a previous eager sync.) + # (These rooms will have been omitted during a previous eager sync.) forced_newly_joined_room_ids = set() if since_token and not sync_config.filter_collection.lazy_load_members(): un_partial_stated_rooms = ( From 9181ddcc182daf078294fffc4ae040f248bf64af Mon Sep 17 00:00:00 2001 From: David Robertson Date: Sun, 22 Jan 2023 21:42:59 +0000 Subject: [PATCH 17/19] Fixup merge(?) --- synapse/streams/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 584ad1004b96..d7084d2358cc 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -53,12 +53,15 @@ def __init__(self, hs: "HomeServer"): *(attribute.type(hs) for attribute in attr.fields(_EventSourcesInner)) ) self.store = hs.get_datastores().main + self._instance_name = hs.get_instance_name() def get_current_token(self) -> StreamToken: push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() - un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token() + un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token( + self._instance_name + ) token = StreamToken( room_key=self.sources.room.get_current_key(), From 995a2eff37d683d8ffd4c48dc8b9db9b22d62cc2 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 23 Jan 2023 14:31:33 +0000 Subject: [PATCH 18/19] Poke notifier when receiving un-partial-stated msg over replication --- synapse/replication/tcp/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 493f61667999..fb8264290284 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -254,6 +254,7 @@ async def on_rdata( self._state_storage_controller.notify_room_un_partial_stated( row.room_id ) + await self.notifier.on_un_partial_stated_room(row.room_id, token) elif stream_name == UnPartialStatedEventStream.NAME: for row in rows: assert isinstance(row, UnPartialStatedEventStreamRow) From b1b2dcd53967ab686f381fd4446cd2e0e775496a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 23 Jan 2023 15:00:49 +0000 Subject: [PATCH 19/19] Fixup merge whoops Thanks MV :) Co-authored-by: Mathieu Velen --- synapse/handlers/federation.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6d62b8272535..233f8c113d19 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1879,11 +1879,6 @@ async def _sync_partial_state_room( await self._notifier.on_un_partial_stated_room( room_id, new_stream_id ) - - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() - return # we raced against more events arriving with partial state. Go round