This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Improve performance of backfilling in large rooms. #9935
Merged
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve performance of backfilling in large rooms. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -552,17 +552,20 @@ async def _get_state_for_room( | |
destination: str, | ||
room_id: str, | ||
event_id: str, | ||
) -> Tuple[List[EventBase], List[EventBase]]: | ||
"""Requests all of the room state at a given event from a remote homeserver. | ||
) -> List[EventBase]: | ||
"""Requests all of the room state at a given event from a remote | ||
homeserver. | ||
|
||
Will also fetch any missing events reported in the `auth_chain_ids` | ||
section of `/state_ids`. | ||
|
||
Args: | ||
destination: The remote homeserver to query for the state. | ||
room_id: The id of the room we're interested in. | ||
event_id: The id of the event we want the state at. | ||
|
||
Returns: | ||
A list of events in the state, not including the event itself, and | ||
a list of events in the auth chain for the given event. | ||
A list of events in the state, not including the event itself. | ||
""" | ||
( | ||
state_event_ids, | ||
|
@@ -571,77 +574,64 @@ async def _get_state_for_room( | |
destination, room_id, event_id=event_id | ||
) | ||
|
||
desired_events = set(state_event_ids + auth_event_ids) | ||
# Fetch the state events from the DB, and check we have the auth events. | ||
event_map = await self.store.get_events(state_event_ids, allow_rejected=True) | ||
auth_events_in_store = await self.store.have_seen_events(auth_event_ids) | ||
|
||
event_map = await self._get_events_from_store_or_dest( | ||
destination, room_id, desired_events | ||
) | ||
|
||
failed_to_fetch = desired_events - event_map.keys() | ||
if failed_to_fetch: | ||
logger.warning( | ||
"Failed to fetch missing state/auth events for %s %s", | ||
event_id, | ||
failed_to_fetch, | ||
# Check for missing events. We handle state and auth event separately, | |
# as we want to pull the state from the DB, but we don't for the auth | ||
# events. (Note: we likely won't use the majority of the auth chain, and | ||
# it can be *huge* for large rooms, so it's worth ensuring that we don't | ||
# unnecessarily pull it from the DB). | ||
missing_state_events = set(event_map) - set(state_event_ids) | ||
missing_auth_events = set(auth_event_ids) - set(auth_events_in_store) | ||
if missing_state_events or missing_auth_events: | ||
await self._get_events_and_persist( | ||
destination=destination, | ||
room_id=room_id, | ||
events=missing_state_events | missing_auth_events, | ||
) | ||
|
||
remote_state = [ | ||
event_map[e_id] for e_id in state_event_ids if e_id in event_map | ||
] | ||
|
||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] | ||
auth_chain.sort(key=lambda e: e.depth) | ||
|
||
return remote_state, auth_chain | ||
|
||
async def _get_events_from_store_or_dest( | ||
self, destination: str, room_id: str, event_ids: Iterable[str] | ||
) -> Dict[str, EventBase]: | ||
"""Fetch events from a remote destination, checking if we already have them. | ||
|
||
Persists any events we don't already have as outliers. | ||
|
||
If we fail to fetch any of the events, a warning will be logged, and the event | ||
will be omitted from the result. Likewise, any events which turn out not to | ||
be in the given room. | ||
if missing_state_events: | ||
new_events = await self.store.get_events( | ||
state_event_ids, allow_rejected=True | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this only fetch the new events instead of all events again? |
||
event_map.update(new_events) | ||
|
||
This function *does not* automatically get missing auth events of the | ||
newly fetched events. Callers must include the full auth chain of | ||
the missing events in the `event_ids` argument, to ensure that any | |
missing auth events are correctly fetched. | ||
missing_state_events.difference_update(new_events) | ||
|
||
Returns: | ||
map from event_id to event | ||
""" | ||
fetched_events = await self.store.get_events(event_ids, allow_rejected=True) | ||
|
||
missing_events = set(event_ids) - fetched_events.keys() | ||
if missing_state_events: | ||
logger.warning( | ||
"Failed to fetch missing state events for %s %s", | ||
event_id, | ||
missing_state_events, | ||
) | ||
|
||
if missing_events: | ||
logger.debug( | ||
"Fetching unknown state/auth events %s for room %s", | ||
missing_events, | ||
room_id, | ||
) | ||
if missing_auth_events: | ||
auth_events_in_store = await self.store.have_seen_events( | ||
missing_auth_events | ||
) | ||
missing_auth_events.difference_update(auth_events_in_store) | ||
|
||
await self._get_events_and_persist( | ||
destination=destination, room_id=room_id, events=missing_events | ||
) | ||
if missing_auth_events: | ||
logger.warning( | ||
"Failed to fetch missing auth events for %s %s", | ||
event_id, | ||
missing_auth_events, | ||
) | ||
|
||
# we need to make sure we re-load from the database to get the rejected | ||
# state correct. | ||
fetched_events.update( | ||
(await self.store.get_events(missing_events, allow_rejected=True)) | ||
) | ||
remote_state = [ | ||
event_map[e_id] for e_id in state_event_ids if e_id in event_map | ||
] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Isn't this pretty much |
||
|
||
# check for events which were in the wrong room. | ||
# | ||
# this can happen if a remote server claims that the state or | ||
# auth_events at an event in room A are actually events in room B | ||
|
||
bad_events = [ | ||
(event_id, event.room_id) | ||
for event_id, event in fetched_events.items() | ||
(event.event_id, event.room_id) | ||
for event in remote_state | ||
if event.room_id != room_id | ||
] | ||
|
||
|
@@ -658,9 +648,10 @@ async def _get_events_from_store_or_dest( | |
room_id, | ||
) | ||
|
||
del fetched_events[bad_event_id] | ||
if bad_events: | ||
remote_state = [e for e in remote_state if e.room_id == room_id] | ||
|
||
return fetched_events | ||
return remote_state | ||
|
||
async def _get_state_after_missing_prev_event( | ||
self, | ||
|
@@ -963,27 +954,23 @@ async def backfill( | |
|
||
# For each edge get the current state. | ||
|
||
auth_events = {} | ||
state_events = {} | ||
events_to_state = {} | ||
for e_id in edges: | ||
state, auth = await self._get_state_for_room( | ||
state = await self._get_state_for_room( | ||
destination=dest, | ||
room_id=room_id, | ||
event_id=e_id, | ||
) | ||
auth_events.update({a.event_id: a for a in auth}) | ||
auth_events.update({s.event_id: s for s in state}) | ||
state_events.update({s.event_id: s for s in state}) | ||
events_to_state[e_id] = state | ||
|
||
required_auth = { | ||
a_id | ||
for event in events | ||
+ list(state_events.values()) | ||
+ list(auth_events.values()) | ||
for event in events + list(state_events.values()) | ||
for a_id in event.auth_event_ids() | ||
} | ||
auth_events = await self.store.get_events(required_auth, allow_rejected=True) | ||
auth_events.update( | ||
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} | ||
) | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be `set(state_event_ids) - set(event_map)`? I think right now `missing_state_events` will always be empty (since `event_map` by definition could only possibly have things that were in `state_event_ids`).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Err, yes 🤦