From 7aee344c24a8c83a3bf79304bc72b762459fbd3a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 6 Dec 2021 18:28:33 +0000 Subject: [PATCH 1/9] Pull out `_classify_rooms_by_membership_changes` --- synapse/handlers/sync.py | 127 ++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 55 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f3039c3c3fb7..b62ec3416bd1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -62,7 +62,6 @@ # Debug logger for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") - # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is # "true" or "false" depending on if the request asked for lazy loaded members or @@ -83,7 +82,6 @@ # avoiding redundantly sending the same lazy-loaded members to the client LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 - SyncRequestKey = Tuple[Any, ...] @@ -1684,7 +1682,7 @@ async def _get_rooms_changed( now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config - assert since_token + assert since_token is not None # The spec # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync @@ -1703,6 +1701,77 @@ async def _get_rooms_changed( user_id, since_token.room_key, now_token.room_key ) + room_changes = await self._classify_rooms_by_membership_changes( + sync_result_builder, membership_change_events, ignored_users + ) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events since the `from_key` in rooms we're currently joined to. + # If there are too many, we get the most recent events only. This leaves + # a "gap" in the timeline, as described by the spec for /sync. + room_to_events = await self.store.get_room_events_stream_for_rooms( + room_ids=sync_result_builder.joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + # We loop through all room ids, even if there are no new events, in case + # there are non room events that we need to notify about. + for room_id in sync_result_builder.joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + newly_joined = room_id in room_changes.newly_joined_rooms + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=events, + newly_joined=newly_joined, + full_state=False, + since_token=None if newly_joined else since_token, + upto_token=prev_batch_token, + ) + else: + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=[], + newly_joined=newly_joined, + full_state=False, + since_token=since_token, + upto_token=since_token, + ) + + if newly_joined: + # debugging for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "RoomSyncResultBuilder events for newly joined room %s: %r", + room_id, + entry.events, + ) + room_changes.room_entries.append(entry) + + return room_changes + + async def _classify_rooms_by_membership_changes( + self, + sync_result_builder: "SyncResultBuilder", + membership_change_events: List[EventBase], + ignored_users: Collection[str], + ) -> _RoomChanges: + since_token = sync_result_builder.since_token + # This assetion is also made in the caller, `_get_rooms_changed`. We repeat it + # here for mypy's benefit. + assert since_token is not None + + user_id = sync_result_builder.sync_config.user.to_string() + mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) @@ -1859,58 +1928,6 @@ async def _get_rooms_changed( ) ) - timeline_limit = sync_config.filter_collection.timeline_limit() - - # Get all events since the `from_key` in rooms we're currently joined to. - # If there are too many, we get the most recent events only. This leaves - # a "gap" in the timeline, as described by the spec for /sync. - room_to_events = await self.store.get_room_events_stream_for_rooms( - room_ids=sync_result_builder.joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - # We loop through all room ids, even if there are no new events, in case - # there are non room events that we need to notify about. - for room_id in sync_result_builder.joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - newly_joined = room_id in newly_joined_rooms - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - entry = RoomSyncResultBuilder( - room_id=room_id, - rtype="joined", - events=events, - newly_joined=newly_joined, - full_state=False, - since_token=None if newly_joined else since_token, - upto_token=prev_batch_token, - ) - else: - entry = RoomSyncResultBuilder( - room_id=room_id, - rtype="joined", - events=[], - newly_joined=newly_joined, - full_state=False, - since_token=since_token, - upto_token=since_token, - ) - - if newly_joined: - # debugging for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "RoomSyncResultBuilder events for newly joined room %s: %r", - room_id, - entry.events, - ) - room_entries.append(entry) - return _RoomChanges( room_entries, invited, From bca64f949e42bd5a45056da9bf8ece46dceae99b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 12:33:13 +0000 Subject: [PATCH 2/9] Remove unused branch I claim that it is impossible to hit the `continue` which is removed in this commit. Setup: - `events` is a list of all membership events in the range `since < time <= now`. - `non_joins` is the list of `e in events` with `e.membership` not equal to`"join"`. - `events` is a nonempty list by construction of `mem_change_events_by_room_id`. Rationale: - We hit the deleted code only if `non_joins` is empty. - If so, `events` consists only of `join` membership events. - `events` is non_empty, so there was at least one join during the sync period. - Therefore the room_id will belong to `sync_result_builder.joined_room_ids`. But this means we will have `continue`d in the branch above. - I'm assuming here that `joined_room_ids` and `events` are both using the same `now_token.room_key`. --- synapse/handlers/sync.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b62ec3416bd1..d8cbbb576d3b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1070,7 +1070,7 @@ async def generate_sync_result( At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` instance to signify that the sync calculation is complete. """ - # NB: The now_token gets changed by some of the generate_sync_* methods, + # NB: Parts of the now_token get changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` @@ -1091,6 +1091,8 @@ async def generate_sync_result( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() else: + # The `room_key` part of the `now_token` is not changed by the sync + # machinery. If it did, `joined_room_ids` could become out of date. joined_room_ids = await self.get_rooms_for_user_at( user_id, now_token.room_key ) @@ -1837,10 +1839,6 @@ async def _classify_rooms_by_membership_changes( if room_id in sync_result_builder.joined_room_ids: continue - if not non_joins: - continue - last_non_join = non_joins[-1] - # Check if we have left the room. This can either be because we were # joined before *or* that we since joined and then left. if events[-1].membership != Membership.JOIN: @@ -1861,6 +1859,7 @@ async def _classify_rooms_by_membership_changes( newly_left_rooms.append(room_id) # Only bother if we're still currently invited + last_non_join = non_joins[-1] should_invite = last_non_join.membership == Membership.INVITE if should_invite: if last_non_join.sender not in ignored_users: From 7662ba453153081b7cb3f7eff02c55690877e238 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 16:22:24 +0000 Subject: [PATCH 3/9] Signposting in `_classify_rooms_by_membership` --- synapse/handlers/sync.py | 61 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d8cbbb576d3b..55867755e89e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -204,9 +204,41 @@ class _RoomChanges: room_entries: List["RoomSyncResultBuilder"] invited: List[InvitedSyncResult] + """Our outstanding invitations at the `now_token`.""" + knocked: List[KnockedSyncResult] + """Rooms we have knocked on at the `now_token`.""" + newly_joined_rooms: List[str] + """Rooms we joined at some point between `since` and `now`. + + Note: we need not be joined to these rooms at the `since` or `now` tokens. + Some examples: + + Since Midway Now + -------------------------- + join + invite join + join leave join + invite join leave + """ newly_left_rooms: List[str] + """Rooms we are not joined to at the `now_token` and left between `since` and `now`. + + "Left" means "membership changed from 'join` to something else". It's not the same + as moving to the membership `leave`. + + Note: we need not have membership "leave" at the `since` or `now` tokens. + Some examples: + Since Midway Now + -------------------------- + join leave + join ban + invite join leave + leave join leave + join leave invite + join leave knock + """ @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -1767,6 +1799,17 @@ async def _classify_rooms_by_membership_changes( membership_change_events: List[EventBase], ignored_users: Collection[str], ) -> _RoomChanges: + """Classify each room by the membership changes from `since` upto `now`. + + Rooms are grouped by the user's membership at the `now_token`, either "invite", + "join", "leave" or "knock". + + Invite and knock are the simplest: to include these in the sync body, we need + just the room ID and the invite/knock event. + + See the _RoomChanges struct for the meaning of the five lists we build up and + return. + """ since_token = sync_result_builder.since_token # This assetion is also made in the caller, `_get_rooms_changed`. We repeat it # here for mypy's benefit. @@ -1774,15 +1817,17 @@ async def _classify_rooms_by_membership_changes( user_id = sync_result_builder.sync_config.user.to_string() - mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} - for event in membership_change_events: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms: List[str] = [] newly_left_rooms: List[str] = [] room_entries: List[RoomSyncResultBuilder] = [] invited: List[InvitedSyncResult] = [] knocked: List[KnockedSyncResult] = [] + + # 0. Do a first pass to group the events by room id. + mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} + for event in membership_change_events: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + for room_id, events in mem_change_events_by_room_id.items(): # The body of this loop will add this room to at least one of the five lists # above. Things get messy if you've e.g. joined, left, joined then left the @@ -1796,10 +1841,10 @@ async def _classify_rooms_by_membership_changes( non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) + # 1. Should we add this room to `newly_joined_rooms`? # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - + # we do send down the room, and with full state, where necessary. old_state_ids = None if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially @@ -1839,6 +1884,7 @@ async def _classify_rooms_by_membership_changes( if room_id in sync_result_builder.joined_room_ids: continue + # 2. Should we add this to `newly_left_rooms`? # Check if we have left the room. This can either be because we were # joined before *or* that we since joined and then left. if events[-1].membership != Membership.JOIN: @@ -1858,6 +1904,7 @@ async def _classify_rooms_by_membership_changes( if old_mem_ev and old_mem_ev.membership == Membership.JOIN: newly_left_rooms.append(room_id) + # 3. Should we add this room to `invited`? # Only bother if we're still currently invited last_non_join = non_joins[-1] should_invite = last_non_join.membership == Membership.INVITE @@ -1867,6 +1914,7 @@ async def _classify_rooms_by_membership_changes( if invite_room_sync: invited.append(invite_room_sync) + # 4. Should we add this room to `knocked`? # Only bother if our latest membership in the room is knock (and we haven't # been accepted/rejected in the meantime). should_knock = last_non_join.membership == Membership.KNOCK @@ -1875,6 +1923,7 @@ async def _classify_rooms_by_membership_changes( if knock_room_sync: knocked.append(knock_room_sync) + # 5. Do we need to add this to `room_entries`? # Always include leave/ban events. Just take the last one. # TODO: How do we handle ban -> leave in same batch? leave_events = [ From 6ff95ee66c29dbb12f832ad8bc9acdb86a89b903 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 16:27:23 +0000 Subject: [PATCH 4/9] Pull out `_fetch_membership_event_at` Doing so makes it possible we'll repeat a DB query. In the next commit, I'll demonstrate that we always call this exactly once. --- synapse/handlers/sync.py | 58 ++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 55867755e89e..a2045412d8fa 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1845,7 +1845,6 @@ async def _classify_rooms_by_membership_changes( # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary. - old_state_ids = None if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially # important so that device list changes are calculated correctly. @@ -1857,27 +1856,20 @@ async def _classify_rooms_by_membership_changes( continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self.get_state_at(room_id, since_token) - old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) - old_mem_ev = None - if old_mem_ev_id: - old_mem_ev = await self.store.get_event( - old_mem_ev_id, allow_none=True - ) + old_mem_ev = await self._fetch_membership_event_at( + room_id, user_id, since_token + ) # debug for #4422 - if has_join: - prev_membership = None - if old_mem_ev: - prev_membership = old_mem_ev.membership + if has_join and old_mem_ev is not None: issue4422_logger.debug( "Previous membership for room %s with join: %s (event %s)", room_id, - prev_membership, - old_mem_ev_id, + old_mem_ev.membership, + old_mem_ev.event_id, ) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + if old_mem_ev is None or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) # If user is in the room then we don't need to do the invite/leave checks @@ -1891,17 +1883,13 @@ async def _classify_rooms_by_membership_changes( if has_join: newly_left_rooms.append(room_id) else: - if not old_state_ids: - old_state_ids = await self.get_state_at(room_id, since_token) - old_mem_ev_id = old_state_ids.get( - (EventTypes.Member, user_id), None - ) - old_mem_ev = None - if old_mem_ev_id: - old_mem_ev = await self.store.get_event( - old_mem_ev_id, allow_none=True - ) - if old_mem_ev and old_mem_ev.membership == Membership.JOIN: + old_mem_ev = await self._fetch_membership_event_at( + room_id, user_id, since_token + ) + if ( + old_mem_ev is not None + and old_mem_ev.membership == Membership.JOIN + ): newly_left_rooms.append(room_id) # 3. Should we add this room to `invited`? @@ -1984,6 +1972,24 @@ async def _classify_rooms_by_membership_changes( newly_left_rooms, ) + async def _fetch_membership_event_at( + self, room_id: str, user_id: str, since_token: StreamToken + ) -> Optional[EventBase]: + """What was the user's membership in this room at the given stream_token? + + Returns None if + - there was no membership for the user at the given time + - the user had a membership event, but we couldn't find it. + + Otherwise, returns the membership event itself. + """ + + old_state_ids = await self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) + if old_mem_ev_id is not None: + return await self.store.get_event(old_mem_ev_id, allow_none=True) + return None + async def _get_all_rooms( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] ) -> _RoomChanges: From 0d0783caa35c8483c5a9b2198a35e4a358d03115 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 16:31:16 +0000 Subject: [PATCH 5/9] Always call `old_mem_ev`---exactly once. I claim this is identical to the existing behaviour. Proof: consider the boolean `room_id in sync_result_builder.joined_room_ids or has_join`. If this is true, we make the first call to `_fetch_membership_event_at`. Otherwise: - `room_id not in sync_result_builder.joined_room_ids` and `not has_join`. - The former means we continue on to inspect `events[-1].membership`. - This is not `"join"`, or else `room_id in sync_result_builder.joined_room_ids` would be true. - `has_join` is False, so we hit the `else` branch and make the second call to `_fetch_membership_event_at`. So, assuming we continue beyond the first `continue`, we always call fetch the old membership event exactly once. Do it up front to make the reader's life easier. --- synapse/handlers/sync.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a2045412d8fa..fc2562cd7b45 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1855,11 +1855,10 @@ async def _classify_rooms_by_membership_changes( # User is in the room so we don't need to do the invite/leave checks continue + old_mem_ev = await self._fetch_membership_event_at( + room_id, user_id, since_token + ) if room_id in sync_result_builder.joined_room_ids or has_join: - old_mem_ev = await self._fetch_membership_event_at( - room_id, user_id, since_token - ) - # debug for #4422 if has_join and old_mem_ev is not None: issue4422_logger.debug( @@ -1883,9 +1882,6 @@ async def _classify_rooms_by_membership_changes( if has_join: newly_left_rooms.append(room_id) else: - old_mem_ev = await self._fetch_membership_event_at( - room_id, user_id, since_token - ) if ( old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN From a5deabbaae534449ef0570bdba135dda98c04959 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 17:42:11 +0000 Subject: [PATCH 6/9] Remove check on `events[-1].membership` I claim this check is redundant, because said membership cannot be join. If not, we'd have `continue`d above because `room_id` would be in `sync_result_builder.joined_room_ids`. --- synapse/handlers/sync.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index fc2562cd7b45..967f88c52f6e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1878,15 +1878,11 @@ async def _classify_rooms_by_membership_changes( # 2. Should we add this to `newly_left_rooms`? # Check if we have left the room. This can either be because we were # joined before *or* that we since joined and then left. - if events[-1].membership != Membership.JOIN: - if has_join: + if has_join: + newly_left_rooms.append(room_id) + else: + if old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN: newly_left_rooms.append(room_id) - else: - if ( - old_mem_ev is not None - and old_mem_ev.membership == Membership.JOIN - ): - newly_left_rooms.append(room_id) # 3. Should we add this room to `invited`? # Only bother if we're still currently invited From 50fee8941a19026e57a72c7e9cb3869cc281c4c8 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 17:55:48 +0000 Subject: [PATCH 7/9] Small style rewrite to emphasise exclusive cases I prefer `if .. elif` where possible, because I know for sure that at most one branch will execute. --- synapse/handlers/sync.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 967f88c52f6e..72e3114b27f6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1885,20 +1885,15 @@ async def _classify_rooms_by_membership_changes( newly_left_rooms.append(room_id) # 3. Should we add this room to `invited`? - # Only bother if we're still currently invited last_non_join = non_joins[-1] - should_invite = last_non_join.membership == Membership.INVITE - if should_invite: + if last_non_join.membership == Membership.INVITE: if last_non_join.sender not in ignored_users: invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join) if invite_room_sync: invited.append(invite_room_sync) # 4. Should we add this room to `knocked`? - # Only bother if our latest membership in the room is knock (and we haven't - # been accepted/rejected in the meantime). - should_knock = last_non_join.membership == Membership.KNOCK - if should_knock: + elif last_non_join.membership == Membership.KNOCK: knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join) if knock_room_sync: knocked.append(knock_room_sync) From 8bd35651f22e90743b5029e5651f6a27431e768e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 7 Dec 2021 18:33:13 +0000 Subject: [PATCH 8/9] Changelog --- changelog.d/11532.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11532.misc diff --git a/changelog.d/11532.misc b/changelog.d/11532.misc new file mode 100644 index 000000000000..9054fadfe3fa --- /dev/null +++ b/changelog.d/11532.misc @@ -0,0 +1 @@ +Further refactors of the `/sync` handler. \ No newline at end of file From 824ae4e82cefccfe0fbd4ab1334ff7fe4cfb8d10 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 8 Dec 2021 13:35:21 +0000 Subject: [PATCH 9/9] docstring formatting Co-authored-by: Dan Callahan --- synapse/handlers/sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 72e3114b27f6..68f6e6f9b571 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -222,6 +222,7 @@ class _RoomChanges: join leave join invite join leave """ + newly_left_rooms: List[str] """Rooms we are not joined to at the `now_token` and left between `since` and `now`.