This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Simplify cache invalidation after event persist txn #13796
Merged
erikjohnston
merged 5 commits into
matrix-org:develop
from
beeper:cleanup-event-cache-invalidation
Sep 26, 2022
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
346cca3
Invalidate event caches using base method when persisting events
Fizzadar 43439b6
Add changelog file
Fizzadar a86bb01
Invalidate caches before filtering events & contexts
Fizzadar f2a1207
Move `get_rooms_for_user_with_stream_ordering` invalidation into base
Fizzadar 4587be1
Use `_attempt_to_invalidate_cache` in `_invalidate_caches_for_event`
Fizzadar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,7 @@ | |
from prometheus_client import Counter | ||
|
||
import synapse.metrics | ||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes | ||
from synapse.api.constants import EventContentFields, EventTypes | ||
from synapse.api.errors import Codes, SynapseError | ||
from synapse.api.room_versions import RoomVersions | ||
from synapse.events import EventBase, relation_from_event | ||
|
@@ -410,6 +410,31 @@ def _persist_events_txn( | |
assert min_stream_order | ||
assert max_stream_order | ||
|
||
# Once the txn completes, invalidate all of the relevant caches. Note that we do this | ||
# up here because it captures all the events_and_contexts before any are removed. | ||
for event, _ in events_and_contexts: | ||
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) | ||
if event.redacts: | ||
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) | ||
|
||
relates_to = None | ||
relation = relation_from_event(event) | ||
if relation: | ||
relates_to = relation.parent_id | ||
|
||
assert event.internal_metadata.stream_ordering is not None | ||
txn.call_after( | ||
self.store._invalidate_caches_for_event, | ||
event.internal_metadata.stream_ordering, | ||
event.event_id, | ||
event.room_id, | ||
event.type, | ||
getattr(event, "state_key", None), | ||
event.redacts, | ||
relates_to, | ||
backfilled=False, | ||
) | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self._update_forward_extremities_txn( | ||
txn, | ||
new_forward_extremities=new_forward_extremities, | ||
|
@@ -459,6 +484,7 @@ def _persist_events_txn( | |
|
||
# We call this last as it assumes we've inserted the events into | ||
# room_memberships, where applicable. | ||
# NB: This function invalidates all state related caches | ||
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) | ||
|
||
def _persist_event_auth_chain_txn( | ||
|
@@ -1172,13 +1198,6 @@ def _update_current_state_txn( | |
) | ||
|
||
# Invalidate the various caches | ||
|
||
for member in members_changed: | ||
txn.call_after( | ||
self.store.get_rooms_for_user_with_stream_ordering.invalidate, | ||
(member,), | ||
) | ||
|
||
self.store._invalidate_state_caches_and_stream( | ||
txn, room_id, members_changed | ||
) | ||
|
@@ -1222,9 +1241,6 @@ def _update_forward_extremities_txn( | |
self.db_pool.simple_delete_txn( | ||
txn, table="event_forward_extremities", keyvalues={"room_id": room_id} | ||
) | ||
txn.call_after( | ||
self.store.get_latest_event_ids_in_room.invalidate, (room_id,) | ||
) | ||
|
||
self.db_pool.simple_insert_many_txn( | ||
txn, | ||
|
@@ -1294,8 +1310,6 @@ def _update_room_depths_txn( | |
""" | ||
depth_updates: Dict[str, int] = {} | ||
for event, context in events_and_contexts: | ||
# Remove the any existing cache entries for the event_ids | ||
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) | ||
# Then update the `stream_ordering` position to mark the latest | ||
# event as the front of the room. This should not be done for | ||
# backfilled events because backfilled events have negative | ||
|
@@ -1697,16 +1711,7 @@ async def prefill() -> None: | |
txn.async_call_after(prefill) | ||
|
||
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: | ||
"""Invalidate the caches for the redacted event. | ||
|
||
Note that these caches are also cleared as part of event replication in | ||
_invalidate_caches_for_event. | ||
""" | ||
assert event.redacts is not None | ||
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) | ||
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) | ||
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) | ||
|
||
self.db_pool.simple_upsert_txn( | ||
txn, | ||
table="redactions", | ||
|
@@ -1807,34 +1812,6 @@ def _store_room_members_txn( | |
|
||
for event in events: | ||
assert event.internal_metadata.stream_ordering is not None | ||
txn.call_after( | ||
self.store._membership_stream_cache.entity_has_changed, | ||
event.state_key, | ||
event.internal_metadata.stream_ordering, | ||
) | ||
txn.call_after( | ||
self.store.get_invited_rooms_for_local_user.invalidate, | ||
(event.state_key,), | ||
) | ||
txn.call_after( | ||
self.store.get_local_users_in_room.invalidate, | ||
(event.room_id,), | ||
) | ||
txn.call_after( | ||
self.store.get_number_joined_users_in_room.invalidate, | ||
(event.room_id,), | ||
) | ||
txn.call_after( | ||
self.store.get_user_in_room_with_profile.invalidate, | ||
(event.room_id, event.state_key), | ||
) | ||
|
||
# The `_get_membership_from_event_id` is immutable, except for the | ||
# case where we look up an event *before* persisting it. | ||
txn.call_after( | ||
self.store._get_membership_from_event_id.invalidate, | ||
(event.event_id,), | ||
) | ||
|
||
# We update the local_current_membership table only if the event is | ||
# "current", i.e., its something that has just happened. | ||
|
@@ -1883,35 +1860,6 @@ def _handle_event_relations( | |
}, | ||
) | ||
|
||
txn.call_after( | ||
self.store.get_relations_for_event.invalidate, (relation.parent_id,) | ||
) | ||
txn.call_after( | ||
self.store.get_aggregation_groups_for_event.invalidate, | ||
(relation.parent_id,), | ||
) | ||
txn.call_after( | ||
self.store.get_mutual_event_relations_for_rel_type.invalidate, | ||
(relation.parent_id,), | ||
) | ||
|
||
if relation.rel_type == RelationTypes.REPLACE: | ||
txn.call_after( | ||
self.store.get_applicable_edit.invalidate, (relation.parent_id,) | ||
) | ||
|
||
if relation.rel_type == RelationTypes.THREAD: | ||
txn.call_after( | ||
self.store.get_thread_summary.invalidate, (relation.parent_id,) | ||
) | ||
# It should be safe to only invalidate the cache if the user has not | ||
# previously participated in the thread, but that's difficult (and | ||
# potentially error-prone) so it is always invalidated. | ||
txn.call_after( | ||
self.store.get_thread_participated.invalidate, | ||
(relation.parent_id, event.sender), | ||
) | ||
|
||
def _handle_insertion_event( | ||
self, txn: LoggingTransaction, event: EventBase | ||
) -> None: | ||
|
@@ -2213,28 +2161,6 @@ def _set_push_actions_for_event_and_users_txn( | |
), | ||
) | ||
|
||
room_to_event_ids: Dict[str, List[str]] = {} | ||
for e in non_outlier_events: | ||
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) | ||
|
||
for room_id, event_ids in room_to_event_ids.items(): | ||
rows = self.db_pool.simple_select_many_txn( | ||
txn, | ||
table="event_push_actions_staging", | ||
column="event_id", | ||
iterable=event_ids, | ||
keyvalues={}, | ||
retcols=("user_id",), | ||
) | ||
|
||
user_ids = {row["user_id"] for row in rows} | ||
|
||
for user_id in user_ids: | ||
txn.call_after( | ||
self.store.get_unread_event_push_actions_by_room_for_user.invalidate, | ||
(room_id, user_id), | ||
) | ||
|
||
# Now we delete the staging area for *all* events that were being | ||
# persisted. | ||
txn.execute_batch( | ||
|
@@ -2249,11 +2175,6 @@ def _set_push_actions_for_event_and_users_txn( | |
def _remove_push_actions_for_event_id_txn( | ||
self, txn: LoggingTransaction, room_id: str, event_id: str | ||
) -> None: | ||
# Sad that we have to blow away the cache for the whole room here | ||
txn.call_after( | ||
self.store.get_unread_event_push_actions_by_room_for_user.invalidate, | ||
(room_id,), | ||
) | ||
Comment on lines
-2252
to
-2256
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To check the review box that everything is accounted for, all of these removals are accounted for in
And |
||
txn.execute( | ||
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", | ||
(room_id, event_id), | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is significantly greedier than the old version which only invalidated caches based on the relation type. Not sure if this matters much, but we know ahead of time which of these might need to be invalidated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Also it is quite annoying that you can no longer jump to these definitions via PyCharm or anything since they're strings now. 😢 )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this function should take
relation
instead ofrelatest_to
and it could evaluate the lost conditionals to reduce the invalidation amount?:( not ideal at all, nor do I have any ideas on how to solve this unfortunately! If all the DB classes were fully shared this could just assume each method existed and call them direct...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, I'm not sure it is a huge deal or not, but definitely fits in the bucket of "code that doesn't need to run".
_attempt_to_invalidate_cache
sounds scary (and looking at it, it seems to use quite a fewgetattr
calls, which aren't known for optimizations...)I think ideally it would be nice if we referred to everything by the store it is on, instead of via a god object. Like the changes we did with the config objects. I think this is doable, but a lot of work. 😢 See #11165
Anyway -- I don't think there's anything actionable to do here, just wanted to note my concerns down for posterity.