This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage #8096

Closed
reivilibre wants to merge 38 commits into develop from rei/2528_catchup_fed_outage
Changes from 1 commit

Commits (38):
18d900e
Fix wrong type annotation
reivilibre Jul 23, 2020
07a415c
Little type hint
reivilibre Aug 14, 2020
5bc321a
Add data store functions and delta
reivilibre Aug 14, 2020
c60e259
Track destination_rooms entries
reivilibre Aug 14, 2020
20c896a
Add catch-up logic
reivilibre Aug 14, 2020
d232200
Add tests!
reivilibre Aug 14, 2020
967d8c1
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 14, 2020
d910798
Track async/await and db_pool transitions
reivilibre Aug 14, 2020
74a6f4f
Newsfile
reivilibre Aug 14, 2020
c1b32ae
Antilint
reivilibre Aug 14, 2020
5de9313
Fix up test
reivilibre Aug 17, 2020
759e027
Remove unused method
reivilibre Aug 17, 2020
6c52666
Use Python 3.5-friendly Collection type
reivilibre Aug 17, 2020
9ba56cb
Fix logic bug in prior code
reivilibre Aug 17, 2020
af13948
Update changelog.d/8096.bugfix
reivilibre Aug 20, 2020
558af38
Handle suggestions from review
reivilibre Aug 20, 2020
44765e9
How could we ever forget you, ./scripts-dev/lint.sh?
reivilibre Aug 20, 2020
56aaa17
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
84dbc43
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
2c740a7
Foreign key on rooms, SQL comment
reivilibre Aug 26, 2020
d77e444
NOT NULL, foreign key (events)
reivilibre Aug 26, 2020
33874d4
SQL column doc
reivilibre Aug 26, 2020
c1a2b68
Behaviour confirmed reasonable-seeming
reivilibre Aug 26, 2020
16eec5c
The early bird gets the early return
reivilibre Aug 26, 2020
92517e9
Assertion on bug
reivilibre Aug 26, 2020
ef4680d
Last successful stream ordering is about destinations
reivilibre Aug 26, 2020
de5caf0
Catch-up on all cases except federation denial
reivilibre Aug 27, 2020
3e308f9
Don't explicitly store the event_id
reivilibre Aug 27, 2020
b0bdadd
Antilint
reivilibre Aug 27, 2020
843403f
Remove review question
reivilibre Aug 27, 2020
ad7124d
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 27, 2020
b1fd67b
Fix wrong type signatures (even if str is Iterable[str]…)
reivilibre Aug 27, 2020
e6890c7
Fix the tests after removing event_id column
reivilibre Aug 27, 2020
7cfecf3
Antilint
reivilibre Aug 27, 2020
bf51d2f
Also fix `simple_select_onecol_txn`
reivilibre Aug 27, 2020
7589a03
Antilint again :(
reivilibre Aug 27, 2020
8d9f4ba
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
b60ad35
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
155 changes: 153 additions & 2 deletions synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple

from prometheus_client import Counter

@@ -92,6 +92,18 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# True whilst we are sending events that the remote homeserver missed
# because it was unreachable.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
self._catching_up = True
# the maximum stream order to catch up to (PDUs after this are expected
# to be in the main transmission queue), inclusive
self._catch_up_max_stream_order = None # type: Optional[int]
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
self._last_successful_stream_order = None # type: Optional[int]

# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]

@@ -137,8 +149,15 @@ def send_pdu(self, pdu: EventBase, order: int) -> None:

Args:
pdu: pdu to send
order
order: an arbitrary order for the PDU — NOT the stream ordering
"""
if (
self._catch_up_max_stream_order
and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
Member: when would this happen? I'm struggling to imagine how we could end up here with an event with a lower stream ordering than the catch-up max.

Contributor Author: In a (potentially hypothetical?) race condition.

        # track the fact that we have a PDU for these destinations,
        # to allow us to perform catch-up later on if the remote is unreachable
        # for a while.
        await self.store.store_destination_rooms_entries(
            destinations,
            pdu.room_id,
            pdu.event_id,
            pdu.internal_metadata.stream_ordering,
        )

# X <---

        for destination in destinations:
            self._get_per_destination_queue(destination).send_pdu(pdu, order)

I'm not sure if ^ has an opportunity to race or not; but even if it doesn't now, what about if someone innocently comes along and plops an await in there (at the X)?

The uncertainty made me feel like I should try and make this robust and 'just deal with it'.

Contributor Author: N.B. needs scrutiny about whether that condition also needs to check that self._catching_up is actually set 🤔

):
# we are in catch-up mode and this PDU is already scheduled to be
# part of the catch-up
return
self._pending_pdus.append((pdu, order))
Member: it seems to me that we shouldn't add new events to the queue while we are catching up?

Contributor Author: As you note later, I set a high-water-mark. Anything after this watermark should be handled in the normal way.

(Part of this is also to reduce the use of /get_missing_events for when we think the remote is online.)

If we are backing off for a long time, it will clear the queue and high-water-mark when it attempts a transaction on the next line.

self.attempt_new_transaction()
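
To make the high-water-mark split from the thread above concrete, here is a toy sketch (the watermark value and the helper are hypothetical, not code from this PR): PDUs at or below _catch_up_max_stream_order are recovered by catch-up from the database, while anything newer flows through the normal in-memory queue.

CATCH_UP_MAX_STREAM_ORDER = 105  # hypothetical watermark for this destination

def route_pdu(stream_ordering: int) -> str:
    """Decide whether a PDU is covered by catch-up or by the normal queue."""
    if stream_ordering <= CATCH_UP_MAX_STREAM_ORDER:
        return "recovered by catch-up from destination_rooms"
    return "appended to the in-memory _pending_pdus queue"

assert route_pdu(100) == "recovered by catch-up from destination_rooms"
assert route_pdu(110) == "appended to the in-memory _pending_pdus queue"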

@@ -219,6 +238,17 @@ async def _transaction_transmission_loop(self) -> None:
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)

if self._catching_up:
# we're catching up, so we should send old events instead
# in this case, we don't send anything from the new queue
# this keeps the catching-up logic simple
await self._catch_up_transmission_loop()
if self._catching_up:
# XXX if we aren't actually caught up still, shouldn't
# carry on to the main loop
# (but need to consider what we do in a failure...?)
return

pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +356,15 @@ async def _transaction_transmission_loop(self) -> None:

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
final_pdu, _ = pending_pdus[-1]
self._last_successful_stream_order = (
final_pdu.internal_metadata.stream_ordering
)
await self._store.set_last_successful_stream_ordering(
self._destination, self._last_successful_stream_order
)
else:
break
except NotRetryingDestination as e:
@@ -337,6 +376,8 @@
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)

self._catching_up = True
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -346,6 +387,8 @@
e.code,
e,
)

# XXX REVIEW should we be catching up?
Member: What are some typical situations where we could hit this? I could imagine a server is switching between available/not available behind its reverse proxy, such that we occasionally get 502's. In that case we should probably continue to catch up, correct?

Does Synapse purposely return any error codes other than 200 here? (I'm not sure). The spec doesn't seem to say so.

In that case, if it's only things in front of Synapse that would be returning other error codes, it seems sensible to continue trying to catch up.

Contributor Author: I suppose I want to be clearer on which of these exceptions we should later try to catch up from, and which ones we should treat as 'actually drop the PDUs for good this time'.

After looking with a fresh mind, I think yes, we should enable catch-up on: RequestSendFailed, HttpResponseException

Not sure about: FederationDeniedError, Exception

Thoughts?

Member: yes, I think we should enter "catchup" mode in most of these cases.

We should not do so for FederationDeniedError since that means the remote server is deliberately blocked in our whitelist.

All the other cases should trigger a catchup afaict.
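
A minimal, self-contained sketch of the rule agreed in this thread; the exception classes below are local stand-ins (not the real Synapse types) so the snippet runs on its own, and should_catch_up is a hypothetical helper rather than code from this PR.

class FederationDeniedError(Exception): ...  # stand-in, not the Synapse class
class NotRetryingDestination(Exception): ...  # stand-in
class HttpResponseException(Exception): ...  # stand-in
class RequestSendFailed(Exception): ...  # stand-in

def should_catch_up(failure: Exception) -> bool:
    """Re-enter catch-up mode for every failure except a deliberate
    federation block, where the missed PDUs are dropped for good."""
    return not isinstance(failure, FederationDeniedError)

assert should_catch_up(RequestSendFailed())
assert should_catch_up(HttpResponseException())
assert should_catch_up(NotRetryingDestination())
assert not should_catch_up(FederationDeniedError())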

except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
@@ -365,6 +408,99 @@ async def _transaction_transmission_loop(self) -> None:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False

async def _catch_up_transmission_loop(self) -> None:
if self._last_successful_stream_order is None:
# first catch-up, so get from database
self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering(
self._destination
)

if self._last_successful_stream_order is None:
# if it's still None, then this means we don't have the information
# in our database (oh, the perils of being a new feature).
# So we can't actually do anything here, and in this case, we don't
# know what to catch up, sadly.
# Trying to catch up right now is futile, so let's stop.
self._catching_up = False
return

if self._catch_up_max_stream_order is None:
Member: question: why do we need a high-water-mark at all? why not just keep going until get_catch_up_room_event_ids returns an empty list?

Member: (doing so without care will introduce races though...)

Contributor Author: I suppose we could do that if you are keen — as you say though, needs more thought.

The advantage of this approach is that it's easy to keep the logic in my head. Was keen not to give a chance for any nasty bugs to crawl in, because it'd be nice to have confidence in this.

# this is our first catch-up so we need to determine how much we
# want to catch-up.
if self._pending_pdus:
# we have PDUs already in the main queue so no need to ask the
# database
first_non_catch_up_pdu, _ = self._pending_pdus[0]
# -1 because we wish to exclude that one — we don't need to catch
# it up as it's in our main queue
self._catch_up_max_stream_order = (
first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
)
else:
# we don't have any PDUs in the main queue so instead find out
# the largest stream order that we know of that has, once upon a
# time, been queued for this destination (i.e. this is what we
# *should* have sent if the remote server was reachable).
self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
self._destination
)
if self._catch_up_max_stream_order is None:
# not enough info to catch up
self._catching_up = False
return

# get 50 catchup room/PDUs
while self._last_successful_stream_order < self._catch_up_max_stream_order:
event_ids = await self._store.get_catch_up_room_event_ids(
self._destination,
self._last_successful_stream_order,
self._catch_up_max_stream_order,
)

if not event_ids:
# I don't believe this *should* happen unless someone has been
# tinkering with the database, but I also have limited foresight,
# so let's handle this properly
logger.warning(
"No event IDs found for catch-up: "
Member: We might want to note in the log line that this is unintended in some way?

Contributor Author: Is this better?

"last successful = %d, max catch up = %d",
self._last_successful_stream_order,
self._catch_up_max_stream_order
)
self._catching_up = False
break

# fetch the relevant events from the event store
events = await self._store.get_events_as_list(
event_ids
) # XXX REVIEW: redact behaviour and allow_rejected ???

# zip them together with their stream orderings
catch_up_pdus = [
(event, event.internal_metadata.stream_ordering) for event in events
Member: as noted elsewhere: I think the order is redundant: it might be easier to get rid of it (in a separate PR).

Contributor Author: I'll put it on my list, then :)

]

if not catch_up_pdus:
break
Member: when does this happen, and why is breaking the loop the correct behaviour?

Contributor Author: I suppose this should log an ERROR, since this shouldn't happen.

Contributor Author: break isn't the correct behaviour really, since it disables catch-up when we supposedly should have some to do.

But only break will let us make progress and go to the main loop again.

So I will log an error either way, but is it best to break or return?

Contributor Author: Is this being unnecessarily paranoid?

There will be a foreign key constraint to link our rows to events, so this should darn well be impossible.

Better to `assert events` or `if not events: raise AssertionError(...)`?
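
For reference, a minimal sketch of the AssertionError variant floated above (a hypothetical helper, not code from this PR; it assumes the foreign key constraint guarantees a matching event row, so an empty fetch would indicate a bug):

from typing import List

def check_catch_up_fetch(catch_up_pdus: List, event_ids: List[str]) -> None:
    """Fail loudly instead of silently breaking out of the catch-up loop."""
    if not catch_up_pdus:
        raise AssertionError(
            "No events found for catch-up event IDs %r" % (event_ids,)
        )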


success = await self._transaction_manager.send_new_transaction(
self._destination, catch_up_pdus, []
)
if success:
sent_transactions_counter.inc()
final_pdu, _ = catch_up_pdus[-1]
self._last_successful_stream_order = (
final_pdu.internal_metadata.stream_ordering
)
await self._store.set_last_successful_stream_ordering(
self._destination, self._last_successful_stream_order
)
else:
return

# once we have reached this point, catch-up is done!
self._catching_up = False

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
Expand Down Expand Up @@ -408,6 +544,21 @@ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:

return (edus, now_stream_id)

def reset_catch_up_state(self) -> None:
"""
Resets the catch-up state of this destination.
This does the following:
- marks catch-up mode
- unsets the catch-up limit (max. stream order)
so that it is reset to the highest catch-up next time we have a
chance to catch up
- empties the main PDU queue as any PDUs in it will now be handled
by catch-up (because the max stream order limit was unset).
"""
self._pending_pdus = []
self._catching_up = True
self._catch_up_max_stream_order = None

async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()