This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Simplify cache invalidation after event persist txn #13796

Merged
1 change: 1 addition & 0 deletions changelog.d/13796.misc
@@ -0,0 +1 @@
Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar).
3 changes: 3 additions & 0 deletions synapse/storage/_base.py
@@ -88,6 +88,9 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (user_id,)
)

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
@@ -253,6 +253,7 @@ def _invalidate_caches_for_event(
self.get_applicable_edit.invalidate((relates_to,))
self.get_thread_summary.invalidate((relates_to,))
self.get_thread_participated.invalidate((relates_to,))
self.get_mutual_event_relations_for_rel_type.invalidate((relates_to,))
Contributor Author

Should the whole _invalidate_caches_for_event be moved to the base store class, alongside _invalidate_state_caches?

Another question: should this use _attempt_to_invalidate_cache?
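
For context on the second question, here is a minimal sketch of how a name-based helper differs from calling `.invalidate` on a cache attribute directly. It assumes the helper looks the cache up by name and quietly does nothing when the current store does not define it. `FakeCache` and `SketchStore` are hypothetical stand-ins rather than Synapse classes, and the boolean return value is added purely for illustration.

```python
from typing import Any, Dict, Optional, Tuple


class FakeCache:
    """Hypothetical stand-in for a cached-function object (not Synapse's class)."""

    def __init__(self) -> None:
        self.entries: Dict[Tuple[Any, ...], Any] = {}

    def invalidate(self, key: Tuple[Any, ...]) -> None:
        # Drop a single entry; a missing key is not an error.
        self.entries.pop(key, None)

    def invalidate_all(self) -> None:
        self.entries.clear()


class SketchStore:
    """Hypothetical store that only defines one of the caches."""

    def __init__(self) -> None:
        self.get_thread_summary = FakeCache()

    def _attempt_to_invalidate_cache(
        self, cache_name: str, key: Optional[Tuple[Any, ...]]
    ) -> bool:
        # Look the cache up by name so stores lacking it are skipped
        # instead of raising AttributeError.
        cache = getattr(self, cache_name, None)
        if cache is None:
            return False
        if key is None:
            cache.invalidate_all()
        else:
            cache.invalidate(tuple(key))
        return True
```

With direct attribute access, `self.get_mutual_event_relations_for_rel_type.invalidate(...)` would raise `AttributeError` on a store that lacks that cache; the name-based lookup in this sketch skips it instead.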

Contributor

> Should the whole _invalidate_caches_for_event be moved to the base store class, alongside _invalidate_state_caches?

I'm curious, what benefits are you thinking of?

Contributor Author

Purely aesthetic - in my head at least there's a divide like:

  • databases/main/_base.py - contains local cache invalidation helpers/wrappers
  • databases/main/cache.py - contains the stream/database/distributed invalidation logic

Contributor

I think it makes sense to have _invalidate_caches_for_event and _invalidate_state_caches next to each other. And _invalidate_caches_for_event even uses _attempt_to_invalidate_cache now.

I don't have a strong opinion on which direction we should move things to though. Probably best to wait for a reviewer if they want to include that change here.

  • synapse/storage/_base.py
  • synapse/storage/databases/main/cache.py

It looks like the divide you mentioned currently lines up, but that's entirely determined by where _attempt_to_invalidate_cache and _invalidate_state_caches happen to live in synapse/storage/_base.py.

By name, having all of the cache stuff be in synapse/storage/databases/main/cache.py seems like it would make sense 🤷

Contributor Author

> By name, having all of the cache stuff be in synapse/storage/databases/main/cache.py seems like it would make sense 🤷

Agreed! I did try this but ended up in a total mess of class resolution order so reverted 😬

Contributor

> total mess of class resolution order

Yeah, the CacheInvalidationWorkerStore is a real bottleneck for this, making it really hard to type-hint. (#11165, #11354, #11271)
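
As a generic illustration of the kind of class resolution problem mentioned above (not the specific error hit in this PR, which the thread does not record), the sketch below shows how Python refuses to build a class when a base is listed before its own subclass. All class names here are hypothetical.

```python
from typing import Optional


class SQLBaseStoreSketch:
    """Hypothetical base store holding local invalidation helpers."""

    def _invalidate_state_caches(self) -> str:
        return "base"


class CacheInvalidationSketch(SQLBaseStoreSketch):
    """Hypothetical mixin that itself depends on the base store."""

    def _invalidate_state_caches(self) -> str:
        return "cache mixin"


def try_define_broken_store() -> Optional[str]:
    """Return the error message raised when the bases are in a bad order."""
    try:
        # The base is listed before its own subclass, so Python cannot
        # linearise the hierarchy and raises TypeError at class creation.
        class BrokenStore(SQLBaseStoreSketch, CacheInvalidationSketch):
            pass
    except TypeError as exc:
        return str(exc)
    return None


class WorkingStore(CacheInvalidationSketch, SQLBaseStoreSketch):
    """Listing the more specific class first gives a consistent order."""
```

Moving a helper from one class to another can change which orderings are valid for every store that inherits from both, which is one plausible reason such a move becomes messy.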

MadLittleMods marked this conversation as resolved.

async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
133 changes: 27 additions & 106 deletions synapse/storage/databases/main/events.py
@@ -35,7 +35,7 @@
from prometheus_client import Counter

import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import EventContentFields, EventTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
@@ -410,6 +410,31 @@ def _persist_events_txn(
assert min_stream_order
assert max_stream_order

# Once the txn completes, invalidate all of the relevant caches. Note that we do this
# up here because it captures all the events_and_contexts before any are removed.
for event, _ in events_and_contexts:
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
if event.redacts:
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)

relates_to = None
relation = relation_from_event(event)
if relation:
relates_to = relation.parent_id

assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._invalidate_caches_for_event,
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.type,
getattr(event, "state_key", None),
event.redacts,
relates_to,
backfilled=False,
)
MadLittleMods marked this conversation as resolved.

self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremities,
@@ -459,6 +484,7 @@ def _persist_events_txn(

# We call this last as it assumes we've inserted the events into
# room_memberships, where applicable.
# NB: This function invalidates all state related caches
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)

def _persist_event_auth_chain_txn(
@@ -1172,13 +1198,6 @@ def _update_current_state_txn(
)

# Invalidate the various caches

for member in members_changed:
txn.call_after(
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)

self.store._invalidate_state_caches_and_stream(
txn, room_id, members_changed
)
@@ -1222,9 +1241,6 @@ def _update_forward_extremities_txn(
self.db_pool.simple_delete_txn(
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
)
txn.call_after(
self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
)

self.db_pool.simple_insert_many_txn(
txn,
@@ -1294,8 +1310,6 @@ def _update_room_depths_txn(
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
@@ -1697,16 +1711,7 @@ async def prefill() -> None:
txn.async_call_after(prefill)

def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.

Note that these caches are also cleared as part of event replication in
_invalidate_caches_for_event.
"""
assert event.redacts is not None
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))

self.db_pool.simple_upsert_txn(
txn,
table="redactions",
@@ -1807,34 +1812,6 @@ def _store_room_members_txn(

for event in events:
assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._membership_stream_cache.entity_has_changed,
event.state_key,
event.internal_metadata.stream_ordering,
)
txn.call_after(
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
txn.call_after(
self.store.get_local_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_number_joined_users_in_room.invalidate,
(event.room_id,),
)
txn.call_after(
self.store.get_user_in_room_with_profile.invalidate,
(event.room_id, event.state_key),
)

# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
txn.call_after(
self.store._get_membership_from_event_id.invalidate,
(event.event_id,),
)

# We update the local_current_membership table only if the event is
# "current", i.e., its something that has just happened.
@@ -1883,35 +1860,6 @@ def _handle_event_relations(
},
)

txn.call_after(
self.store.get_relations_for_event.invalidate, (relation.parent_id,)
)
txn.call_after(
self.store.get_aggregation_groups_for_event.invalidate,
(relation.parent_id,),
)
txn.call_after(
self.store.get_mutual_event_relations_for_rel_type.invalidate,
(relation.parent_id,),
)

if relation.rel_type == RelationTypes.REPLACE:
txn.call_after(
self.store.get_applicable_edit.invalidate, (relation.parent_id,)
)

if relation.rel_type == RelationTypes.THREAD:
txn.call_after(
self.store.get_thread_summary.invalidate, (relation.parent_id,)
)
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
txn.call_after(
self.store.get_thread_participated.invalidate,
(relation.parent_id, event.sender),
)

def _handle_insertion_event(
self, txn: LoggingTransaction, event: EventBase
) -> None:
@@ -2213,28 +2161,6 @@ def _set_push_actions_for_event_and_users_txn(
),
)

room_to_event_ids: Dict[str, List[str]] = {}
for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)

for room_id, event_ids in room_to_event_ids.items():
rows = self.db_pool.simple_select_many_txn(
txn,
table="event_push_actions_staging",
column="event_id",
iterable=event_ids,
keyvalues={},
retcols=("user_id",),
)

user_ids = {row["user_id"] for row in rows}

for user_id in user_ids:
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id, user_id),
)

# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
@@ -2249,11 +2175,6 @@ def _set_push_actions_for_event_and_users_txn(
def _remove_push_actions_for_event_id_txn(
self, txn: LoggingTransaction, room_id: str, event_id: str
) -> None:
# Sad that we have to blow away the cache for the whole room here
txn.call_after(
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
(room_id,),
)
Comment on lines -2252 to -2256
Contributor

@MadLittleMods Sep 22, 2022


To tick the review box: all of these removals are accounted for in _invalidate_caches_for_event and _invalidate_state_caches.

_persist_events_txn takes care of calling _invalidate_caches_for_event directly, and reaches _invalidate_state_caches via _update_current_state_txn -> _invalidate_state_caches_and_stream -> _invalidate_state_caches.

And _invalidate_caches_for_event/_invalidate_state_caches are already called over replication (workers).
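
The deferred-invalidation pattern this comment relies on can be sketched as follows. This is a simplified model, assuming only that `call_after(callback, *args, **kwargs)` queues a callback to run once the transaction has succeeded. `SketchTransaction`, its `commit`/`rollback` methods, and `persist_events_sketch` are hypothetical stand-ins, not Synapse's `LoggingTransaction` or `_persist_events_txn`.

```python
from typing import Any, Callable, Dict, List, Tuple


class SketchTransaction:
    """Hypothetical transaction that queues callbacks until commit."""

    def __init__(self) -> None:
        self._after: List[Tuple[Callable[..., Any], tuple, Dict[str, Any]]] = []

    def call_after(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Nothing runs yet; the callback is only recorded.
        self._after.append((callback, args, kwargs))

    def commit(self) -> None:
        # Run queued callbacks in registration order, then clear the queue.
        for callback, args, kwargs in self._after:
            callback(*args, **kwargs)
        self._after.clear()

    def rollback(self) -> None:
        # A failed transaction discards its queued callbacks.
        self._after.clear()


def persist_events_sketch(
    txn: SketchTransaction,
    events: List[Tuple[str, str]],
    invalidated: List[Tuple[str, str]],
) -> None:
    """Queue one invalidation per (event_id, room_id) before any writes."""
    for event_id, room_id in events:
        # Stand-in for queuing a per-event cache invalidation.
        txn.call_after(invalidated.append, (event_id, room_id))
    # Database writes would follow here; caches stay untouched until commit.
```

Queuing every invalidation in one place at the top of the transaction, as the PR does, means the set of affected caches no longer depends on which helper functions happened to run during the writes.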

txn.execute(
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id),