diff --git a/changelog.d/7.misc b/changelog.d/7.misc new file mode 100644 index 00000000000..63f1fb77ffa --- /dev/null +++ b/changelog.d/7.misc @@ -0,0 +1 @@ +Faster partial join to room with complex auth graph. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 882be905dbb..398f19eec0b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -94,7 +94,7 @@ ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched +from synapse.util.iterutils import batch_iter, partition, sorted_topologically from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1678,57 +1678,36 @@ async def _auth_and_persist_outliers( # We need to persist an event's auth events before the event. auth_graph = { - ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map] + ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map] for ev in event_map.values() } - for roots in sorted_topologically_batched(event_map.values(), auth_graph): - if not roots: - # if *none* of the remaining events are ready, that means - # we have a loop. This either means a bug in our logic, or that - # somebody has managed to create a loop (which requires finding a - # hash collision in room v2 and later). - logger.warning( - "Loop found in auth events while fetching missing state/auth " - "events: %s", - shortstr(event_map.keys()), - ) - return - - logger.info( - "Persisting %i of %i remaining outliers: %s", - len(roots), - len(event_map), - shortstr(e.event_id for e in roots), - ) - - await self._auth_and_persist_outliers_inner(room_id, roots) - - async def _auth_and_persist_outliers_inner( - self, room_id: str, fetched_events: Collection[EventBase] - ) -> None: - """Helper for _auth_and_persist_outliers - - Persists a batch of events where we have (theoretically) already persisted all - of their auth events. - - Marks the events as outliers, auths them, persists them to the database, and, - where appropriate (eg, an invite), awakes the notifier. + sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph) + sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids] + logger.info( + "Persisting %i remaining outliers: %s", + len(sorted_auth_events), + shortstr(e.event_id for e in sorted_auth_events), + ) - Params: - origin: where the events came from - room_id: the room that the events are meant to be in (though this has - not yet been checked) - fetched_events: the events to persist - """ # get all the auth events for all the events in this batch. By now, they should # have been persisted. - auth_events = { - aid for event in fetched_events for aid in event.auth_event_ids() + auth_event_ids = { + aid for event in sorted_auth_events for aid in event.auth_event_ids() + } + auth_map = { + ev.event_id: ev + for ev in sorted_auth_events + if ev.event_id in auth_event_ids } - persisted_events = await self._store.get_events( - auth_events, - allow_rejected=True, - ) + + missing_events = auth_event_ids.difference(auth_map) + if missing_events: + persisted_events = await self._store.get_events( + missing_events, + allow_rejected=True, + redact_behaviour=EventRedactBehaviour.as_is, + ) + auth_map.update(persisted_events) events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = [] @@ -1736,7 +1715,7 @@ async def prep(event: EventBase) -> None: with nested_logging_context(suffix=event.event_id): auth = [] for auth_event_id in event.auth_event_ids(): - ae = persisted_events.get(auth_event_id) + ae = auth_map.get(auth_event_id) if not ae: # the fact we can't find the auth event doesn't mean it doesn't # exist, which means it is premature to reject `event`. Instead we @@ -1755,7 +1734,9 @@ async def prep(event: EventBase) -> None: context = EventContext.for_outlier(self._storage_controllers) try: validate_event_for_room_version(event) - await check_state_independent_auth_rules(self._store, event) + await check_state_independent_auth_rules( + self._store, event, batched_auth_events=auth_map + ) check_state_dependent_auth_rules(event, auth) except AuthError as e: logger.warning("Rejecting %r because %s", event, e) @@ -1772,7 +1753,7 @@ async def prep(event: EventBase) -> None: events_and_contexts_to_persist.append((event, context)) - for event in fetched_events: + for event in sorted_auth_events: await prep(event) await self.persist_events_and_notify(