This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimize backfill receiving to have less missing prev_event thrashing (v2) #13970

Closed
65 changes: 64 additions & 1 deletion synapse/handlers/federation_event.py
@@ -75,6 +75,7 @@
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
+from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 from synapse.types import (
     PersistedEventPosition,
     RoomStreamToken,
@@ -644,9 +645,71 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
)

# We expect the events from the `/backfill` response to start from
# `?v` and include events that preceded it (so the list will be
# newest -> oldest, reverse-chronological). It's described in the
# spec this way so we can rely on people doing it the right way for
# the historical messages to show up correctly.
reverse_chronological_events = events
# `[::-1]` is just syntax to reverse the list and give us a copy
chronological_events = reverse_chronological_events[::-1]

# We want to calculate the `stream_ordering` from newest -> oldest
# (reverse-chronological) (so MSC2716 historical events end up
# sorting in the correct order) and persist oldest -> newest
# (chronological) to get the least missing `prev_event` fetch
# thrashing.
# ------------------------------------------------------------------

# Since we have been configured to write, we ought to have id generators,
# rather than id trackers.
assert (
self._instance_name in self._config.worker.writers.events
), "Can only write stream IDs on master"
assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
Comment on lines +665 to +669
Contributor Author

Can we rely on the worker instance that backfills events to be a writer instance? I don't think so. The event persister is different from the workers that ingest new events.

How do I do this correctly?

+        stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
+            len(reverse_chronological_events)
+        )
+        async with stream_ordering_manager as stream_orderings:
+            # Calculate the `stream_ordering` from newest -> oldest
+            # (reverse-chronological) (so historical events end up sorting
+            # in the correct order).
+            #
+            # Backfilled events start with `stream_ordering=-1` and
+            # decrement. For events that we backfill at the same `depth`
+            # (like chains of historical messages), in order for them to
+            # have the best chance of ending up in the correct order, assign
+            # `stream_ordering` to the assumed reverse-chronological list of
+            # events to backfill (where the newest events get
+            # `stream_ordering` assigned first).
+            #
+            #  depth : stream_ordering : event
+            #  ----- : --------------- : -----------------------
+            #  1     : 1               : Event before 1
+            #  2     : 2               : Event before 2
+            #  3     : -5              : Historical message 1
+            #  3     : -4              : Historical message 2
+            #  3     : -3              : Historical message 3
+            #  3     : -2              : Historical message 4
+            #  3     : -1              : Historical message 5
+            #  3     : 3               : Event after 1
+            #  4     : 4               : Event after 2
+            #
+            for event, stream in zip(
+                reverse_chronological_events, stream_orderings
+            ):
+                event.internal_metadata.stream_ordering = stream
+
         await self._process_pulled_events(
             dest,
-            events,
+            # Persist events from oldest -> newest (chronological) to get
+            # the least missing `prev_event` fetch thrashing.
+            # `_process_pulled_events` does some sorting of its own by
+            # `depth`, but if we let it sort the reverse-chronological list
+            # of events, it naively orders events with the same depth in the
+            # opposite order we want. If we pass it a list already sorted by
+            # depth, then everything lines up.
+            chronological_events,
             backfilled=True,
         )

6 changes: 5 additions & 1 deletion synapse/storage/databases/main/events.py
@@ -209,7 +209,11 @@ async def _persist_events_and_state_updates(

         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
+                # If someone has already decided the stream_ordering for the
+                # event before, then just use that. This is done during
+                # backfill to help ordering of MSC2716 historical messages.
+                if event.internal_metadata.stream_ordering is None:
+                    event.internal_metadata.stream_ordering = stream

             await self.db_pool.runInteraction(
                 "persist_events",
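A minimal sketch of the guard this hunk adds, again with hypothetical stand-ins (`Event` here is not Synapse's event class): a freshly generated stream id is only used when backfill has not already pre-assigned one. Note that an id is still consumed for the pre-assigned event, just as the zip in the hunk above consumes one per event.

from dataclasses import dataclass
from itertools import count
from typing import Iterable, Optional

@dataclass
class Event:
    event_id: str
    stream_ordering: Optional[int] = None

def assign_stream_orderings(
    events: Iterable[Event], stream_orderings: Iterable[int]
) -> None:
    for event, stream in zip(events, stream_orderings):
        # Keep a stream_ordering pre-assigned during backfill (e.g. for
        # MSC2716 historical messages); otherwise take the generated id.
        if event.stream_ordering is None:
            event.stream_ordering = stream

# Example: the first event was pre-assigned -1 by backfill and keeps it; the
# second takes the id it was zipped with (8), since 7 was consumed by the
# pre-assigned event.
events = [Event("$a", stream_ordering=-1), Event("$b")]
assign_stream_orderings(events, count(7))
assert [e.stream_ordering for e in events] == [-1, 8]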