Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prune inbound federation queues if they get too long #10390

Merged
merged 6 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10390.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prune inbound federation inbound queues for a room if they get too large.
17 changes: 17 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,23 @@ async def _process_incoming_pdus_in_room_inner(

origin, event = next

# Prune the event queue if it's getting large.
#
# We do this *after* handling the first event as the common case is
# that the queue is empty (/has the single event in), and so there's
# no need to do this check.
pruned = await self.store.prune_staged_events_in_room(room_id, room_version)
if pruned:
# If we have pruned the queue check we need to refetch the next
# event to handle.
next = await self.store.get_next_staged_event_for_room(
room_id, room_version
)
if not next:
break

origin, event = next

lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
)
Expand Down
104 changes: 102 additions & 2 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from prometheus_client import Gauge
from prometheus_client import Counter, Gauge

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand All @@ -44,6 +44,12 @@
"The total number of events in the inbound federation staging",
)

pdus_pruned_from_federation_queue = Counter(
"synapse_federation_server_number_inbound_pdu_pruned",
"The number of events in the inbound federation staging that have been "
"pruned due to the queue getting too long",
)

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -1207,6 +1213,100 @@ def _get_next_staged_event_for_room_txn(txn):

return origin, event

async def prune_staged_events_in_room(
self,
room_id: str,
room_version: RoomVersion,
) -> bool:
"""Checks if there are lots of staged events for the room, and if so
prune them down.

Returns:
Whether any events were pruned
"""

# First check the size of the queue.
count = await self.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="prune_staged_events_in_room_count",
)

if count < 100:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return False

# If the queue is too large, then we want clear the entire queue,
# keeping only the forward extremities (i.e. the events not referenced
# by other events in the queue). We do this so that we can always
# backpaginate in all the events we have dropped.
rows = await self.db_pool.simple_select_list(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcols=("event_id", "event_json"),
desc="prune_staged_events_in_room_fetch",
)

# Find the set of events referenced by those in the queue, as well as
# collecting all the event IDs in the queue.
referenced_events: Set[str] = set()
seen_events: Set[str] = set()
for row in rows:
event_id = row["event_id"]
seen_events.add(event_id)
event_d = db_to_json(row["event_json"])

# We don't bother parsing the dicts into full blown event objects,
# as that is needlessly expensive.

# We haven't checked that the `prev_events` have the right format
# yet, so we check as we go.
prev_events = event_d.get("prev_events", [])
if not isinstance(prev_events, list):
logger.info("Invalid prev_events for %s", event_id)
continue
Comment on lines +1262 to +1267
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like we're going to end up logging this every time we get a new event, potentially for every event in the room. Shouldn't we just drop the offending event? (or better yet, check it before we get this far).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeeees, ideally we'd have checked long before here that the Event was valid (given we'd already have parsed the JSON into an event object before persistence). I'm slightly hesitant about dropping the "invalid" events here in case we update the format of events and forget to change it here, which would cause us to sometimes drop those events (as this function does not get called for each inbound event).


if room_version.event_format == EventFormatVersions.V1:
for prev_event_tuple in prev_events:
if not isinstance(prev_event_tuple, list) or len(prev_events) != 2:
logger.info("Invalid prev_events for %s", event_id)
break

prev_event_id = prev_event_tuple[0]
if not isinstance(prev_event_id, str):
logger.info("Invalid prev_events for %s", event_id)
break

referenced_events.add(prev_event_id)
else:
for prev_event_id in prev_events:
if not isinstance(prev_event_id, str):
logger.info("Invalid prev_events for %s", event_id)
break

referenced_events.add(prev_event_id)
richvdh marked this conversation as resolved.
Show resolved Hide resolved

to_delete = referenced_events & seen_events
if not to_delete:
return False

pdus_pruned_from_federation_queue.inc(len(to_delete))
logger.info(
"Pruning %d events in room %s from federation queue",
len(to_delete),
room_id,
)

await self.db_pool.simple_delete_many(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
iterable=to_delete,
column="event_id",
desc="prune_staged_events_in_room_delete",
)

return True

async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
Expand Down
57 changes: 57 additions & 0 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import attr
from parameterized import parameterized

from synapse.api.room_versions import RoomVersions
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder

import tests.unittest
import tests.utils
Expand Down Expand Up @@ -504,6 +506,61 @@ def insert_event(txn):
)
self.assertSetEqual(difference, set())

def test_prune_inbound_federation_queue(self):
"Test that pruning of inbound federation queues work"

room_id = "some_room_id"

# Insert a bunch of events that all reference the previous one.
self.get_success(
self.store.db_pool.simple_insert_many(
table="federation_inbound_events_staging",
values=[
{
"origin": "some_origin",
"room_id": room_id,
"received_ts": 0,
"event_id": f"$fake_event_id_{i + 1}",
"event_json": json_encoder.encode(
{"prev_events": [f"$fake_event_id_{i}"]}
),
"internal_metadata": "{}",
}
for i in range(500)
],
desc="test_prune_inbound_federation_queue",
)
)

# Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't.
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertTrue(pruned)

pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
)
self.assertFalse(pruned)

# Assert that we only have a single event left in the queue, and that it
# is the last one.
count = self.get_success(
self.store.db_pool.simple_select_one_onecol(
table="federation_inbound_events_staging",
keyvalues={"room_id": room_id},
retcol="COALESCE(COUNT(*), 0)",
desc="test_prune_inbound_federation_queue",
)
)
self.assertEqual(count, 1)

_, event_id = self.get_success(
self.store.get_next_staged_event_id_for_room(room_id)
)
self.assertEqual(event_id, "$fake_event_id_500")


@attr.s
class FakeEvent:
Expand Down