-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
A third batch of refactors to the sync handler #11532
Changes from all commits
7aee344
bca64f9
7662ba4
6ff95ee
0d0783c
a5deabb
50fee89
8bd3565
824ae4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Further refactors of the `/sync` handler. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,7 +62,6 @@ | |
# Debug logger for https://github.com/matrix-org/synapse/issues/4422 | ||
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") | ||
|
||
|
||
# Counts the number of times we returned a non-empty sync. `type` is one of | ||
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is | ||
# "true" or "false" depending on if the request asked for lazy loaded members or | ||
|
@@ -83,7 +82,6 @@ | |
# avoiding redundantly sending the same lazy-loaded members to the client | ||
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 | ||
|
||
|
||
SyncRequestKey = Tuple[Any, ...] | ||
|
||
|
||
|
@@ -206,9 +204,42 @@ class _RoomChanges: | |
|
||
room_entries: List["RoomSyncResultBuilder"] | ||
invited: List[InvitedSyncResult] | ||
"""Our outstanding invitations at the `now_token`.""" | ||
|
||
knocked: List[KnockedSyncResult] | ||
"""Rooms we have knocked on at the `now_token`.""" | ||
|
||
newly_joined_rooms: List[str] | ||
"""Rooms we joined at some point between `since` and `now`. | ||
|
||
Note: we need not be joined to these rooms at the `since` or `now` tokens. | ||
Some examples: | ||
|
||
Since Midway Now | ||
-------------------------- | ||
<none> join | ||
invite join | ||
join leave join | ||
invite join leave | ||
""" | ||
|
||
newly_left_rooms: List[str] | ||
"""Rooms we are not joined to at the `now_token` and left between `since` and `now`. | ||
|
||
"Left" means "membership changed from 'join` to something else". It's not the same | ||
as moving to the membership `leave`. | ||
|
||
Note: we need not have membership "leave" at the `since` or `now` tokens. | ||
Some examples: | ||
Since Midway Now | ||
-------------------------- | ||
join leave | ||
join ban | ||
invite join leave | ||
leave join leave | ||
join leave invite | ||
join leave knock | ||
""" | ||
|
||
|
||
@attr.s(slots=True, frozen=True, auto_attribs=True) | ||
|
@@ -1072,7 +1103,7 @@ async def generate_sync_result( | |
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` | ||
instance to signify that the sync calculation is complete. | ||
""" | ||
# NB: The now_token gets changed by some of the generate_sync_* methods, | ||
# NB: Parts of the now_token get changed by some of the generate_sync_* methods, | ||
# this is due to some of the underlying streams not supporting the ability | ||
# to query up to a given point. | ||
# Always use the `now_token` in `SyncResultBuilder` | ||
|
@@ -1093,6 +1124,8 @@ async def generate_sync_result( | |
# See https://github.com/matrix-org/matrix-doc/issues/1144 | ||
raise NotImplementedError() | ||
else: | ||
# The `room_key` part of the `now_token` is not changed by the sync | ||
# machinery. If it did, `joined_room_ids` could become out of date. | ||
joined_room_ids = await self.get_rooms_for_user_at( | ||
user_id, now_token.room_key | ||
) | ||
|
@@ -1684,7 +1717,7 @@ async def _get_rooms_changed( | |
now_token = sync_result_builder.now_token | ||
sync_config = sync_result_builder.sync_config | ||
|
||
assert since_token | ||
assert since_token is not None | ||
|
||
# The spec | ||
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync | ||
|
@@ -1703,15 +1736,99 @@ async def _get_rooms_changed( | |
user_id, since_token.room_key, now_token.room_key | ||
) | ||
|
||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} | ||
for event in membership_change_events: | ||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) | ||
room_changes = await self._classify_rooms_by_membership_changes( | ||
sync_result_builder, membership_change_events, ignored_users | ||
) | ||
|
||
timeline_limit = sync_config.filter_collection.timeline_limit() | ||
|
||
# Get all events since the `from_key` in rooms we're currently joined to. | ||
# If there are too many, we get the most recent events only. This leaves | ||
# a "gap" in the timeline, as described by the spec for /sync. | ||
room_to_events = await self.store.get_room_events_stream_for_rooms( | ||
room_ids=sync_result_builder.joined_room_ids, | ||
from_key=since_token.room_key, | ||
to_key=now_token.room_key, | ||
limit=timeline_limit + 1, | ||
) | ||
|
||
# We loop through all room ids, even if there are no new events, in case | ||
# there are non room events that we need to notify about. | ||
for room_id in sync_result_builder.joined_room_ids: | ||
room_entry = room_to_events.get(room_id, None) | ||
|
||
newly_joined = room_id in room_changes.newly_joined_rooms | ||
if room_entry: | ||
events, start_key = room_entry | ||
|
||
prev_batch_token = now_token.copy_and_replace("room_key", start_key) | ||
|
||
entry = RoomSyncResultBuilder( | ||
room_id=room_id, | ||
rtype="joined", | ||
events=events, | ||
newly_joined=newly_joined, | ||
full_state=False, | ||
since_token=None if newly_joined else since_token, | ||
upto_token=prev_batch_token, | ||
) | ||
else: | ||
entry = RoomSyncResultBuilder( | ||
room_id=room_id, | ||
rtype="joined", | ||
events=[], | ||
newly_joined=newly_joined, | ||
full_state=False, | ||
since_token=since_token, | ||
upto_token=since_token, | ||
) | ||
|
||
if newly_joined: | ||
# debugging for https://github.com/matrix-org/synapse/issues/4422 | ||
issue4422_logger.debug( | ||
"RoomSyncResultBuilder events for newly joined room %s: %r", | ||
room_id, | ||
entry.events, | ||
) | ||
room_changes.room_entries.append(entry) | ||
|
||
return room_changes | ||
|
||
async def _classify_rooms_by_membership_changes( | ||
self, | ||
sync_result_builder: "SyncResultBuilder", | ||
membership_change_events: List[EventBase], | ||
ignored_users: Collection[str], | ||
) -> _RoomChanges: | ||
"""Classify each room by the membership changes from `since` upto `now`. | ||
|
||
Rooms are grouped by the user's membership at the `now_token`, either "invite", | ||
"join", "leave" or "knock". | ||
|
||
Invite and knock are the simplest: to include these in the sync body, we need | ||
just the room ID and the invite/knock event. | ||
|
||
See the _RoomChanges struct for the meaning of the five lists we build up and | ||
return. | ||
""" | ||
since_token = sync_result_builder.since_token | ||
# This assetion is also made in the caller, `_get_rooms_changed`. We repeat it | ||
# here for mypy's benefit. | ||
assert since_token is not None | ||
|
||
user_id = sync_result_builder.sync_config.user.to_string() | ||
|
||
newly_joined_rooms: List[str] = [] | ||
newly_left_rooms: List[str] = [] | ||
room_entries: List[RoomSyncResultBuilder] = [] | ||
invited: List[InvitedSyncResult] = [] | ||
knocked: List[KnockedSyncResult] = [] | ||
|
||
# 0. Do a first pass to group the events by room id. | ||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} | ||
for event in membership_change_events: | ||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) | ||
|
||
for room_id, events in mem_change_events_by_room_id.items(): | ||
# The body of this loop will add this room to at least one of the five lists | ||
# above. Things get messy if you've e.g. joined, left, joined then left the | ||
|
@@ -1725,11 +1842,10 @@ async def _get_rooms_changed( | |
non_joins = [e for e in events if e.membership != Membership.JOIN] | ||
has_join = len(non_joins) != len(events) | ||
|
||
# 1. Should we add this room to `newly_joined_rooms`? | ||
# We want to figure out if we joined the room at some point since | ||
# the last sync (even if we have since left). This is to make sure | ||
# we do send down the room, and with full state, where necessary | ||
|
||
old_state_ids = None | ||
# we do send down the room, and with full state, where necessary. | ||
if room_id in sync_result_builder.joined_room_ids and non_joins: | ||
# Always include if the user (re)joined the room, especially | ||
# important so that device list changes are calculated correctly. | ||
|
@@ -1740,73 +1856,50 @@ async def _get_rooms_changed( | |
# User is in the room so we don't need to do the invite/leave checks | ||
continue | ||
|
||
old_mem_ev = await self._fetch_membership_event_at( | ||
room_id, user_id, since_token | ||
) | ||
if room_id in sync_result_builder.joined_room_ids or has_join: | ||
old_state_ids = await self.get_state_at(room_id, since_token) | ||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) | ||
old_mem_ev = None | ||
if old_mem_ev_id: | ||
old_mem_ev = await self.store.get_event( | ||
old_mem_ev_id, allow_none=True | ||
) | ||
|
||
# debug for #4422 | ||
if has_join: | ||
prev_membership = None | ||
if old_mem_ev: | ||
prev_membership = old_mem_ev.membership | ||
if has_join and old_mem_ev is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a change in behaviour, where we will no longer log for joins where there was no previous membership event (ie, first-time joins). But #4422 got fixed a long time ago - I suggest you rip this whole thing out rather than trying to maintain it. |
||
issue4422_logger.debug( | ||
"Previous membership for room %s with join: %s (event %s)", | ||
room_id, | ||
prev_membership, | ||
old_mem_ev_id, | ||
old_mem_ev.membership, | ||
old_mem_ev.event_id, | ||
) | ||
|
||
if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: | ||
if old_mem_ev is None or old_mem_ev.membership != Membership.JOIN: | ||
newly_joined_rooms.append(room_id) | ||
|
||
# If user is in the room then we don't need to do the invite/leave checks | ||
if room_id in sync_result_builder.joined_room_ids: | ||
continue | ||
|
||
if not non_joins: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in the commit comment for this change:
to be more precise: all membership events for the calling user. But I think that's what you meant.
hrrrm. I assert it is possible for
Now, that's certainly an edge case, and I haven't run through the entire logic here to see if it doesn't get caught somewhere else, but it makes me hesitate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh bugger. Thanks for spotting this (and hopefully spelling out the logic made it easier to spot the flaw in my reasoning). I think this is arcane enough to deserve a test case. I'll see if I can cook up something in complement if I get time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a reliable test case might be tricky, since I suspect you need to exploit a race to make it happen :-S There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the race, sorry? Oh---I guess the initial checks for "are you banned from this room" need to pass, but then the ban event comes in before your join event is accepted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
s/accepted/persisted/, but pretty much, yes. The case above relies on your homeserver generating a join event with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Sounds like a Synapse-specific test is more appropriate then. |
||
continue | ||
last_non_join = non_joins[-1] | ||
|
||
# 2. Should we add this to `newly_left_rooms`? | ||
# Check if we have left the room. This can either be because we were | ||
# joined before *or* that we since joined and then left. | ||
if events[-1].membership != Membership.JOIN: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this has the same problem as bca64f9. |
||
if has_join: | ||
if has_join: | ||
newly_left_rooms.append(room_id) | ||
else: | ||
if old_mem_ev is not None and old_mem_ev.membership == Membership.JOIN: | ||
newly_left_rooms.append(room_id) | ||
else: | ||
if not old_state_ids: | ||
old_state_ids = await self.get_state_at(room_id, since_token) | ||
old_mem_ev_id = old_state_ids.get( | ||
(EventTypes.Member, user_id), None | ||
) | ||
old_mem_ev = None | ||
if old_mem_ev_id: | ||
old_mem_ev = await self.store.get_event( | ||
old_mem_ev_id, allow_none=True | ||
) | ||
if old_mem_ev and old_mem_ev.membership == Membership.JOIN: | ||
newly_left_rooms.append(room_id) | ||
|
||
# Only bother if we're still currently invited | ||
should_invite = last_non_join.membership == Membership.INVITE | ||
if should_invite: | ||
# 3. Should we add this room to `invited`? | ||
last_non_join = non_joins[-1] | ||
if last_non_join.membership == Membership.INVITE: | ||
if last_non_join.sender not in ignored_users: | ||
invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join) | ||
if invite_room_sync: | ||
invited.append(invite_room_sync) | ||
|
||
# Only bother if our latest membership in the room is knock (and we haven't | ||
# been accepted/rejected in the meantime). | ||
should_knock = last_non_join.membership == Membership.KNOCK | ||
if should_knock: | ||
# 4. Should we add this room to `knocked`? | ||
elif last_non_join.membership == Membership.KNOCK: | ||
knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join) | ||
if knock_room_sync: | ||
knocked.append(knock_room_sync) | ||
|
||
# 5. Do we need to add this to `room_entries`? | ||
# Always include leave/ban events. Just take the last one. | ||
# TODO: How do we handle ban -> leave in same batch? | ||
leave_events = [ | ||
|
@@ -1859,58 +1952,6 @@ async def _get_rooms_changed( | |
) | ||
) | ||
|
||
timeline_limit = sync_config.filter_collection.timeline_limit() | ||
|
||
# Get all events since the `from_key` in rooms we're currently joined to. | ||
# If there are too many, we get the most recent events only. This leaves | ||
# a "gap" in the timeline, as described by the spec for /sync. | ||
room_to_events = await self.store.get_room_events_stream_for_rooms( | ||
room_ids=sync_result_builder.joined_room_ids, | ||
from_key=since_token.room_key, | ||
to_key=now_token.room_key, | ||
limit=timeline_limit + 1, | ||
) | ||
|
||
# We loop through all room ids, even if there are no new events, in case | ||
# there are non room events that we need to notify about. | ||
for room_id in sync_result_builder.joined_room_ids: | ||
room_entry = room_to_events.get(room_id, None) | ||
|
||
newly_joined = room_id in newly_joined_rooms | ||
if room_entry: | ||
events, start_key = room_entry | ||
|
||
prev_batch_token = now_token.copy_and_replace("room_key", start_key) | ||
|
||
entry = RoomSyncResultBuilder( | ||
room_id=room_id, | ||
rtype="joined", | ||
events=events, | ||
newly_joined=newly_joined, | ||
full_state=False, | ||
since_token=None if newly_joined else since_token, | ||
upto_token=prev_batch_token, | ||
) | ||
else: | ||
entry = RoomSyncResultBuilder( | ||
room_id=room_id, | ||
rtype="joined", | ||
events=[], | ||
newly_joined=newly_joined, | ||
full_state=False, | ||
since_token=since_token, | ||
upto_token=since_token, | ||
) | ||
|
||
if newly_joined: | ||
# debugging for https://github.com/matrix-org/synapse/issues/4422 | ||
issue4422_logger.debug( | ||
"RoomSyncResultBuilder events for newly joined room %s: %r", | ||
room_id, | ||
entry.events, | ||
) | ||
room_entries.append(entry) | ||
|
||
return _RoomChanges( | ||
room_entries, | ||
invited, | ||
|
@@ -1919,6 +1960,24 @@ async def _get_rooms_changed( | |
newly_left_rooms, | ||
) | ||
|
||
async def _fetch_membership_event_at( | ||
self, room_id: str, user_id: str, since_token: StreamToken | ||
) -> Optional[EventBase]: | ||
"""What was the user's membership in this room at the given stream_token? | ||
|
||
Returns None if | ||
- there was no membership for the user at the given time | ||
- the user had a membership event, but we couldn't find it. | ||
|
||
Otherwise, returns the membership event itself. | ||
""" | ||
|
||
old_state_ids = await self.get_state_at(room_id, since_token) | ||
old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) | ||
if old_mem_ev_id is not None: | ||
return await self.store.get_event(old_mem_ev_id, allow_none=True) | ||
return None | ||
|
||
async def _get_all_rooms( | ||
self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str] | ||
) -> _RoomChanges: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.