
Commit

Merge remote-tracking branch 'origin/develop' into rav/clean_up_event_edges
richvdh committed May 27, 2022
2 parents c29b21b + f1605b7 commit ab737dd
Showing 4 changed files with 36 additions and 27 deletions.
1 change: 1 addition & 0 deletions changelog.d/12886.misc
@@ -0,0 +1 @@
+Refactor `have_seen_events` to reduce memory consumed when processing federation traffic.
1 change: 1 addition & 0 deletions changelog.d/12889.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.59.0 which caused room deletion to fail with a foreign key violation.
42 changes: 24 additions & 18 deletions synapse/storage/databases/main/events_worker.py
@@ -1356,14 +1356,23 @@ async def have_seen_events(
         Returns:
             The set of events we have already seen.
         """
-        res = await self._have_seen_events_dict(
-            (room_id, event_id) for event_id in event_ids
-        )
-        return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+        # @cachedList chomps lots of memory if you call it with a big list, so
+        # we break it down. However, each batch requires its own index scan, so we make
+        # the batches as big as possible.
+
+        results: Set[str] = set()
+        for chunk in batch_iter(event_ids, 500):
+            r = await self._have_seen_events_dict(
+                [(room_id, event_id) for event_id in chunk]
+            )
+            results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
+
+        return results

     @cachedList(cached_method_name="have_seen_event", list_name="keys")
     async def _have_seen_events_dict(
-        self, keys: Iterable[Tuple[str, str]]
+        self, keys: Collection[Tuple[str, str]]
     ) -> Dict[Tuple[str, str], bool]:
         """Helper for have_seen_events
@@ -1375,33 +1384,30 @@ async def _have_seen_events_dict(
         cache_results = {
             (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
         }
-        results = {x: True for x in cache_results}
+        results = dict.fromkeys(cache_results, True)
+        remaining = [k for k in keys if k not in cache_results]
+        if not remaining:
+            return results

-        def have_seen_events_txn(
-            txn: LoggingTransaction, chunk: Tuple[Tuple[str, str], ...]
-        ) -> None:
+        def have_seen_events_txn(txn: LoggingTransaction) -> None:
             # we deliberately do *not* query the database for room_id, to make the
             # query an index-only lookup on `events_event_id_key`.
             #
             # We therefore pull the events from the database into a set...

             sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
+                txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
             )
             txn.execute(sql + clause, args)
             found_events = {eid for eid, in txn}

-            # ... and then we can update the results for each row in the batch
-            results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
-
-        # each batch requires its own index scan, so we make the batches as big as
-        # possible.
-        for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
-            await self.db_pool.runInteraction(
-                "have_seen_events", have_seen_events_txn, chunk
-            )
+            # ... and then we can update the results for each key
+            results.update(
+                {(rid, eid): (eid in found_events) for (rid, eid) in remaining}
+            )
+
+        await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
         return results

     @cached(max_entries=100000, tree=True)
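
A note on the batching pattern above: `batch_iter` slices an iterable into fixed-size tuples, so each `@cachedList` call (and the SQL `IN` list behind it) stays bounded while remaining as large as possible. Below is a minimal, self-contained sketch of such a chunking helper, written to match how the commit uses it; the real helper lives in `synapse.util.iterutils` and may differ in detail:

    from itertools import islice
    from typing import Iterable, Iterator, Tuple, TypeVar

    T = TypeVar("T")

    def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
        # Repeatedly pull up to `size` items; stop when a pull comes back empty.
        # The source is consumed lazily, so the whole input is never held at once.
        source = iter(iterable)
        return iter(lambda: tuple(islice(source, size)), ())

    # With batches of 500, a lookup of 1200 event IDs becomes three calls of
    # 500, 500 and 200 keys instead of one call carrying all 1200.
    assert [len(chunk) for chunk in batch_iter(range(1200), 500)] == [500, 500, 200]

The batch size of 500 is a trade-off: each batch costs one index scan on `events_event_id_key`, so bigger batches mean fewer scans, but also a larger per-call footprint in the `@cachedList` machinery.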
19 changes: 10 additions & 9 deletions synapse/storage/databases/main/purge_events.py
@@ -322,12 +322,7 @@ async def purge_room(self, room_id: str) -> List[int]:
         )

     def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
-        # We *immediately* delete the room from the rooms table. This ensures
-        # that we don't race when persisting events (as that transaction checks
-        # that the room exists).
-        txn.execute("DELETE FROM rooms WHERE room_id = ?", (room_id,))
-
-        # Next, we fetch all the state groups that should be deleted, before
+        # First, fetch all the state groups that should be deleted, before
         # we delete that information.
         txn.execute(
             """
@@ -387,16 +382,21 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
             (room_id,),
         )

-        # and finally, the tables with an index on room_id (or no useful index)
+        # next, the tables with an index on room_id (or no useful index)
         for table in (
             "current_state_events",
             "destination_rooms",
             "event_backward_extremities",
             "event_forward_extremities",
             "event_push_actions",
             "event_search",
+            "partial_state_events",
             "events",
+            "federation_inbound_events_staging",
             "group_rooms",
+            "local_current_membership",
+            "partial_state_rooms_servers",
+            "partial_state_rooms",
             "receipts_graph",
             "receipts_linearized",
             "room_aliases",
@@ -416,8 +416,9 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
             "group_summary_rooms",
             "room_account_data",
             "room_tags",
-            "local_current_membership",
-            "federation_inbound_events_staging",
+            # "rooms" happens last, to keep the foreign keys in the other tables
+            # happy
+            "rooms",
         ):
             logger.info("[purge] removing %s from %s", room_id, table)
             txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
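
Why moving `rooms` to the end fixes the bug: Synapse 1.59.0 added the partial-state tables, which declare a foreign key onto `rooms`, so deleting the `rooms` row first made the purge transaction fail. The following is a small illustrative sqlite3 sketch of the failure mode, using a hypothetical two-table schema that stands in for the real one:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when asked
    conn.execute("CREATE TABLE rooms (room_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE partial_state_rooms ("
        " room_id TEXT PRIMARY KEY REFERENCES rooms(room_id))"
    )
    conn.execute("INSERT INTO rooms VALUES ('!room:example.org')")
    conn.execute("INSERT INTO partial_state_rooms VALUES ('!room:example.org')")

    # Deleting the parent row while a child row still references it fails...
    try:
        conn.execute("DELETE FROM rooms WHERE room_id = '!room:example.org'")
    except sqlite3.IntegrityError as exc:
        print("purge would fail here:", exc)  # FOREIGN KEY constraint failed

    # ...whereas deleting the child tables first, with `rooms` last (as the
    # reordered loop above now does), satisfies the constraint at every step.
    conn.execute("DELETE FROM partial_state_rooms WHERE room_id = '!room:example.org'")
    conn.execute("DELETE FROM rooms WHERE room_id = '!room:example.org'")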
