This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage #8096

Closed
wants to merge 38 commits
Changes from 10 commits
Commits (38)
18d900e
Fix wrong type annotation
reivilibre Jul 23, 2020
07a415c
Little type hint
reivilibre Aug 14, 2020
5bc321a
Add data store functions and delta
reivilibre Aug 14, 2020
c60e259
Track destination_rooms entries
reivilibre Aug 14, 2020
20c896a
Add catch-up logic
reivilibre Aug 14, 2020
d232200
Add tests!
reivilibre Aug 14, 2020
967d8c1
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 14, 2020
d910798
Track async/await and db_pool transitions
reivilibre Aug 14, 2020
74a6f4f
Newsfile
reivilibre Aug 14, 2020
c1b32ae
Antilint
reivilibre Aug 14, 2020
5de9313
Fix up test
reivilibre Aug 17, 2020
759e027
Remove unused method
reivilibre Aug 17, 2020
6c52666
Use Python 3.5-friendly Collection type
reivilibre Aug 17, 2020
9ba56cb
Fix logic bug in prior code
reivilibre Aug 17, 2020
af13948
Update changelog.d/8096.bugfix
reivilibre Aug 20, 2020
558af38
Handle suggestions from review
reivilibre Aug 20, 2020
44765e9
How could we ever forget you, ./scripts-dev/lint.sh?
reivilibre Aug 20, 2020
56aaa17
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
84dbc43
Apply suggestions from Rich's code review
reivilibre Aug 26, 2020
2c740a7
Foreign key on rooms, SQL comment
reivilibre Aug 26, 2020
d77e444
NOT NULL, foreign key (events)
reivilibre Aug 26, 2020
33874d4
SQL column doc
reivilibre Aug 26, 2020
c1a2b68
Behaviour confirmed reasonable-seeming
reivilibre Aug 26, 2020
16eec5c
The early bird gets the early return
reivilibre Aug 26, 2020
92517e9
Assertion on bug
reivilibre Aug 26, 2020
ef4680d
Last successful stream ordering is about destinations
reivilibre Aug 26, 2020
de5caf0
Catch-up on all cases except federation denial
reivilibre Aug 27, 2020
3e308f9
Don't explicitly store the event_id
reivilibre Aug 27, 2020
b0bdadd
Antilint
reivilibre Aug 27, 2020
843403f
Remove review question
reivilibre Aug 27, 2020
ad7124d
Merge branch 'develop' into rei/2528_catchup_fed_outage
reivilibre Aug 27, 2020
b1fd67b
Fix wrong type signatures (even if str is Iterable[str]…)
reivilibre Aug 27, 2020
e6890c7
Fix the tests after removing event_id column
reivilibre Aug 27, 2020
7cfecf3
Antilint
reivilibre Aug 27, 2020
bf51d2f
Also fix `simple_select_onecol_txn`
reivilibre Aug 27, 2020
7589a03
Antilint again :(
reivilibre Aug 27, 2020
8d9f4ba
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
b60ad35
Merge remote-tracking branch 'origin/develop' into rei/2528_catchup_f…
reivilibre Sep 1, 2020
1 change: 1 addition & 0 deletions changelog.d/8096.bugfix
@@ -0,0 +1 @@
Fill remote homeservers in on events they may have missed in rooms during a period of unreachability.
14 changes: 12 additions & 2 deletions synapse/federation/sender/__init__.py
@@ -211,7 +211,7 @@ async def handle_event(event: EventBase) -> None:
logger.debug("Sending %s to %r", event, destinations)

if destinations:
self._send_pdu(event, destinations)
await self._send_pdu(event, destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -267,7 +267,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
finally:
self._is_processing = False

def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
@@ -285,6 +285,16 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()

# track the fact that we are enqueuing this PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
pdu.event_id,
pdu.internal_metadata.stream_ordering,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)

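For reference, the implementation of store_destination_rooms_entries is not shown in this excerpt. A minimal standalone sketch of the intended behaviour, assuming an upsert keyed on (destination, room_id) against the destination_rooms table added later in this PR, might look like the following; the real store method is async and runs through Synapse's db_pool, so treat this purely as an illustration:

    import sqlite3
    from typing import Iterable

    def store_destination_rooms_entries(
        conn: sqlite3.Connection,
        destinations: Iterable[str],
        room_id: str,
        event_id: str,
        stream_ordering: int,
    ) -> None:
        # Record, for each (destination, room) pair, the latest event queued
        # for transmission, so that catch-up later knows where to resume from.
        conn.executemany(
            """
            INSERT INTO destination_rooms (destination, room_id, event_id, stream_ordering)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (destination, room_id)
            DO UPDATE SET event_id = excluded.event_id,
                          stream_ordering = excluded.stream_ordering
            """,
            [(d, room_id, event_id, stream_ordering) for d in destinations],
        )
        conn.commit()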
162 changes: 160 additions & 2 deletions synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

from prometheus_client import Counter

@@ -92,6 +92,18 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# True whilst we are sending events that the remote homeserver missed
# because it was unreachable.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
self._catching_up = True
# the maximum stream order to catch up to (PDUs after this are expected
# to be in the main transmission queue), inclusive
self._catch_up_max_stream_order = None # type: Optional[int]
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
self._last_successful_stream_order = None # type: Optional[int]

# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]

Expand Down Expand Up @@ -137,8 +149,15 @@ def send_pdu(self, pdu: EventBase, order: int) -> None:

Args:
pdu: pdu to send
order
order: an arbitrary order for the PDU — NOT the stream ordering
"""
if (
self._catch_up_max_stream_order
and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
Member:
when would this happen? I'm struggling to imagine how we could end up here with an event with a lower stream ordering than the catch-up max.

Contributor Author:
In a (potentially hypothetical?) race condition.

        # track the fact that we have a PDU for these destinations,
        # to allow us to perform catch-up later on if the remote is unreachable
        # for a while.
        await self.store.store_destination_rooms_entries(
            destinations,
            pdu.room_id,
            pdu.event_id,
            pdu.internal_metadata.stream_ordering,
        )

# X <---

        for destination in destinations:
            self._get_per_destination_queue(destination).send_pdu(pdu, order)

I'm not sure if ^ has an opportunity to race or not; but even if it doesn't now, what about if someone innocently comes along and plops an await in there (at the X)?

The uncertainty made me feel like I should try and make this robust and 'just deal with it'.

Contributor Author:
N.B. needs scrutiny about whether that needs to check we actually have self._catch_up 🤔

):
# we are in catch-up mode and this PDU is already scheduled to be
# part of the catch-up
return
self._pending_pdus.append((pdu, order))
Member:
it seems to me that we shouldn't add new events to the queue while we are catching up?

Contributor Author:
As you note later, I set a high-water-mark. Anything after this watermark should be handled in the normal way.

(Part of this is also to reduce the use of /get_missing_events for when we think the remote is online.)

If we are backing off for a long time, it will clear the queue and high-water-mark when it attempts a transaction on the next line.

self.attempt_new_transaction()

@@ -219,6 +238,17 @@ async def _transaction_transmission_loop(self) -> None:
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)

if self._catching_up:
# we're catching up, so we should send old events instead
# in this case, we don't send anything from the new queue
# this keeps the catching-up logic simple
await self._catch_up_transmission_loop()
if self._catching_up:
# XXX if we aren't actually caught up still, shouldn't
# carry on to the main loop
# (but need to consider what we do in a failure...?)
return

pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +356,15 @@ async def _transaction_transmission_loop(self) -> None:

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
final_pdu, _ = pending_pdus[-1]
self._last_successful_stream_order = (
final_pdu.internal_metadata.stream_ordering
)
await self._store.set_last_successful_stream_ordering(
self._destination, self._last_successful_stream_order
)
else:
break
except NotRetryingDestination as e:
Expand All @@ -338,6 +377,11 @@ async def _transaction_transmission_loop(self) -> None:
),
)

# XXX REVIEW needs scrutiny
# to note: up to 50 pdus can be lost from the
# main queue by a transaction that triggers a backoff — do we
# clear the main queue now? I can see arguments for and against.
Member:
What are those arguments?

Contributor Author:
D'ow, that'll teach me for not writing my full thoughts whilst I still remember what I was on about.

So, vaguely I may have had two wavelengths of thought here:

  1. We're dropping 50 PDUs, isn't that bad?! Won't that mean we will forget to catch-up some rooms potentially?

If I recall, I appear to have been under the delusion that those PDUs would be lost altogether.

However, now I realise that this will enable catch-up, and those PDUs will be eligible for catch-up if they are the most recent PDUs in their respective rooms (a destination_rooms entry will exist and we won't update our last_successful_stream_ordering on a failure…) — so the remote will still get a chance to hear about them when they next come up. (If they aren't, then the more recent PDUs will be in the main queue so the remote will hear eventually!)

So: all OK here.

  2. Is there any point in dropping 50 but not just dropping the whole queue?
  • Erik mentioned that sometimes, moving on to a different transaction can help a remote recover (i.e. just one particular one causes it to blow up), so 'skipping' is not necessarily bad.
  • I wonder if waiting for retries to exceed an hour before sweeping out the queue could be bad for memory usage (of course, no worse than present-day, but …. is this a chance to make it better?)
  • On the other hand, I don't suppose that /get_missing_events is the most efficient/high-throughput way to get events so forcing catch-up onto destinations too aggressively may be harmful.

Broadly I am tempted to think this is currently fine, but may need tweaking/limiting/aggressifying in the future in case that really does grow too large. I think Erik or Rich will have a more trustworthy sense here, though.

Member:
I think it's fine for now, though once we have more confidence in the "catching up" behaviour, I think we may as well drop the whole queue immediately.


if e.retry_interval > 60 * 60 * 1000:
# we won't retry for another hour!
# (this suggests a significant outage)
Expand All @@ -359,6 +403,10 @@ async def _transaction_transmission_loop(self) -> None:
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_rrs = {}

self._catching_up = True
# reset max catch up since we have dropped PDUs here
self._catch_up_max_stream_order = None
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
Expand All @@ -368,6 +416,8 @@ async def _transaction_transmission_loop(self) -> None:
e.code,
e,
)

# XXX REVIEW should we be catching up?
Member:
What are some typical situations where we could hit this? I could imagine a server is switching between available/not available behind its reverse proxy, such that we occasionally get 502's. In that case we should probably continue to catch up, correct?

Does Synapse purposely return any error codes other than 200 here? (I'm not sure). The spec doesn't seem to say so.

In that case, if it's only things in front of Synapse that would be returning other error codes, it seems sensible to continue trying to catch up.

Contributor Author:
I suppose I want to be clearer about which of these exceptions mean we should later try to catch up again, and which ones we should treat as 'actually drop the PDUs for good this time'?

After looking with a fresh mind, I think yes, we should enable catch-up on: RequestSendFailed, HttpResponseException

Not sure about: FederationDeniedError, Exception

Thoughts?

Member:
yes, I think we should enter "catchup" mode in most of these cases.

We should not do so for FederationDeniedError since that means the remote server is deliberately blocked by our federation whitelist.

All the other cases should trigger a catchup afaict.

except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,6 +437,99 @@ async def _transaction_transmission_loop(self) -> None:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False

async def _catch_up_transmission_loop(self) -> None:
if self._last_successful_stream_order is None:
# first catch-up, so get from database
self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering(
self._destination
)

if self._last_successful_stream_order is None:
# if it's still None, then this means we don't have the information
# in our database (oh, the perils of being a new feature).
# So we can't actually do anything here, and in this case, we don't
# know what to catch up, sadly.
# Trying to catch up right now is futile, so let's stop.
self._catching_up = False
return

if self._catch_up_max_stream_order is None:
Member:
question: why do we need a high-water-mark at all? why not just keep going until get_catch_up_room_event_ids returns an empty list?

Member:
(doing so without care will introduce races though...)

Contributor Author:
I suppose we could do that if you are keen — as you say though, needs more thought.

The advantage of this approach is that it's easy to keep the logic in my head. Was keen not to give a chance for any nasty bugs to crawl in, because it'd be nice to have confidence in this.

# this is our first catch-up so we need to determine how much we
# want to catch-up.
if self._pending_pdus:
# we have PDUs already in the main queue so no need to ask the
# database
first_non_catch_up_pdu, _ = self._pending_pdus[0]
# -1 because we wish to exclude that one — we don't need to catch
# it up as it's in our main queue
self._catch_up_max_stream_order = (
first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
)
else:
# we don't have any PDUs in the main queue so instead find out
# the largest stream order that we know of that has, once upon a
# time, been queued for this destination (i.e. this is what we
# *should* have sent if the remote server was reachable).
self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
self._destination
)
if self._catch_up_max_stream_order is None:
# not enough info to catch up
self._catching_up = False
return

# get 50 catchup room/PDUs
while self._last_successful_stream_order < self._catch_up_max_stream_order:
event_ids = await self._store.get_catch_up_room_event_ids(
self._destination,
self._last_successful_stream_order,
self._catch_up_max_stream_order,
)

if not event_ids:
# I don't believe this *should* happen unless someone has been
# tinkering with the database, but I also have limited foresight,
# so let's handle this properly
logger.warning(
"No event IDs found for catch-up: "
Member:
We might want to note in the log line that this is unintended in some way?

Contributor Author:
Is this better?

"last successful = %d, max catch up = %d",
self._last_successful_stream_order,
self._catch_up_max_stream_order,
)
self._catching_up = False
break

# fetch the relevant events from the event store
events = await self._store.get_events_as_list(
event_ids
) # XXX REVIEW: redact behaviour and allow_rejected ???

# zip them together with their stream orderings
catch_up_pdus = [
(event, event.internal_metadata.stream_ordering) for event in events
Member:
as noted elsewhere: I think the order is redundant: it might be easier to get rid of it (in a separate PR).

Contributor Author:
I'll put it on my list, then :)

]

if not catch_up_pdus:
break
Member:
when does this happen, and why is breaking the loop the correct behaviour?

Contributor Author:
I suppose this should log an ERROR, since this shouldn't happen.

Contributor Author:
break isn't the correct behaviour really, since it disables catch-up when we supposedly should have some to do.

But only break will let us make progress and go to the main loop again.

So I will log an error either way, but is it best to break or return?

Contributor Author:
Is this being unnecessarily paranoid?

There will be a foreign key constraint to link our rows to events, so this should darn well be impossible.

Better to assert events or if not events: raise AssertionError(...)?


success = await self._transaction_manager.send_new_transaction(
self._destination, catch_up_pdus, []
)
if success:
sent_transactions_counter.inc()
final_pdu, _ = catch_up_pdus[-1]
self._last_successful_stream_order = cast(
int, final_pdu.internal_metadata.stream_ordering
)
await self._store.set_last_successful_stream_ordering(
self._destination, self._last_successful_stream_order
)
else:
return

# once we have reached this point, catch-up is done!
self._catching_up = False

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
@@ -430,6 +573,21 @@ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:

return (edus, now_stream_id)

def reset_catch_up_state(self) -> None:
"""
Resets the catch-up state of this destination.
This does the following:
- marks catch-up mode
- unsets the catch-up limit (max. stream order)
so that it is reset to the highest catch-up next time we have a
chance to catch up
- empties the main PDU queue as any PDUs in it will now be handled
by catch-up (because the max stream order limit was unset).
"""
self._pending_pdus = []
self._catching_up = True
self._catch_up_max_stream_order = None

async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
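The store methods that drive the catch-up loop above (get_last_successful_stream_ordering, get_largest_destination_rooms_stream_order, get_catch_up_room_event_ids and set_last_successful_stream_ordering) are added elsewhere in the PR and are not shown here. As a hedged sketch of the central query, given the destination_rooms table introduced below and its one-row-per-room primary key, something along these lines would return up to 50 missed events in stream order; the real implementation may well differ:

    import sqlite3
    from typing import List

    def get_catch_up_room_event_ids(
        conn: sqlite3.Connection,
        destination: str,
        last_successful_stream_ordering: int,
        max_stream_order: int,
        limit: int = 50,
    ) -> List[str]:
        # Each destination_rooms row holds the most recent event queued for
        # that room, so this yields at most one catch-up event per room,
        # bounded above by the catch-up high-water-mark.
        rows = conn.execute(
            """
            SELECT event_id FROM destination_rooms
            WHERE destination = ?
              AND stream_ordering > ?
              AND stream_ordering <= ?
            ORDER BY stream_ordering ASC
            LIMIT ?
            """,
            (destination, last_successful_stream_ordering, max_stream_order, limit),
        ).fetchall()
        return [event_id for (event_id,) in rows]

After each successful transaction, the loop above advances _last_successful_stream_order to the stream ordering of the last PDU sent, so repeated calls walk forward through the missed rooms in batches.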
35 changes: 35 additions & 0 deletions (new SQL schema delta)
@@ -0,0 +1,35 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.

-- This stores, for each (destination, room) pair, the event_id and stream_ordering
-- of the latest event to be enqueued for transmission to that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
-- the destination in question
destination TEXT NOT NULL,
Member:
I'd kinda like to see the logic shuffled so that this can be a foreign key; however at the very least it needs a comment saying why it can't be one.

Contributor Author:
Sure, would it suffice to INSERT IGNORE a NULL row into the destinations table when upserting a destination_rooms row?

Member:
probably. It would be nice to avoid doing so on every PDU (for example by doing it when we first create a PerDestinationQueue for the destination), but that might be fiddly, and is something we can do later.

-- the ID of the room in question
room_id TEXT NOT NULL,
-- the stream_ordering of the event
stream_ordering INTEGER,
-- the event_id of the event
event_id TEXT NOT NULL,
PRIMARY KEY (destination, room_id)
);

-- this column tracks the stream_ordering of the event that was most recently
-- successfully transmitted to the destination.
ALTER TABLE destinations
ADD COLUMN last_successful_stream_ordering INTEGER;
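To tie the two parts of this delta together, here is a hedged sketch of how they are used: destination_rooms rows are upserted as PDUs are queued, a stub destinations row can be inserted first so that the foreign key suggested in the review could hold, and last_successful_stream_ordering is advanced after each successful transaction. The helper ensure_destination_row is hypothetical; set_last_successful_stream_ordering is the store method called from the sender, but its body here is an assumption (the real code is async and uses Synapse's db_pool, and this assumes destinations.destination is the table's primary key):

    import sqlite3

    def ensure_destination_row(conn: sqlite3.Connection, destination: str) -> None:
        # Hypothetical helper: insert a stub row (if absent) so that
        # destination_rooms.destination could reference destinations(destination).
        conn.execute(
            "INSERT OR IGNORE INTO destinations (destination) VALUES (?)",
            (destination,),
        )

    def set_last_successful_stream_ordering(
        conn: sqlite3.Connection, destination: str, stream_ordering: int
    ) -> None:
        # Advance the per-destination catch-up cursor after a transaction
        # containing PDUs has been successfully sent.
        conn.execute(
            "UPDATE destinations SET last_successful_stream_ordering = ? "
            "WHERE destination = ?",
            (stream_ordering, destination),
        )
        conn.commit()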