Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage (split, 4): catch-up loop #8272

Merged
merged 18 commits on Sep 15, 2020
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ def __init__(
self.transmission_loop_running = False

# True whilst we are sending events that the remote homeserver missed
# because it was unreachable.
# False whilst we are known to not be catching up.
# None when it has not been determined if we are catching-up yet (we
# do so lazily).
# because it was unreachable. We start in this state so we can perform
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
self._catching_up = None # type: Optional[bool]
self._catching_up = True # type: bool

# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode, or None if not applicable.
self._catchup_last_skipped = None # type: Optional[int]
# being in catch-up mode.
self._catchup_last_skipped = 0 # type: int

# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
Expand Down Expand Up @@ -155,10 +153,12 @@ def send_pdu(self, pdu: EventBase) -> None:
Args:
pdu: pdu to send
"""
if not self._catching_up:
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we are to catch up (None)
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
else:
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering

self.attempt_new_transaction()

Expand Down Expand Up @@ -239,7 +239,7 @@ async def _transaction_transmission_loop(self) -> None:
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)

if self._catching_up is None or self._catching_up is True:
if self._catching_up:
# we potentially need to catch-up first
await self._catch_up_transmission_loop()
if self._catching_up:
Expand Down Expand Up @@ -434,8 +434,10 @@ async def _transaction_transmission_loop(self) -> None:
self.transmission_loop_running = False

async def _catch_up_transmission_loop(self) -> None:
if self._last_successful_stream_ordering is None:
# first catch-up, so get from database
first_catch_up_check = self._last_successful_stream_ordering is None

if first_catch_up_check:
# first catchup so get last_successful_stream_ordering from database
self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
self._destination
)
Expand All @@ -460,21 +462,18 @@ async def _catch_up_transmission_loop(self) -> None:
# of a race condition, so we check that no new events have been
# skipped due to us being in catch-up mode

if (
self._catchup_last_skipped is not None
and self._catchup_last_skipped
> self._last_successful_stream_ordering
):
if self._catchup_last_skipped > self._last_successful_stream_ordering:
# another event has been skipped because we were in catch-up mode
continue

# we are done catching up!
self._catching_up = False
break

if self._catching_up is None:
# we didn't know if we're in catch-up mode yet but now we know
# we are — so mark us as in catch-up mode and drop the queue.
if first_catch_up_check:
# as this is our check for needing catch-up, we may have PDUs in
# the queue from before we *knew* we had to do catch-up, so
# clear those out now.
self._start_catching_up()

# fetch the relevant events from the event store
Expand All @@ -489,6 +488,10 @@ async def _catch_up_transmission_loop(self) -> None:
"This should not happen." % event_ids
)

if logger.isEnabledFor(logging.DEBUG):
rooms = (p.room_id for p in catchup_pdus)
logger.debug("Catching up rooms to %s: %r", self._destination, rooms)
reivilibre marked this conversation as resolved.
Show resolved Hide resolved

success = await self._transaction_manager.send_new_transaction(
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
self._destination, catchup_pdus, []
)
Expand Down