diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 73380e295dab..501dbbc99011 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -43,7 +43,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.events import EventBase, relation_from_event
+from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
@@ -431,22 +431,6 @@ async def enqueue(
             else:
                 events.append(event)
 
-            # We expect events to be persisted by this point and this makes
-            # mypy happy about `stream_ordering` not being optional below
-            assert event.internal_metadata.stream_ordering is not None
-            # Invalidate related caches after we persist a new event
-            relation = relation_from_event(event)
-            self.main_store._invalidate_caches_for_event(
-                stream_ordering=event.internal_metadata.stream_ordering,
-                event_id=event.event_id,
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key if hasattr(event, "state_key") else None,
-                redacts=event.redacts,
-                relates_to=relation.parent_id if relation else None,
-                backfilled=backfilled,
-            )
-
         return (
             events,
             self.main_store.get_room_max_token(),
@@ -479,22 +463,6 @@ async def persist_event(
         replaced_event = replaced_events.get(event.event_id)
         if replaced_event:
             event = await self.main_store.get_event(replaced_event)
-        else:
-            # We expect events to be persisted by this point and this makes
-            # mypy happy about `stream_ordering` not being optional below
-            assert event.internal_metadata.stream_ordering is not None
-            # Invalidate related caches after we persist a new event
-            relation = relation_from_event(event)
-            self.main_store._invalidate_caches_for_event(
-                stream_ordering=event.internal_metadata.stream_ordering,
-                event_id=event.event_id,
-                room_id=event.room_id,
-                etype=event.type,
-                state_key=event.state_key if hasattr(event, "state_key") else None,
-                redacts=event.redacts,
-                relates_to=relation.parent_id if relation else None,
-                backfilled=backfilled,
-            )
 
         event_stream_id = event.internal_metadata.stream_ordering
         # stream ordering should have been assigned by now
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1b54a2eb5768..1f1a7b754576 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -410,6 +410,31 @@ def _persist_events_txn(
         assert min_stream_order
         assert max_stream_order
 
+        # Once the txn completes, invalidate all of the relevant caches. Note that we do this
+        # up here because it captures all the events_and_contexts before any are removed.
+        for event, _ in events_and_contexts:
+            self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
+            if event.redacts:
+                self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
+
+            relates_to = None
+            relation = relation_from_event(event)
+            if relation:
+                relates_to = relation.parent_id
+
+            assert event.internal_metadata.stream_ordering is not None
+            txn.call_after(
+                self.store._invalidate_caches_for_event,
+                event.internal_metadata.stream_ordering,
+                event.event_id,
+                event.room_id,
+                event.type,
+                getattr(event, "state_key", None),
+                event.redacts,
+                relates_to,
+                backfilled=False,
+            )
+
         self._update_forward_extremities_txn(
             txn,
             new_forward_extremities=new_forward_extremities,