Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix have_seen_event cache not being invalidated #13863

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13863.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the `have_seen_event` cache not being invalidated after we persist an event, which caused downstream effects such as extra `/state` federation calls.
25 changes: 25 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,31 @@ def _persist_events_txn(
assert min_stream_order
assert max_stream_order

# Once the txn completes, invalidate all of the relevant caches. Note that we do this
# up here, before any entries are removed from `events_and_contexts`, so that every
# event in the batch is covered.
for event, _ in events_and_contexts:
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
if event.redacts:
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)

relates_to = None
relation = relation_from_event(event)
if relation:
relates_to = relation.parent_id

assert event.internal_metadata.stream_ordering is not None
txn.call_after(
self.store._invalidate_caches_for_event,
event.internal_metadata.stream_ordering,
event.event_id,
event.room_id,
event.type,
getattr(event, "state_key", None),
event.redacts,
relates_to,
backfilled=False,
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremities,
Expand Down
40 changes: 22 additions & 18 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1474,32 +1474,38 @@ async def have_seen_events(
# the batches as big as possible.

results: Set[str] = set()
for chunk in batch_iter(event_ids, 500):
r = await self._have_seen_events_dict(
[(room_id, event_id) for event_id in chunk]
for event_ids_chunk in batch_iter(event_ids, 500):
events_seen_dict = await self._have_seen_events_dict(
room_id, event_ids_chunk
)
results.update(
eid for (eid, have_event) in events_seen_dict.items() if have_event
)
results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)

return results

@cachedList(cached_method_name="have_seen_event", list_name="keys")
@cachedList(cached_method_name="have_seen_event", list_name="event_ids")
async def _have_seen_events_dict(
self, keys: Collection[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
self,
room_id: str,
event_ids: Collection[str],
) -> Dict[str, bool]:
Comment on lines +1487 to +1492
Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix as described by @erikjohnston in #13865 (comment)

The rest of the changes in this file are adapting to this change.

"""Helper for have_seen_events

Returns:
a dict {(room_id, event_id)-> bool}
a dict {event_id -> bool}
"""
# if the event cache contains the event, obviously we've seen it.

cache_results = {
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
event_id
for event_id in event_ids
if await self._get_event_cache.contains((event_id,))
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
remaining = [
event_id for event_id in event_ids if event_id not in cache_results
]
if not remaining:
return results

Expand All @@ -1511,23 +1517,21 @@ def have_seen_events_txn(txn: LoggingTransaction) -> None:

sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
txn.database_engine, "e.event_id", remaining
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}

# ... and then we can update the results for each key
results.update(
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
)
results.update({eid: (eid in found_events) for eid in remaining})

await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results

@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
res = await self._have_seen_events_dict(((room_id, event_id),))
return res[(room_id, event_id)]
res = await self._have_seen_events_dict(room_id, [event_id])
return res[event_id]

def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str
Expand Down
120 changes: 72 additions & 48 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,66 +35,45 @@
from synapse.util.async_helpers import yieldable_gather_results

from tests import unittest
from tests.test_utils.event_injection import create_event, inject_event


class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def prepare(self, reactor, clock, hs):
self.hs = hs
self.store: EventsWorkerStore = hs.get_datastores().main

# insert some test data
for rid in ("room1", "room2"):
self.get_success(
self.store.db_pool.simple_insert(
"rooms",
{"room_id": rid, "room_version": 4},
)
)
self.user = self.register_user("user", "pass")
self.token = self.login(self.user, "pass")
self.room_id = self.helper.create_room_as(self.user, tok=self.token)

self.event_ids: List[str] = []
for idx, rid in enumerate(
(
"room1",
"room1",
"room1",
"room2",
)
):
event_json = {"type": f"test {idx}", "room_id": rid}
event = make_event_from_dict(event_json, room_version=RoomVersions.V4)
event_id = event.event_id

self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": event_id,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": event.type,
"processed": True,
"outlier": False,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": event_id,
"room_id": rid,
"json": json.dumps(event_json),
"internal_metadata": "{}",
"format_version": 3,
},
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
for i in range(3):
event = self.get_success(
inject_event(
hs,
room_version=RoomVersions.V7.identifier,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": f"foobarbaz{i}"},
)
)
self.event_ids.append(event_id)

self.event_ids.append(event.event_id)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})

Expand All @@ -104,7 +83,9 @@ def test_simple(self):
# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0], "event19"])
self.store.have_seen_events(
self.room_id, [self.event_ids[0], "eventdoesnotexist"]
)
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
Expand All @@ -116,11 +97,54 @@ def test_query_via_event_cache(self):
# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", [self.event_ids[0]])
self.store.have_seen_events(self.room_id, [self.event_ids[0]])
)
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)

def test_persisting_event_invalidates_cache(self):
"""Check that persisting an event updates its `have_seen_events` result.

The event is looked up before it is persisted (expecting "not seen"),
then persisted, then looked up again (expecting "seen").
"""
# Build an event and its context; the first lookup below expects this
# event to be unseen.
event, event_context = self.get_success(
create_event(
self.hs,
room_id=self.room_id,
sender=self.user,
type="test_event_type",
content={"body": "garply"},
)
)

with LoggingContext(name="test") as ctx:
# First, check `have_seen_event` for an event we have not seen yet
# to prime the cache with a `False` value.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, set())

# That lookup should have used exactly one database transaction.
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)

# Persist the event, which should invalidate or prefill the
# `have_seen_event` cache so we don't return stale values.
persistence = self.hs.get_storage_controllers().persistence
self.get_success(
persistence.persist_event(
event,
event_context,
)
)

with LoggingContext(name="test") as ctx:
# Check `have_seen_event` again: we should now see the updated fact
# that we have seen the event after persisting it.
res = self.get_success(
self.store.have_seen_events(event.room_id, [event.event_id])
)
self.assertEqual(res, {event.event_id})

# This lookup should again have used exactly one database transaction.
self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)


class EventCacheTestCase(unittest.HomeserverTestCase):
"""Test that the various layers of event cache works."""
Expand Down