diff --git a/changelog.d/11532.misc b/changelog.d/11532.misc new file mode 100644 index 000000000000..9054fadfe3fa --- /dev/null +++ b/changelog.d/11532.misc @@ -0,0 +1 @@ +Further refactors of the `/sync` handler. \ No newline at end of file diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f3039c3c3fb7..68f6e6f9b571 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -62,7 +62,6 @@ # Debug logger for https://github.com/matrix-org/synapse/issues/4422 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") - # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is # "true" or "false" depending on if the request asked for lazy loaded members or @@ -83,7 +82,6 @@ # avoiding redundantly sending the same lazy-loaded members to the client LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 - SyncRequestKey = Tuple[Any, ...] @@ -206,9 +204,42 @@ class _RoomChanges: room_entries: List["RoomSyncResultBuilder"] invited: List[InvitedSyncResult] + """Our outstanding invitations at the `now_token`.""" + knocked: List[KnockedSyncResult] + """Rooms we have knocked on at the `now_token`.""" + newly_joined_rooms: List[str] + """Rooms we joined at some point between `since` and `now`. + + Note: we need not be joined to these rooms at the `since` or `now` tokens. + Some examples: + + Since Midway Now + -------------------------- + join + invite join + join leave join + invite join leave + """ + newly_left_rooms: List[str] + """Rooms we are not joined to at the `now_token` and left between `since` and `now`. + + "Left" means "membership changed from 'join` to something else". It's not the same + as moving to the membership `leave`. + + Note: we need not have membership "leave" at the `since` or `now` tokens. + Some examples: + Since Midway Now + -------------------------- + join leave + join ban + invite join leave + leave join leave + join leave invite + join leave knock + """ @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -1072,7 +1103,7 @@ async def generate_sync_result( At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` instance to signify that the sync calculation is complete. """ - # NB: The now_token gets changed by some of the generate_sync_* methods, + # NB: Parts of the now_token get changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` @@ -1093,6 +1124,8 @@ async def generate_sync_result( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() else: + # The `room_key` part of the `now_token` is not changed by the sync + # machinery. If it did, `joined_room_ids` could become out of date. joined_room_ids = await self.get_rooms_for_user_at( user_id, now_token.room_key ) @@ -1684,7 +1717,7 @@ async def _get_rooms_changed( now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config - assert since_token + assert since_token is not None # The spec # https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync @@ -1703,15 +1736,99 @@ async def _get_rooms_changed( user_id, since_token.room_key, now_token.room_key ) - mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} - for event in membership_change_events: - mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + room_changes = await self._classify_rooms_by_membership_changes( + sync_result_builder, membership_change_events, ignored_users + ) + + timeline_limit = sync_config.filter_collection.timeline_limit() + + # Get all events since the `from_key` in rooms we're currently joined to. + # If there are too many, we get the most recent events only. This leaves + # a "gap" in the timeline, as described by the spec for /sync. + room_to_events = await self.store.get_room_events_stream_for_rooms( + room_ids=sync_result_builder.joined_room_ids, + from_key=since_token.room_key, + to_key=now_token.room_key, + limit=timeline_limit + 1, + ) + + # We loop through all room ids, even if there are no new events, in case + # there are non room events that we need to notify about. + for room_id in sync_result_builder.joined_room_ids: + room_entry = room_to_events.get(room_id, None) + + newly_joined = room_id in room_changes.newly_joined_rooms + if room_entry: + events, start_key = room_entry + + prev_batch_token = now_token.copy_and_replace("room_key", start_key) + + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=events, + newly_joined=newly_joined, + full_state=False, + since_token=None if newly_joined else since_token, + upto_token=prev_batch_token, + ) + else: + entry = RoomSyncResultBuilder( + room_id=room_id, + rtype="joined", + events=[], + newly_joined=newly_joined, + full_state=False, + since_token=since_token, + upto_token=since_token, + ) + + if newly_joined: + # debugging for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "RoomSyncResultBuilder events for newly joined room %s: %r", + room_id, + entry.events, + ) + room_changes.room_entries.append(entry) + + return room_changes + + async def _classify_rooms_by_membership_changes( + self, + sync_result_builder: "SyncResultBuilder", + membership_change_events: List[EventBase], + ignored_users: Collection[str], + ) -> _RoomChanges: + """Classify each room by the membership changes from `since` upto `now`. + + Rooms are grouped by the user's membership at the `now_token`, either "invite", + "join", "leave" or "knock". + + Invite and knock are the simplest: to include these in the sync body, we need + just the room ID and the invite/knock event. + + See the _RoomChanges struct for the meaning of the five lists we build up and + return. + """ + since_token = sync_result_builder.since_token + # This assetion is also made in the caller, `_get_rooms_changed`. We repeat it + # here for mypy's benefit. + assert since_token is not None + + user_id = sync_result_builder.sync_config.user.to_string() newly_joined_rooms: List[str] = [] newly_left_rooms: List[str] = [] room_entries: List[RoomSyncResultBuilder] = [] invited: List[InvitedSyncResult] = [] knocked: List[KnockedSyncResult] = [] + + # 0. Do a first pass to group the events by room id. + mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} + for event in membership_change_events: + mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) + for room_id, events in mem_change_events_by_room_id.items(): # The body of this loop will add this room to at least one of the five lists # above. Things get messy if you've e.g. joined, left, joined then left the @@ -1725,11 +1842,10 @@ async def _get_rooms_changed( non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) + # 1. Should we add this room to `newly_joined_rooms`? # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure - # we do send down the room, and with full state, where necessary - - old_state_ids = None + # we do send down the room, and with full state, where necessary. if room_id in sync_result_builder.joined_room_ids and non_joins: # Always include if the user (re)joined the room, especially # important so that device list changes are calculated correctly. @@ -1740,73 +1856,50 @@ async def _get_rooms_changed( # User is in the room so we don't need to do the invite/leave checks continue + old_mem_ev = await self._fetch_membership_event_at( + room_id, user_id, since_token + ) if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self.get_state_at(room_id, since_token) - old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) - old_mem_ev = None - if old_mem_ev_id: - old_mem_ev = await self.store.get_event( - old_mem_ev_id, allow_none=True - ) - # debug for #4422 - if has_join: - prev_membership = None - if old_mem_ev: - prev_membership = old_mem_ev.membership + if has_join and old_mem_ev is not None: issue4422_logger.debug( "Previous membership for room %s with join: %s (event %s)", room_id, - prev_membership, - old_mem_ev_id, + old_mem_ev.membership, + old_mem_ev.event_id, ) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: + if old_mem_ev is None or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) # If user is in the room then we don't need to do the invite/leave checks if room_id in sync_result_builder.joined_room_ids: continue - if not non_joins: - continue - last_non_join = non_joins[-1] - + # 2. Should we add this to `newly_left_rooms`? # Check if we have left the room. This can either be because we were # joined before *or* that we since joined and then left. - if events[-1].membership != Membership.JOIN: - if has_join: + if has_join: + newly_left_rooms.append(room_id) + else: + if old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN: newly_left_rooms.append(room_id) - else: - if not old_state_ids: - old_state_ids = await self.get_state_at(room_id, since_token) - old_mem_ev_id = old_state_ids.get( - (EventTypes.Member, user_id), None - ) - old_mem_ev = None - if old_mem_ev_id: - old_mem_ev = await self.store.get_event( - old_mem_ev_id, allow_none=True - ) - if old_mem_ev and old_mem_ev.membership == Membership.JOIN: - newly_left_rooms.append(room_id) - # Only bother if we're still currently invited - should_invite = last_non_join.membership == Membership.INVITE - if should_invite: + # 3. Should we add this room to `invited`? + last_non_join = non_joins[-1] + if last_non_join.membership == Membership.INVITE: if last_non_join.sender not in ignored_users: invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join) if invite_room_sync: invited.append(invite_room_sync) - # Only bother if our latest membership in the room is knock (and we haven't - # been accepted/rejected in the meantime). - should_knock = last_non_join.membership == Membership.KNOCK - if should_knock: + # 4. Should we add this room to `knocked`? + elif last_non_join.membership == Membership.KNOCK: knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join) if knock_room_sync: knocked.append(knock_room_sync) + # 5. Do we need to add this to `room_entries`? # Always include leave/ban events. Just take the last one. # TODO: How do we handle ban -> leave in same batch? leave_events = [ @@ -1859,58 +1952,6 @@ async def _get_rooms_changed( ) ) - timeline_limit = sync_config.filter_collection.timeline_limit() - - # Get all events since the `from_key` in rooms we're currently joined to. - # If there are too many, we get the most recent events only. This leaves - # a "gap" in the timeline, as described by the spec for /sync. - room_to_events = await self.store.get_room_events_stream_for_rooms( - room_ids=sync_result_builder.joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, - limit=timeline_limit + 1, - ) - - # We loop through all room ids, even if there are no new events, in case - # there are non room events that we need to notify about. - for room_id in sync_result_builder.joined_room_ids: - room_entry = room_to_events.get(room_id, None) - - newly_joined = room_id in newly_joined_rooms - if room_entry: - events, start_key = room_entry - - prev_batch_token = now_token.copy_and_replace("room_key", start_key) - - entry = RoomSyncResultBuilder( - room_id=room_id, - rtype="joined", - events=events, - newly_joined=newly_joined, - full_state=False, - since_token=None if newly_joined else since_token, - upto_token=prev_batch_token, - ) - else: - entry = RoomSyncResultBuilder( - room_id=room_id, - rtype="joined", - events=[], - newly_joined=newly_joined, - full_state=False, - since_token=since_token, - upto_token=since_token, - ) - - if newly_joined: - # debugging for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "RoomSyncResultBuilder events for newly joined room %s: %r", - room_id, - entry.events, - ) - room_entries.append(entry) - return _RoomChanges( room_entries, invited, @@ -1919,6 +1960,24 @@ async def _get_rooms_changed( newly_left_rooms, ) + async def _fetch_membership_event_at( + self, room_id: str, user_id: str, since_token: StreamToken + ) -> Optional[EventBase]: + """What was the user's membership in this room at the given stream_token? + + Returns None if + - there was no membership for the user at the given time + - the user had a membership event, but we couldn't find it. + + Otherwise, returns the membership event itself. + """ + + old_state_ids = await self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) + if old_mem_ev_id is not None: + return await self.store.get_event(old_mem_ev_id, allow_none=True) + return None + async def _get_all_rooms( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] ) -> _RoomChanges: