Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Simplify cache invalidation after event persist txn
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Sep 13, 2022
1 parent 9772e36 commit 956b47a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 95 deletions.
1 change: 1 addition & 0 deletions changelog.d/13796.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify the cache invalidation logic that runs after the event-persistence transaction.
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def _invalidate_caches_for_event(
self.get_applicable_edit.invalidate((relates_to,))
self.get_thread_summary.invalidate((relates_to,))
self.get_thread_participated.invalidate((relates_to,))
self.get_mutual_event_relations_for_rel_type.invalidate((relates_to,))

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
Expand Down
121 changes: 26 additions & 95 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from prometheus_client import Counter

import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
Expand Down Expand Up @@ -459,8 +459,33 @@ def _persist_events_txn(

# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

# Finally, invalidate relevant caches for each event once the txn completes
for event, _ in events_and_contexts:
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
if event.redacts:
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)

relates_to = None
relation = relation_from_event(event)
if relation:
relates_to = relation.parent_id

assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._invalidate_caches_for_event,
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.type,
getattr(event, "state_key", None),
event.redacts,
relates_to,
backfilled=False,
)

def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,
Expand Down Expand Up @@ -1222,9 +1247,6 @@ def _update_forward_extremities_txn(
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(
self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
)

self.db_pool.simple_insert_many_txn(
txn,
Expand Down Expand Up @@ -1294,8 +1316,6 @@ def _update_room_depths_txn(
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
Expand Down Expand Up @@ -1697,16 +1717,7 @@ async def prefill() -> None:
txn.async_call_after(prefill)

def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.
Note that these caches are also cleared as part of event replication in
_invalidate_caches_for_event.
"""
assert event.redacts is not None
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))

self.db_pool.simple_upsert_txn(
txn,
table="redactions",
Expand Down Expand Up @@ -1807,35 +1818,11 @@ def _store_room_members_txn(

for event in events:
assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
event.state_key,
event.internal_metadata.stream_ordering,
)
txn.call_after(
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
txn.call_after(
self.store.get_local_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_number_joined_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_user_in_room_with_profile.invalidate,
(event.room_id, event.state_key),
)

# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
txn.call_after(
self.store._get_membership_from_event_id.invalidate,
(event.event_id,),
)

# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
#
Expand Down Expand Up @@ -1883,35 +1870,6 @@ def _handle_event_relations(
},
)

txn.call_after(
self.store.get_relations_for_event.invalidate, (relation.parent_id,)
)
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate,
(relation.parent_id,),
)
txn.call_after(
self.store.get_mutual_event_relations_for_rel_type.invalidate,
(relation.parent_id,),
)

if relation.rel_type == RelationTypes.REPLACE:
txn.call_after(
self.store.get_applicable_edit.invalidate, (relation.parent_id,)
)

if relation.rel_type == RelationTypes.THREAD:
txn.call_after(
self.store.get_thread_summary.invalidate, (relation.parent_id,)
)
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
txn.call_after(
self.store.get_thread_participated.invalidate,
(relation.parent_id, event.sender),
)

def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
Expand Down Expand Up @@ -2213,28 +2171,6 @@ def _set_push_actions_for_event_and_users_txn(
),
)

room_to_event_ids: Dict[str, List[str]] = {}
for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)

for room_id, event_ids in room_to_event_ids.items():
rows = self.db_pool.simple_select_many_txn(
txn,
table="event_push_actions_staging",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("user_id",),
)

user_ids = {row["user_id"] for row in rows}

for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
Expand All @@ -2249,11 +2185,6 @@ def _set_push_actions_for_event_and_users_txn(
def _remove_push_actions_for_event_id_txn(
self, txn: LoggingTransaction, room_id: str, event_id: str
) -> None:
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id),
Expand Down

0 comments on commit 956b47a

Please sign in to comment.