From 18d900eed02ff3525ec43a4e34e767fdf134e4df Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Thu, 23 Jul 2020 08:58:47 +0100
Subject: [PATCH 01/34] Fix wrong type annotation

Confused me for a second there!
---
 synapse/federation/sender/transaction_manager.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 8280f8b9003d..c7f6cb3d73c3 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, List
+from typing import TYPE_CHECKING, List, Tuple
 
 from canonicaljson import json
 
@@ -54,7 +54,10 @@ def __init__(self, hs: "synapse.server.HomeServer"):
 
     @measure_func("_send_new_transaction")
     async def send_new_transaction(
-        self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
+        self,
+        destination: str,
+        pending_pdus: List[Tuple[EventBase, int]],
+        pending_edus: List[Edu],
     ):
 
         # Make a transaction-sending opentracing span. This span follows on from

From 07a415ca94abd3493d3307e7b618aba980d85d11 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:54:47 +0100
Subject: [PATCH 02/34] Little type hint

---
 tests/rest/client/v1/utils.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 22d734e7630a..e1fdd7a27990 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -39,7 +39,9 @@ class RestHelper(object):
     resource = attr.ib()
     auth_user_id = attr.ib()
 
-    def create_room_as(self, room_creator=None, is_public=True, tok=None):
+    def create_room_as(
+        self, room_creator: str = None, is_public: bool = True, tok: str = None
+    ) -> str:
         temp_id = self.auth_user_id
         self.auth_user_id = room_creator
         path = "/_matrix/client/r0/createRoom"

From 5bc321ae1a788254cbeaead336a647fca9e41083 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:55:59 +0100
Subject: [PATCH 03/34] Add data store functions and delta

---
 .../delta/58/11recovery_after_outage.sql      |  35 +++++
 .../storage/data_stores/main/transactions.py  | 136 +++++++++++++++++-
 2 files changed, 170 insertions(+), 1 deletion(-)
 create mode 100644 synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql

diff --git a/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql
new file mode 100644
index 000000000000..c903419eb43d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/11recovery_after_outage.sql
@@ -0,0 +1,35 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- This schema delta alters the schema to enable 'catching up' remote homeservers
+-- after there has been a connectivity problem for any reason.
+
+-- This stores, for each (destination, room) pair, the event_id and stream_ordering
+-- of the latest event to be enqueued for transmission to that destination.
+CREATE TABLE IF NOT EXISTS destination_rooms (
+    -- the destination in question
+    destination TEXT NOT NULL,
+    -- the ID of the room in question
+    room_id TEXT NOT NULL,
+    -- the stream_ordering of the event
+    stream_ordering INTEGER,
+    -- the event_id of the event
+    event_id TEXT NOT NULL,
+    PRIMARY KEY (destination, room_id)
+);
+
+-- this column tracks the stream_ordering of the event that was most recently
+-- successfully transmitted to the destination.
+ALTER TABLE destinations
+    ADD COLUMN last_successful_stream_ordering INTEGER;
diff --git a/synapse/storage/data_stores/main/transactions.py b/synapse/storage/data_stores/main/transactions.py
index a9bf457939f1..dcce59fdf7cb 100644
--- a/synapse/storage/data_stores/main/transactions.py
+++ b/synapse/storage/data_stores/main/transactions.py
@@ -12,9 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 from collections import namedtuple
+from typing import List, Optional
 
 from canonicaljson import encode_canonical_json
 
@@ -267,3 +267,137 @@ def _cleanup_transactions_txn(txn):
         return self.db.runInteraction(
             "_cleanup_transactions", _cleanup_transactions_txn
         )
+
+    def get_last_successful_stream_ordering(self, destination: str):
+        """
+        Gets the stream ordering of the PDU most-recently successfully sent
+        to the specified destination.
+
+        Args:
+            destination: the destination we have successfully sent to
+        """
+        return self.db.simple_select_one_onecol(
+            "destinations",
+            {"destination": destination},
+            "last_successful_stream_ordering",
+            allow_none=True,
+            desc="get_last_successful_stream_ordering",
+        )
+
+    def set_last_successful_stream_ordering(
+        self, destination: str, last_successful_stream_ordering: int
+    ):
+        """
+        Marks that we have successfully sent the PDUs up to and including the
+        one specified.
+
+        Args:
+            destination: the destination we have successfully sent to
+            last_successful_stream_ordering: the stream_ordering of the most
+                recent successfully-sent PDU
+        """
+        return self.db.simple_upsert(
+            "destinations",
+            keyvalues={"destination": destination},
+            values={"last_successful_stream_ordering": last_successful_stream_ordering},
+            desc="set_last_successful_stream_ordering",
+        )
+
+    def get_catch_up_room_event_ids(
+        self,
+        destination: str,
+        last_successful_stream_ordering: int,
+        max_stream_order: int,
+    ):
+        """
+        Returns 50 event IDs and their corresponding stream_orderings that
+        correspond to the least recent events that have not yet been sent to
+        a destination.
+
+        Args:
+            destination: the destination in question
+            last_successful_stream_ordering: the stream_ordering of the
+                most-recently successfully-transmitted event to the destination
+            max_stream_order: an upper bound, inclusive, of the stream ordering
+                to return events for.
+
+        Returns:
+            event_ids
+        """
+        return self.db.runInteraction(
+            "get_catch_up_room_event_ids",
+            self._get_catch_up_room_event_ids_txn,
+            destination,
+            last_successful_stream_ordering,
+            max_stream_order,
+        )
+
+    @staticmethod
+    def _get_catch_up_room_event_ids_txn(
+        txn,
+        destination: str,
+        last_successful_stream_ordering: int,
+        max_stream_order: int,
+    ) -> List[str]:
+        q = """
+            SELECT event_id FROM destination_rooms
+            WHERE destination = ?
+            AND stream_ordering > ? AND stream_ordering <= ?
+            ORDER BY stream_ordering
+            LIMIT 50
+        """
+        txn.execute(
+            q, (destination, last_successful_stream_ordering, max_stream_order),
+        )
+        event_ids = [row[0] for row in txn]
+        return event_ids
+
+    def get_largest_destination_rooms_stream_order(self, destination: str):
+        """
+        Returns the largest stream_ordering from the destination_rooms table
+        that corresponds to this destination.
+        """
+        return self.db.runInteraction(
+            "get_largest_destination_rooms_stream_order",
+            self._get_largest_destination_rooms_stream_order_txn,
+            destination,
+        )
+
+    @staticmethod
+    def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]:
+        txn.execute(
+            """
+            SELECT stream_ordering
+                FROM destination_rooms
+                WHERE destination = ?
+                ORDER BY stream_ordering DESC
+                LIMIT 1
+            """,
+            (destination,),
+        )
+        rows = [r[0] for r in txn]
+        if rows:
+            return rows[0]
+        else:
+            return None
+
+    def store_destination_rooms_entries(
+        self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int
+    ):
+        """
+        Updates or creates `destination_rooms` entries in batch for a single event.
+        Args:
+            destinations: list of destinations
+            room_id: the room_id of the event
+            event_id: the ID of the event
+            stream_ordering: the stream_ordering of the event
+        """
+        return self.db.runInteraction(
+            "store_destination_rooms_entries",
+            self.db.simple_upsert_many_txn,
+            "destination_rooms",
+            ["destination", "room_id"],
+            [(d, room_id) for d in destinations],
+            ["event_id", "stream_ordering"],
+            [(event_id, stream_ordering)] * len(destinations),
+        )

From c60e259a89c7350e594fd0b35ab31c1a2448672c Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:56:41 +0100
Subject: [PATCH 04/34] Track destination_rooms entries

---
 synapse/federation/sender/__init__.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index b328a4df096c..f651b338f92d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -211,7 +211,7 @@ async def handle_event(event: EventBase) -> None:
                     logger.debug("Sending %s to %r", event, destinations)
 
                     if destinations:
-                        self._send_pdu(event, destinations)
+                        await self._send_pdu(event, destinations)
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -267,7 +267,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
             finally:
                 self._is_processing = False
 
-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -285,6 +285,16 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        # track the fact that we are enqueuing this PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations,
+            pdu.room_id,
+            pdu.event_id,
+            pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu, order)

From 20c896a1469c10f7e02b702edcd8824b9679e924 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:56:53 +0100
Subject: [PATCH 05/34] Add catch-up logic

---
 .../sender/per_destination_queue.py           | 155 +++++++++++++++++-
 1 file changed, 153 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 343674178327..9fc2ad8dffbc 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
 
 from prometheus_client import Counter
 
@@ -92,6 +92,18 @@ def __init__(
         self._destination = destination
         self.transmission_loop_running = False
 
+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True
+        # the maximum stream order to catch up to (PDUs after this are expected
+        # to be in the main transmission queue), inclusive
+        self._catch_up_max_stream_order = None  # type: Optional[int]
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_order = None  # type: Optional[int]
+
         # a list of tuples of (pending pdu, order)
         self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
 
@@ -137,8 +149,15 @@ def send_pdu(self, pdu: EventBase, order: int) -> None:
 
         Args:
             pdu: pdu to send
-            order
+            order: an arbitrary order for the PDU — NOT the stream ordering
         """
+        if (
+            self._catch_up_max_stream_order
+            and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
+        ):
+            # we are in catch-up mode and this PDU is already scheduled to be
+            # part of the catch-up
+            return
         self._pending_pdus.append((pdu, order))
         self.attempt_new_transaction()
 
@@ -219,6 +238,17 @@ async def _transaction_transmission_loop(self) -> None:
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)
 
+            if self._catching_up:
+                # we're catching up, so we should send old events instead
+                # in this case, we don't send anything from the new queue
+                # this keeps the catching-up logic simple
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # XXX if we aren't actually caught up still, shouldn't
+                    # carry on to the main loop
+                    # (but need to consider what we do in a failure...?)
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +356,15 @@ async def _transaction_transmission_loop(self) -> None:
 
                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        final_pdu, _ = pending_pdus[-1]
+                        self._last_successful_stream_order = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_last_successful_stream_ordering(
+                            self._destination, self._last_successful_stream_order
+                        )
                 else:
                     break
         except NotRetryingDestination as e:
@@ -337,6 +376,8 @@ async def _transaction_transmission_loop(self) -> None:
                     (e.retry_last_ts + e.retry_interval) / 1000.0
                 ),
             )
+
+            self._catching_up = True
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
@@ -346,6 +387,8 @@ async def _transaction_transmission_loop(self) -> None:
                 e.code,
                 e,
             )
+
+            # XXX REVIEW should we be catching up?
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -365,6 +408,99 @@ async def _transaction_transmission_loop(self) -> None:
 
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
 
+    async def _catch_up_transmission_loop(self) -> None:
+        if self._last_successful_stream_order is None:
+            # first catch-up, so get from database
+            self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_order is None:
+            # if it's still None, then this means we don't have the information
+            # in our database (oh, the perils of being a new feature).
+            # So we can't actually do anything here, and in this case, we don't
+            # know what to catch up, sadly.
+            # Trying to catch up right now is futile, so let's stop.
+            self._catching_up = False
+            return
+
+        if self._catch_up_max_stream_order is None:
+            # this is our first catch-up so we need to determine how much we
+            # want to catch-up.
+            if self._pending_pdus:
+                # we have PDUs already in the main queue so no need to ask the
+                # database
+                first_non_catch_up_pdu, _ = self._pending_pdus[0]
+                # -1 because we wish to exclude that one — we don't need to catch
+                # it up as it's in our main queue
+                self._catch_up_max_stream_order = (
+                    first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
+                )
+            else:
+                # we don't have any PDUs in the main queue so instead find out
+                # the largest stream order that we know of that has, once upon a
+                # time, been queued for this destination (i.e. this is what we
+                # *should* have sent if the remote server was reachable).
+                self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
+                    self._destination
+                )
+            if self._catch_up_max_stream_order is None:
+                # not enough info to catch up
+                self._catching_up = False
+                return
+
+        # get 50 catchup room/PDUs
+        while self._last_successful_stream_order < self._catch_up_max_stream_order:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination,
+                self._last_successful_stream_order,
+                self._catch_up_max_stream_order,
+            )
+
+            if not event_ids:
+                # I don't believe this *should* happen unless someone has been
+                # tinkering with the database, but I also have limited foresight,
+                # so let's handle this properly
+                logger.warning(
+                    "No event IDs found for catch-up: "
+                    "last successful = %d, max catch up = %d",
+                    self._last_successful_stream_order,
+                    self._catch_up_max_stream_order
+                )
+                self._catching_up = False
+                break
+
+            # fetch the relevant events from the event store
+            events = await self._store.get_events_as_list(
+                event_ids
+            )  # XXX REVIEW: redact behaviour and allow_rejected ???
+
+            # zip them together with their stream orderings
+            catch_up_pdus = [
+                (event, event.internal_metadata.stream_ordering) for event in events
+            ]
+
+            if not catch_up_pdus:
+                break
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catch_up_pdus, []
+            )
+            if success:
+                sent_transactions_counter.inc()
+                final_pdu, _ = catch_up_pdus[-1]
+                self._last_successful_stream_order = (
+                    final_pdu.internal_metadata.stream_ordering
+                )
+                await self._store.set_last_successful_stream_ordering(
+                    self._destination, self._last_successful_stream_order
+                )
+            else:
+                return
+
+        # once we have reached this point, catch-up is done!
+        self._catching_up = False
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
@@ -408,6 +544,21 @@ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
 
         return (edus, now_stream_id)
 
+    def reset_catch_up_state(self) -> None:
+        """
+        Resets the catch-up state of this destination.
+        This does the following:
+        - marks catch-up mode
+        - unsets the catch-up limit (max. stream order)
+          so that it is reset to the highest catch-up next time we have a
+          chance to catch up
+        - empties the main PDU queue as any PDUs in it will now be handled
+          by catch-up (because the max stream order limit was unset).
+        """
+        self._pending_pdus = []
+        self._catching_up = True
+        self._catch_up_max_stream_order = None
+
     async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
         last_device_stream_id = self._last_device_stream_id
         to_device_stream_id = self._store.get_to_device_stream_token()

From d232200b3a610f277ec3c7e28e3c8a0a1e059fa6 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 19:57:03 +0100
Subject: [PATCH 06/34] Add tests!
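These exercise the new catch-up machinery end-to-end and in isolation:
populating `destination_rooms`, tracking `last_successful_stream_ordering`,
and the behaviour of `_catch_up_transmission_loop` with and without PDUs
already in the main queue. The isolated tests drive the queue directly,
along these lines (a condensed sketch of the pattern used below, with the
same helper names):

    per_dest_queue, sent_pdus = self.make_fake_destination_queue()
    # ...join rooms, send events, then pretend we delivered up to event 2...
    self.get_success(
        self.hs.get_datastore().set_last_successful_stream_ordering(
            "host2", event_2.internal_metadata.stream_ordering
        )
    )
    self.get_success(per_dest_queue._catch_up_transmission_loop())
    # sent_pdus should now hold only the latest undelivered event per room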
---
 tests/federation/test_federation_catch_up.py | 347 +++++++++++++++++++
 1 file changed, 347 insertions(+)
 create mode 100644 tests/federation/test_federation_catch_up.py

diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
new file mode 100644
index 000000000000..f8169708115e
--- /dev/null
+++ b/tests/federation/test_federation_catch_up.py
@@ -0,0 +1,347 @@
+from asyncio.futures import Future
+from typing import List, Tuple
+
+from mock import Mock
+
+from twisted.internet import defer
+
+from synapse.events import EventBase
+from synapse.federation.sender import PerDestinationQueue, TransactionManager
+from synapse.federation.units import Edu
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.test_utils import event_injection
+from tests.unittest import FederatingHomeserverTestCase, override_config
+
+
+class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver(
+            federation_transport_client=Mock(spec=["send_transaction"]),
+        )
+
+    def prepare(self, reactor, clock, hs):
+        # stub out get_current_hosts_in_room
+        state_handler = hs.get_state_handler()
+
+        # need to do these Future shenanigans because someone awaits on this
+        # result
+        # This mock is crucial for destination_rooms to be populated.
+        fut = Future()
+        fut.set_result(["test", "host2"])
+        state_handler.get_current_hosts_in_room = Mock(return_value=fut)
+
+        # whenever send_transaction is called, record the pdu data
+        self.pdus = []
+        self.failed_pdus = []
+        self.is_online = True
+        self.hs.get_federation_transport_client().send_transaction.side_effect = (
+            self.record_transaction
+        )
+
+    def get_destination_room(self, room: str, destination: str = "host2") -> dict:
+        """
+        Gets the destination_rooms entry for a (destination, room_id) pair.
+
+        Args:
+            room: room ID
+            destination: what destination, default is "host2"
+
+        Returns:
+            Dictionary of { event_id: str, stream_ordering: int }
+        """
+        return self.get_success(
+            self.hs.get_datastore().db.simple_select_one(
+                table="destination_rooms",
+                keyvalues={"destination": destination, "room_id": room},
+                retcols=["event_id", "stream_ordering"],
+            )
+        )
+
+    def make_fake_destination_queue(
+        self, destination: str = "host2"
+    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
+        """
+        Makes a fake per-destination queue.
+        """
+        transaction_manager = TransactionManager(self.hs)
+        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
+        results_list = []
+
+        async def fake_send(
+            destination_tm: str,
+            pending_pdus: List[Tuple[EventBase, int]],
+            _pending_edus: List[Edu],
+        ):
+            assert destination == destination_tm
+            results_list.extend([row[0] for row in pending_pdus])
+
+        transaction_manager.send_new_transaction = fake_send
+
+        return per_dest_queue, results_list
+
+    def record_transaction(self, txn, json_cb):
+        if self.is_online:
+            data = json_cb()
+            self.pdus.extend(data["pdus"])
+            return defer.succeed({})
+        else:
+            data = json_cb()
+            self.failed_pdus.extend(data["pdus"])
+            return defer.fail(IOError("Failed to connect because this is a test!"))
+
+    @override_config({"send_federation": True})  # critical (1) to federate
+    def test_catch_up_from_blank_state(self):
+        """
+        Runs an overall test of federation catch-up from scratch.
+        Further tests will focus on more narrow aspects and edge-cases, but I
+        hope to provide an overall view with this test.
+        """
+        # bring the other server online
+        self.is_online = True
+
+        # let's make some events for the other server to receive
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+
+        # also critical (2) to federate
+        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
+        )
+
+        # check: PDU received for topic event
+        self.assertEqual(len(self.pdus), 1)
+        self.assertEqual(self.pdus[0]["type"], "m.room.topic")
+
+        # take the remote offline
+        self.is_online = False
+
+        # send another event
+        self.helper.send(room_1, "hi user!", tok=u1_token)
+
+        # check: things didn't go well since the remote is down
+        self.assertEqual(len(self.failed_pdus), 1)
+        self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
+
+        # let's delete the federation transmission queue
+        # (this pretends we are starting up fresh.)
+        self.assertFalse(
+            self.hs.get_federation_sender()
+            ._per_destination_queues["host2"]
+            .transmission_loop_running
+        )
+        del self.hs.get_federation_sender()._per_destination_queues["host2"]
+
+        # let's also clear any backoffs
+        self.get_success(
+            self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
+        )
+
+        # bring the remote online and clear the received pdu list
+        self.is_online = True
+        self.pdus = []
+
+        # now we need to initiate a federation transaction somehow…
+        # to do that, let's send another event (because it's simple to do)
+        # (do it to another room otherwise the catch-up logic decides it doesn't
+        # need to catch up room_1 — something I overlooked when first writing
+        # this test)
+        self.helper.send(room_2, "wombats!", tok=u1_token)
+
+        # we should now have received both PDUs
+        self.assertEqual(len(self.pdus), 2)
+        self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
+        self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
+
+    @override_config({"send_federation": True})
+    def test_catch_up_destination_rooms_tracking(self):
+        """
+        Tests that we populate the `destination_rooms` table as needed.
+        """
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room = self.helper.create_room_as("u1", tok=u1_token)
+
+        event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+
+        event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
+
+        row_1 = self.get_destination_room(room)
+
+        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
+
+        row_2 = self.get_destination_room(room)
+
+        # check: events correctly registered in order
+        self.assertEqual(row_1["event_id"], event_id_1)
+        self.assertEqual(row_2["event_id"], event_id_2)
+        self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
+
+    @override_config({"send_federation": True})
+    def test_catch_up_last_successful_stream_ordering_tracking(self):
+        """
+        Tests that we populate the `last_successful_stream_ordering` column as needed.
+        """
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room = self.helper.create_room_as("u1", tok=u1_token)
+
+        # take the remote offline
+        self.is_online = False
+
+        event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+
+        self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
+
+        self.pump()
+
+        lsso_1 = self.get_success(
+            self.hs.get_datastore().get_last_successful_stream_ordering("host2")
+        )
+
+        self.assertIsNone(
+            lsso_1,
+            "There should be no last successful stream ordering for an always-offline destination",
+        )
+
+        # bring the remote online
+        self.is_online = True
+
+        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
+
+        lsso_2 = self.get_success(
+            self.hs.get_datastore().get_last_successful_stream_ordering("host2")
+        )
+        row_2 = self.get_destination_room(room)
+
+        self.assertEqual(
+            self.pdus[0]["content"]["body"],
+            "rabbits!",
+            "Test fault: didn't receive the right PDU",
+        )
+        self.assertEqual(
+            row_2["event_id"],
+            event_id_2,
+            "Test fault: destination_rooms not updated correctly",
+        )
+        self.assertEqual(
+            lsso_2,
+            row_2["stream_ordering"],
+            "Send succeeded but not marked as last_successful_stream_ordering",
+        )
+
+    @override_config({"send_federation": True})
+    def test_catch_up_loop_no_pdus_in_main_queue(self):
+        """
+        Tests, somewhat more synthetically, behaviour of
+        _catch_up_transmission_loop when there aren't any PDUs in the main queue.
+        """
+
+        # ARRANGE
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+        room_3 = self.helper.create_room_as("u1", tok=u1_token)
+        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+
+        # create some events to play with
+
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+        self.helper.send(room_3, "Matrix!", tok=u1_token)
+        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+        # destination_rooms should already be populated, but let us pretend that we already
+        # delivered up to and including event id 2
+
+        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+        self.get_success(
+            self.hs.get_datastore().set_last_successful_stream_ordering(
+                "host2", event_2.internal_metadata.stream_ordering
+            )
+        )
+
+        # ACT
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT, noticing in particular:
+        # - event 3 not sent out, because event 5 replaces it
+        # - order is least recent first, so event 5 comes after event 4
+        self.assertEqual(len(sent_pdus), 2)
+        self.assertEqual(sent_pdus[0].event_id, event_id_4)
+        self.assertEqual(sent_pdus[1].event_id, event_id_5)
+
+    @override_config({"send_federation": True})
+    def test_catch_up_loop_with_pdus_in_main_queue(self):
+        """
+        Tests, somewhat more synthetically, behaviour of
+        _catch_up_transmission_loop when there are already PDUs in the main queue.
+        """
+
+        # ARRANGE
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+        room_3 = self.helper.create_room_as("u1", tok=u1_token)
+        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+
+        # create some events to play with
+
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+        self.helper.send(room_3, "Matrix!", tok=u1_token)
+        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+        # put event 5 in the main queue — assume it's the cause of us triggering a
+        # catch-up (or is otherwise sent after retry backoff ends).
+        # (Block the transmission loop from running by marking it as already
+        # running, because we manually invoke the catch-up loop for testing
+        # purposes.)
+        per_dest_queue.transmission_loop_running = True
+        event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
+        per_dest_queue.send_pdu(event_5, 1)
+
+        # destination_rooms should already be populated, but let us pretend that we already
+        # delivered up to and including event id 2
+
+        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+        self.get_success(
+            self.hs.get_datastore().set_last_successful_stream_ordering(
+                "host2", event_2.internal_metadata.stream_ordering
+            )
+        )
+
+        # ACT
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT, noticing in particular:
+        # - event 3 not sent out, because event 5 replaces it
+        # - event 5 is not sent out, because it's already in our main PDU queue
+        self.assertEqual(len(sent_pdus), 1)
+        self.assertEqual(sent_pdus[0].event_id, event_id_4)

From d910798c4d1d769d2623dffaca994168e0da1ba6 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 20:08:11 +0100
Subject: [PATCH 07/34] Track async/await and db_pool transitions

---
 .../storage/databases/main/transactions.py    | 12 +++++-----
 tests/federation/test_federation_catch_up.py  | 22 +++++++++----------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 2ee8c95262fc..70c62fc7f2fa 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -273,7 +273,7 @@ def get_last_successful_stream_ordering(self, destination: str):
         Args:
             destination: the destination we have successfully sent to
         """
-        return self.db.simple_select_one_onecol(
+        return self.db_pool.simple_select_one_onecol(
             "destinations",
             {"destination": destination},
             "last_successful_stream_ordering",
@@ -293,7 +293,7 @@ def set_last_successful_stream_ordering(
             last_successful_stream_ordering: the stream_ordering of the most
                 recent successfully-sent PDU
         """
-        return self.db.simple_upsert(
+        return self.db_pool.simple_upsert(
             "destinations",
             keyvalues={"destination": destination},
             values={"last_successful_stream_ordering": last_successful_stream_ordering},
@@ -321,7 +321,7 @@ def get_catch_up_room_event_ids(
         Returns:
             event_ids
         """
-        return self.db.runInteraction(
+        return self.db_pool.runInteraction(
            "get_catch_up_room_event_ids",
             self._get_catch_up_room_event_ids_txn,
             destination,
@@ -354,7 +354,7 @@ def get_largest_destination_rooms_stream_order(self, destination: str):
         Returns the largest stream_ordering from the destination_rooms table
         that corresponds to this destination.
         """
-        return self.db.runInteraction(
+        return self.db_pool.runInteraction(
             "get_largest_destination_rooms_stream_order",
             self._get_largest_destination_rooms_stream_order_txn,
             destination,
@@ -389,9 +389,9 @@ def store_destination_rooms_entries(
             event_id: the ID of the event
             stream_ordering: the stream_ordering of the event
         """
-        return self.db.runInteraction(
+        return self.db_pool.runInteraction(
             "store_destination_rooms_entries",
-            self.db.simple_upsert_many_txn,
+            self.db_pool.simple_upsert_many_txn,
             "destination_rooms",
             ["destination", "room_id"],
             [(d, room_id) for d in destinations],
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index f8169708115e..2cb7d78b0e30 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -58,7 +58,7 @@ def get_destination_room(self, room: str, destination: str = "host2") -> dict:
             Dictionary of { event_id: str, stream_ordering: int }
         """
         return self.get_success(
-            self.hs.get_datastore().db.simple_select_one(
+            self.hs.get_datastore().db_pool.simple_select_one(
                 table="destination_rooms",
                 keyvalues={"destination": destination, "room_id": room},
                 retcols=["event_id", "stream_ordering"],
@@ -114,8 +114,8 @@ def test_catch_up_from_blank_state(self):
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
 
         # also critical (2) to federate
-        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
-        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
+        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
 
         self.helper.send_state(
             room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
@@ -174,7 +174,7 @@ def test_catch_up_destination_rooms_tracking(self):
         u1_token = self.login("u1", "you the one")
         room = self.helper.create_room_as("u1", tok=u1_token)
 
-        event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
 
         event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
 
@@ -201,7 +201,7 @@ def test_catch_up_last_successful_stream_ordering_tracking(self):
         # take the remote offline
         self.is_online = False
 
-        event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
 
         self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
 
@@ -257,9 +257,9 @@ def test_catch_up_loop_no_pdus_in_main_queue(self):
         room_1 = self.helper.create_room_as("u1", tok=u1_token)
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
         room_3 = self.helper.create_room_as("u1", tok=u1_token)
-        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
-        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
-        event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
+        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
+        self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))
 
         # create some events to play with
 
@@ -305,9 +305,9 @@ def test_catch_up_loop_with_pdus_in_main_queue(self):
         room_1 = self.helper.create_room_as("u1", tok=u1_token)
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
         room_3 = self.helper.create_room_as("u1", tok=u1_token)
-        event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
-        event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
-        event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
+        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
+        self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))

From 74a6f4f1379d61f3d110a4f56da65b370ff36afb Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 20:11:24 +0100
Subject: [PATCH 08/34] Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre)
---
 changelog.d/8096.bugfix | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/8096.bugfix

diff --git a/changelog.d/8096.bugfix b/changelog.d/8096.bugfix
new file mode 100644
index 000000000000..5097dce8681d
--- /dev/null
+++ b/changelog.d/8096.bugfix
@@ -0,0 +1 @@
+Fill remote homeservers in on events they may have missed in rooms during a period of unreachability.

From c1b32ae494d07e4e2935373c1f111c8ffc31c8b4 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Fri, 14 Aug 2020 20:20:25 +0100
Subject: [PATCH 09/34] Antilint

---
 .../sender/per_destination_queue.py           |  8 ++--
 .../storage/databases/main/transactions.py    | 12 ++++--
 tests/federation/test_federation_catch_up.py  | 40 ++++++++++++++-----
 3 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c37101c26cef..c06aef1f2406 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
 
 from prometheus_client import Counter
 
@@ -494,7 +494,7 @@ async def _catch_up_transmission_loop(self) -> None:
                     "No event IDs found for catch-up: "
                     "last successful = %d, max catch up = %d",
                     self._last_successful_stream_order,
-                    self._catch_up_max_stream_order
+                    self._catch_up_max_stream_order,
                 )
                 self._catching_up = False
                 break
@@ -518,8 +518,8 @@ async def _catch_up_transmission_loop(self) -> None:
             if success:
                 sent_transactions_counter.inc()
                 final_pdu, _ = catch_up_pdus[-1]
-                self._last_successful_stream_order = (
-                    final_pdu.internal_metadata.stream_ordering
+                self._last_successful_stream_order = cast(
+                    int, final_pdu.internal_metadata.stream_ordering
                 )
                 await self._store.set_last_successful_stream_ordering(
                     self._destination, self._last_successful_stream_order
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 70c62fc7f2fa..1613f2149161 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import logging
 from collections import namedtuple
-from typing import List, Optional
+from typing import Collection, List, Optional
 
 from canonicaljson import encode_canonical_json
 
@@ -361,7 +361,9 @@ def get_largest_destination_rooms_stream_order(self, destination: str):
         )
 
     @staticmethod
-    def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Optional[int]:
+    def _get_largest_destination_rooms_stream_order_txn(
+        txn, destination: str
+    ) -> Optional[int]:
         txn.execute(
             """
             SELECT stream_ordering
@@ -379,7 +381,11 @@ def _get_largest_destination_rooms_stream_order_txn(txn, destination: str) -> Op
         rows = [r[0] for r in txn]
         if rows:
             return rows[0]
         else:
             return None
 
     def store_destination_rooms_entries(
-        self, destinations: List[str], room_id: str, event_id: str, stream_ordering: int
+        self,
+        destinations: Collection[str],
+        room_id: str,
+        event_id: str,
+        stream_ordering: int,
     ):
         """
         Updates or creates `destination_rooms` entries in batch for a single event.
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 2cb7d78b0e30..a021658ac1ce 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -114,8 +114,12 @@ def test_catch_up_from_blank_state(self):
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
 
         # also critical (2) to federate
-        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
-        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
 
         self.helper.send_state(
             room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
@@ -174,7 +178,9 @@ def test_catch_up_destination_rooms_tracking(self):
         u1_token = self.login("u1", "you the one")
         room = self.helper.create_room_as("u1", tok=u1_token)
 
-        self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        )
 
         event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
 
@@ -201,7 +207,9 @@ def test_catch_up_last_successful_stream_ordering_tracking(self):
         # take the remote offline
         self.is_online = False
 
-        self.get_success(event_injection.inject_member_event(self.hs, room, "@user:host2", "join"))
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        )
 
         self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
 
@@ -257,9 +265,15 @@ def test_catch_up_loop_no_pdus_in_main_queue(self):
         room_1 = self.helper.create_room_as("u1", tok=u1_token)
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
         room_3 = self.helper.create_room_as("u1", tok=u1_token)
-        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
-        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
-        self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        )
 
         # create some events to play with
 
@@ -305,9 +319,15 @@ def test_catch_up_loop_with_pdus_in_main_queue(self):
         room_1 = self.helper.create_room_as("u1", tok=u1_token)
         room_2 = self.helper.create_room_as("u1", tok=u1_token)
         room_3 = self.helper.create_room_as("u1", tok=u1_token)
-        self.get_success(event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join"))
-        self.get_success(event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join"))
-        self.get_success(event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join"))
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        )

From 5de9313524e6b64d0fe56a1a0454c93874886a3e Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Mon, 17 Aug 2020 12:24:55 +0100
Subject: [PATCH 10/34] Fix up test

---
 tests/handlers/test_typing.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 64afd581bc42..9b3d034e2d86 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -76,6 +76,7 @@ def make_homeserver(self, reactor, clock):
             "get_destination_retry_timings",
             "get_devices_by_remote",
             "maybe_store_room_on_invite",
+            "get_last_successful_stream_ordering",
             # Bits that user_directory needs
             "get_user_directory_stream_pos",
             "get_current_state_deltas",
@@ -160,6 +161,10 @@ def get_users_in_room(room_id):
             None
         )
 
+        self.datastore.get_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed(
+            None
+        )
+
     def test_started_typing_local(self):
         self.room_members = [U_APPLE, U_BANANA]
 

From 759e0270c5b5a5b052493b8d454f325fc3a18143 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Mon, 17 Aug 2020 12:29:22 +0100
Subject: [PATCH 11/34] Remove unused method

---
 .../federation/sender/per_destination_queue.py | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c06aef1f2406..1e1a426a5457 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -573,21 +573,6 @@ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
 
         return (edus, now_stream_id)
 
-    def reset_catch_up_state(self) -> None:
-        """
-        Resets the catch-up state of this destination.
-        This does the following:
-        - marks catch-up mode
-        - unsets the catch-up limit (max. stream order)
-          so that it is reset to the highest catch-up next time we have a
-          chance to catch up
-        - empties the main PDU queue as any PDUs in it will now be handled
-          by catch-up (because the max stream order limit was unset).
-        """
-        self._pending_pdus = []
-        self._catching_up = True
-        self._catch_up_max_stream_order = None
-
     async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
         last_device_stream_id = self._last_device_stream_id
         to_device_stream_id = self._store.get_to_device_stream_token()

From 6c5266645311feb5aba0b8717113f26c92b97555 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Mon, 17 Aug 2020 12:40:09 +0100
Subject: [PATCH 12/34] Use Python 3.5-friendly Collection type

---
 synapse/storage/databases/main/transactions.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 1613f2149161..d55def4f504b 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,13 +14,14 @@
 # limitations under the License.
 import logging
 from collections import namedtuple
-from typing import Collection, List, Optional
+from typing import List, Optional
 
 from canonicaljson import encode_canonical_json
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
+from synapse.types import Collection
 from synapse.util.caches.expiringcache import ExpiringCache
 
 db_binary_type = memoryview

From 9ba56cbf8a428cc545f2e3ec4f7e605247b3e814 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Mon, 17 Aug 2020 18:23:51 +0100
Subject: [PATCH 13/34] Fix logic bug in prior code

---
 synapse/storage/databases/main/transactions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index d55def4f504b..3d9010265cd3 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -160,7 +160,7 @@ def _get_destination_retry_timings(self, txn, destination):
             allow_none=True,
         )
 
-        if result and result["retry_last_ts"] > 0:
+        if result and result["retry_last_ts"] and result["retry_last_ts"] > 0:
             return result
         else:
             return None

From af139481b9f622eb8a4f2e5cb76cb6480040b97f Mon Sep 17 00:00:00 2001
From: reivilibre <38398653+reivilibre@users.noreply.github.com>
Date: Thu, 20 Aug 2020 09:37:26 +0100
Subject: [PATCH 14/34] Update changelog.d/8096.bugfix

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
---
 changelog.d/8096.bugfix | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelog.d/8096.bugfix b/changelog.d/8096.bugfix
index 5097dce8681d..bda882866f13 100644
--- a/changelog.d/8096.bugfix
+++ b/changelog.d/8096.bugfix
@@ -1 +1 @@
-Fill remote homeservers in on events they may have missed in rooms during a period of unreachability.
+Send events to homeservers that they may have missed in rooms during a period of unreachability.

From 558af38cc2748946c308eacf096a096810e5f210 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Thu, 20 Aug 2020 10:18:49 +0100
Subject: [PATCH 15/34] Handle suggestions from review

---
 .../sender/per_destination_queue.py           | 12 +++--
 .../storage/databases/main/transactions.py    | 49 ++++++++++---------
 tests/federation/test_federation_catch_up.py  | 12 ++---
 3 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 1e1a426a5457..1cc44bd91543 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -98,7 +98,7 @@ def __init__(
         # _catching_up is flipped to False.
         self._catching_up = True
         # the maximum stream order to catch up to (PDUs after this are expected
-        # to be in the main transmission queue), inclusive 
+        # to be in the main transmission queue), inclusive
         self._catch_up_max_stream_order = None  # type: Optional[int]
         # Cache of the last successfully-transmitted stream ordering for this
         # destination (we are the only updater so this is safe)
@@ -478,7 +478,7 @@ async def _catch_up_transmission_loop(self) -> None:
             self._catching_up = False
             return
 
-        # get 50 catchup room/PDUs
+        # get at most 50 catchup room/PDUs
         while self._last_successful_stream_order < self._catch_up_max_stream_order:
             event_ids = await self._store.get_catch_up_room_event_ids(
                 self._destination,
@@ -491,7 +491,7 @@ async def _catch_up_transmission_loop(self) -> None:
                 # tinkering with the database, but I also have limited foresight,
                 # so let's handle this properly
                 logger.warning(
-                    "No event IDs found for catch-up: "
+                    "Unexpectedly, no event IDs were found for catch-up: "
                     "last successful = %d, max catch up = %d",
                     self._last_successful_stream_order,
                     self._catch_up_max_stream_order,
@@ -500,9 +500,13 @@ async def _catch_up_transmission_loop(self) -> None:
                 self._catching_up = False
                 break
 
             # fetch the relevant events from the event store
+            # - redacted behaviour of REDACT is fine, since we only send metadata
+            #   of redacted events to the destination.
+            # - don't need to worry about rejected events as we do not actively
+            #   forward received events over federation.
             events = await self._store.get_events_as_list(
                 event_ids
-            )  # XXX REVIEW: redact behaviour and allow_rejected ???
+            )
 
             # zip them together with their stream orderings
             catch_up_pdus = [
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 3d9010265cd3..0b8b04a9abb7 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import logging
 from collections import namedtuple
-from typing import List, Optional
+from typing import List, Optional, Iterable
 
 from canonicaljson import encode_canonical_json
 
@@ -266,7 +266,7 @@ def _cleanup_transactions_txn(txn):
             "_cleanup_transactions", _cleanup_transactions_txn
         )
 
-    def get_last_successful_stream_ordering(self, destination: str):
+    async def get_last_successful_stream_ordering(self, destination: str) -> Optional[int]:
         """
         Gets the stream ordering of the PDU most-recently successfully sent
         to the specified destination.
@@ -274,7 +274,7 @@ def get_last_successful_stream_ordering(self, destination: str):
         Args:
             destination: the destination we have successfully sent to
         """
-        return self.db_pool.simple_select_one_onecol(
+        return await self.db_pool.simple_select_one_onecol(
             "destinations",
             {"destination": destination},
             "last_successful_stream_ordering",
@@ -282,9 +282,9 @@ def get_last_successful_stream_ordering(self, destination: str):
             desc="get_last_successful_stream_ordering",
         )
 
-    def set_last_successful_stream_ordering(
+    async def set_last_successful_stream_ordering(
         self, destination: str, last_successful_stream_ordering: int
-    ):
+    ) -> None:
         """
         Marks that we have successfully sent the PDUs up to and including the
         one specified.
@@ -294,23 +294,23 @@ def set_last_successful_stream_ordering(
             last_successful_stream_ordering: the stream_ordering of the most
                 recent successfully-sent PDU
         """
-        return self.db_pool.simple_upsert(
+        return await self.db_pool.simple_upsert(
             "destinations",
             keyvalues={"destination": destination},
             values={"last_successful_stream_ordering": last_successful_stream_ordering},
             desc="set_last_successful_stream_ordering",
         )
 
-    def get_catch_up_room_event_ids(
+    async def get_catch_up_room_event_ids(
         self,
         destination: str,
         last_successful_stream_ordering: int,
         max_stream_order: int,
-    ):
+    ) -> List[str]:
         """
-        Returns 50 event IDs and their corresponding stream_orderings that
-        correspond to the least recent events that have not yet been sent to
-        a destination.
+        Returns at most 50 event IDs and their corresponding stream_orderings
+        that correspond to the oldest events that have not yet been sent to
+        the destination.
 
         Args:
             destination: the destination in question
@@ -320,9 +320,9 @@ def get_catch_up_room_event_ids(
                 to return events for.
 
         Returns:
-            event_ids
+            list of event_ids
         """
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_catch_up_room_event_ids",
             self._get_catch_up_room_event_ids_txn,
             destination,
@@ -350,12 +350,12 @@ def _get_catch_up_room_event_ids_txn(
         event_ids = [row[0] for row in txn]
         return event_ids
 
-    def get_largest_destination_rooms_stream_order(self, destination: str):
+    async def get_largest_destination_rooms_stream_order(self, destination: str) -> Optional[int]:
         """
         Returns the largest stream_ordering from the destination_rooms table
         that corresponds to this destination.
         """
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_largest_destination_rooms_stream_order",
             self._get_largest_destination_rooms_stream_order_txn,
             destination,
@@ -378,30 +378,33 @@ def _get_largest_destination_rooms_stream_order_txn(
         rows = [r[0] for r in txn]
         if rows:
             return rows[0]
-        else:
-            return None
+        return None
 
-    def store_destination_rooms_entries(
+    async def store_destination_rooms_entries(
         self,
-        destinations: Collection[str],
+        destinations: Iterable[str],
         room_id: str,
         event_id: str,
         stream_ordering: int,
-    ):
+    ) -> None:
         """
         Updates or creates `destination_rooms` entries in batch for a single event.
+
         Args:
             destinations: list of destinations
             room_id: the room_id of the event
             event_id: the ID of the event
             stream_ordering: the stream_ordering of the event
         """
-        return self.db_pool.runInteraction(
+
+        rows = [(destination, room_id) for destination in destinations]
+
+        return await self.db_pool.runInteraction(
             "store_destination_rooms_entries",
             self.db_pool.simple_upsert_many_txn,
             "destination_rooms",
             ["destination", "room_id"],
-            [(d, room_id) for d in destinations],
+            rows,
             ["event_id", "stream_ordering"],
-            [(event_id, stream_ordering)] * len(destinations),
+            [(event_id, stream_ordering)] * len(rows),
         )
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index a021658ac1ce..c70fc9cc2f9d 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -11,7 +11,7 @@
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
 
-from tests.test_utils import event_injection
+from tests.test_utils import event_injection, make_awaitable
 from tests.unittest import FederatingHomeserverTestCase, override_config
 
 
@@ -31,12 +31,8 @@ def prepare(self, reactor, clock, hs):
         # stub out get_current_hosts_in_room
         state_handler = hs.get_state_handler()
 
-        # need to do these Future shenanigans because someone awaits on this
-        # result
-        # This mock is crucial for destination_rooms to be populated.
-        fut = Future()
-        fut.set_result(["test", "host2"])
-        state_handler.get_current_hosts_in_room = Mock(return_value=fut)
+        # This mock is crucial for destination_rooms to be populated.
+        state_handler.get_current_hosts_in_room = Mock(return_value=make_awaitable(["test", "host2"]))
 
         # whenever send_transaction is called, record the pdu data
         self.pdus = []
@@ -160,8 +156,8 @@ def test_catch_up_from_blank_state(self):
         # now we need to initiate a federation transaction somehow…
         # to do that, let's send another event (because it's simple to do)
         # (do it to another room otherwise the catch-up logic decides it doesn't
-        # need to catch up room_1 — something I overlooked when first writing 
-        # this test) 
+        # need to catch up room_1 — something I overlooked when first writing
+        # this test)
         self.helper.send(room_2, "wombats!", tok=u1_token)
 
         # we should now have received both PDUs

From 44765e95104e5eea7d6867f00c5deb12c9684de5 Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Thu, 20 Aug 2020 10:28:16 +0100
Subject: [PATCH 16/34] How could we ever forget you, ./scripts-dev/lint.sh?

---
 synapse/federation/sender/per_destination_queue.py |  4 +---
 synapse/storage/databases/main/transactions.py     | 11 +++++++----
 tests/federation/test_federation_catch_up.py       |  5 +++--
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 1cc44bd91543..c23365d1f8c1 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -504,9 +504,7 @@ async def _catch_up_transmission_loop(self) -> None:
             #   of redacted events to the destination.
             # - don't need to worry about rejected events as we do not actively
             #   forward received events over federation.
-            events = await self._store.get_events_as_list(
-                event_ids
-            )
+            events = await self._store.get_events_as_list(event_ids)
 
             # zip them together with their stream orderings
             catch_up_pdus = [
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0b8b04a9abb7..6c6f51cdd00f 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 import logging
 from collections import namedtuple
-from typing import List, Optional, Iterable
+from typing import Iterable, List, Optional
 
 from canonicaljson import encode_canonical_json
 
@@ -265,7 +265,9 @@ def _cleanup_transactions_txn(txn):
             "_cleanup_transactions", _cleanup_transactions_txn
         )
 
-    async def get_last_successful_stream_ordering(self, destination: str) -> Optional[int]:
+    async def get_last_successful_stream_ordering(
+        self, destination: str
+    ) -> Optional[int]:
         """
         Gets the stream ordering of the PDU most-recently successfully sent
         to the specified destination.
@@ -351,7 +351,9 @@ def _get_catch_up_room_event_ids_txn(
         event_ids = [row[0] for row in txn]
         return event_ids
 
-    async def get_largest_destination_rooms_stream_order(self, destination: str) -> Optional[int]:
+    async def get_largest_destination_rooms_stream_order(
+        self, destination: str
+    ) -> Optional[int]:
         """
         Returns the largest stream_ordering from the destination_rooms table
         that corresponds to this destination.
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index c70fc9cc2f9d..1d4cbcfab1d0 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -1,4 +1,3 @@
-from asyncio.futures import Future
 from typing import List, Tuple
 
 from mock import Mock
@@ -32,7 +31,9 @@ def prepare(self, reactor, clock, hs):
         state_handler = hs.get_state_handler()
 
         # This mock is crucial for destination_rooms to be populated.
-        state_handler.get_current_hosts_in_room = Mock(return_value=make_awaitable(["test", "host2"]))
+        state_handler.get_current_hosts_in_room = Mock(
+            return_value=make_awaitable(["test", "host2"])
+        )
 
         # whenever send_transaction is called, record the pdu data
         self.pdus = []

From 56aaa17c8efe84bb3d20d11f30071e2808587bf8 Mon Sep 17 00:00:00 2001
From: reivilibre <38398653+reivilibre@users.noreply.github.com>
Date: Wed, 26 Aug 2020 20:08:51 +0100
Subject: [PATCH 17/34] Apply suggestions from Rich's code review

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
---
 .../main/schema/delta/58/11recovery_after_outage.sql | 2 +-
 synapse/storage/databases/main/transactions.py       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
index c903419eb43d..5e331cc7c6cb 100644
--- a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
+++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
@@ -16,7 +16,7 @@
 -- after there has been a connectivity problem for any reason.
 
 -- This stores, for each (destination, room) pair, the event_id and stream_ordering
--- of the latest event to be enqueued for transmission to that destination.
+-- of the latest event for that destination.
 CREATE TABLE IF NOT EXISTS destination_rooms (
     -- the destination in question
     destination TEXT NOT NULL,
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 6c6f51cdd00f..052dffcd1b7d 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -159,7 +159,7 @@ def _get_destination_retry_timings(self, txn, destination):
             allow_none=True,
         )
 
-        if result and result["retry_last_ts"] and result["retry_last_ts"] > 0:
+        if result and result["retry_last_ts"]:
             return result
         else:
             return None

From 84dbc43ce190397cab4be6c9fc38f59eaa190248 Mon Sep 17 00:00:00 2001
From: reivilibre <38398653+reivilibre@users.noreply.github.com>
Date: Wed, 26 Aug 2020 20:13:15 +0100
Subject: [PATCH 18/34] Apply suggestions from Rich's code review

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
---
 synapse/federation/sender/__init__.py              | 2 +-
 synapse/federation/sender/per_destination_queue.py | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d71ef966f8e3..e9d66a6a7b38 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -285,7 +285,7 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
-        # track the fact that we are enqueuing this PDU for these destinations,
+        # track the fact that we have a PDU for these destinations,
         # to allow us to perform catch-up later on if the remote is unreachable
         # for a while.
         await self.store.store_destination_rooms_entries(
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c23365d1f8c1..a2497ad99dd1 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -97,9 +97,11 @@ def __init__(
         # New events will only be sent once this is finished, at which point
         # _catching_up is flipped to False.
self._catching_up = True + # the maximum stream order to catch up to (PDUs after this are expected # to be in the main transmission queue), inclusive self._catch_up_max_stream_order = None # type: Optional[int] + # Cache of the last successfully-transmitted stream ordering for this # destination (we are the only updater so this is safe) self._last_successful_stream_order = None # type: Optional[int] @@ -152,7 +154,7 @@ def send_pdu(self, pdu: EventBase, order: int) -> None: order: an arbitrary order for the PDU — NOT the stream ordering """ if ( - self._catch_up_max_stream_order + self._catch_up_max_stream_order is not None and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order ): # we are in catch-up mode and this PDU is already scheduled to be From 2c740a79d37b580f2dbbde378165d3383c4757ee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 20:16:23 +0100 Subject: [PATCH 19/34] Foreign key on rooms, SQL comment --- .../main/schema/delta/58/11recovery_after_outage.sql | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql index 5e331cc7c6cb..e3644367a254 100644 --- a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql +++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql @@ -18,7 +18,10 @@ -- This stores, for each (destination, room) pair, the event_id and stream_ordering -- of the latest event for that destination. CREATE TABLE IF NOT EXISTS destination_rooms ( - -- the destination in question + -- the destination in question. + -- Can not be a foreign key because rows in the `destinations` table will + -- only be created when we back off or when we successfully send a + -- transaction. 
destination TEXT NOT NULL, -- the ID of the room in question room_id TEXT NOT NULL, @@ -26,7 +29,8 @@ CREATE TABLE IF NOT EXISTS destination_rooms ( stream_ordering INTEGER, -- the event_id of the event event_id TEXT NOT NULL, - PRIMARY KEY (destination, room_id) + PRIMARY KEY (destination, room_id), + FOREIGN KEY (room_id) REFERENCES rooms (room_id) ); -- this column tracks the stream_ordering of the event that was most recently From d77e444a854965c05c3e356093d9279bbe249f77 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 20:20:43 +0100 Subject: [PATCH 20/34] NOT NULL, foreign key (events) --- .../main/schema/delta/58/11recovery_after_outage.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql index e3644367a254..999ef83690a4 100644 --- a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql +++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql @@ -26,11 +26,14 @@ CREATE TABLE IF NOT EXISTS destination_rooms ( -- the ID of the room in question room_id TEXT NOT NULL, -- the stream_ordering of the event - stream_ordering INTEGER, + stream_ordering INTEGER NOT NULL, -- the event_id of the event event_id TEXT NOT NULL, PRIMARY KEY (destination, room_id), FOREIGN KEY (room_id) REFERENCES rooms (room_id) + ON DELETE CASCADE, + FOREIGN KEY (stream_ordering) REFERENCES events (stream_ordering) + ON DELETE CASCADE ); -- this column tracks the stream_ordering of the event that was most recently From 33874d4e547a21cec7413badade607578cefb519 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 20:23:41 +0100 Subject: [PATCH 21/34] SQL column doc --- .../databases/main/schema/delta/58/11recovery_after_outage.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql index 999ef83690a4..00b2005c903c 100644 --- a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql +++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql @@ -38,5 +38,7 @@ CREATE TABLE IF NOT EXISTS destination_rooms ( -- this column tracks the stream_ordering of the event that was most recently -- successfully transmitted to the destination. +-- A value of NULL means that we have not sent an event successfully yet +-- (at least, not since the introduction of this column). 
ALTER TABLE destinations ADD COLUMN last_successful_stream_ordering INTEGER; From c1a2b680c48c742d80201e539cb1f8f9fd486fc1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 20:31:57 +0100 Subject: [PATCH 22/34] Behaviour confirmed reasonable-seeming --- synapse/federation/sender/per_destination_queue.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index a2497ad99dd1..ad83db36bb35 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -246,9 +246,8 @@ async def _transaction_transmission_loop(self) -> None: # this keeps the catching-up logic simple await self._catch_up_transmission_loop() if self._catching_up: - # XXX if we aren't actually caught up still, shouldn't - # carry on to the main loop - # (but need to consider what we do in a failure...?) + # if we aren't actually caught up yet, shouldn't carry on to + # the main loop return pending_pdus = [] From 16eec5cbfa3ac42982f138efa7d56dcad8204c38 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 20:49:50 +0100 Subject: [PATCH 23/34] The early bird gets the early return (hopefully with a worm as the return value) --- .../sender/per_destination_queue.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index ad83db36bb35..c36ec280f513 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -518,18 +518,19 @@ async def _catch_up_transmission_loop(self) -> None: success = await self._transaction_manager.send_new_transaction( self._destination, catch_up_pdus, [] ) - if success: - sent_transactions_counter.inc() - final_pdu, _ = catch_up_pdus[-1] - self._last_successful_stream_order = cast( - int, final_pdu.internal_metadata.stream_ordering - ) - await self._store.set_last_successful_stream_ordering( - self._destination, self._last_successful_stream_order - ) - else: + + if not success: return + sent_transactions_counter.inc() + final_pdu, _ = catch_up_pdus[-1] + self._last_successful_stream_order = cast( + int, final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_last_successful_stream_ordering( + self._destination, self._last_successful_stream_order + ) + # once we have reached this point, catch-up is done! self._catching_up = False From 92517e95fce8d863f4faa67ad9d7778482af8000 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 21:01:06 +0100 Subject: [PATCH 24/34] Assertion on bug --- synapse/federation/sender/per_destination_queue.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index c36ec280f513..4c07d1f7fb90 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -506,15 +506,17 @@ async def _catch_up_transmission_loop(self) -> None: # - don't need to worry about rejected events as we do not actively # forward received events over federation. events = await self._store.get_events_as_list(event_ids) + if not events: + raise AssertionError( + "No events retrieved when we asked for %r. " + "This should not happen." 
% event_ids + ) # zip them together with their stream orderings catch_up_pdus = [ (event, event.internal_metadata.stream_ordering) for event in events ] - if not catch_up_pdus: - break - success = await self._transaction_manager.send_new_transaction( self._destination, catch_up_pdus, [] ) From ef4680df2241637851bcabdc877fd9b8f972595b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 26 Aug 2020 21:01:48 +0100 Subject: [PATCH 25/34] Last successful stream ordering is about destinations --- synapse/federation/sender/per_destination_queue.py | 6 +++--- synapse/storage/databases/main/transactions.py | 4 ++-- tests/federation/test_federation_catch_up.py | 8 ++++---- tests/handlers/test_typing.py | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4c07d1f7fb90..081e13f092da 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -363,7 +363,7 @@ async def _transaction_transmission_loop(self) -> None: self._last_successful_stream_order = ( final_pdu.internal_metadata.stream_ordering ) - await self._store.set_last_successful_stream_ordering( + await self._store.set_destination_last_successful_stream_ordering( self._destination, self._last_successful_stream_order ) else: @@ -441,7 +441,7 @@ async def _transaction_transmission_loop(self) -> None: async def _catch_up_transmission_loop(self) -> None: if self._last_successful_stream_order is None: # first catch-up, so get from database - self._last_successful_stream_order = await self._store.get_last_successful_stream_ordering( + self._last_successful_stream_order = await self._store.get_destination_last_successful_stream_ordering( self._destination ) @@ -529,7 +529,7 @@ async def _catch_up_transmission_loop(self) -> None: self._last_successful_stream_order = cast( int, final_pdu.internal_metadata.stream_ordering ) - await self._store.set_last_successful_stream_ordering( + await self._store.set_destination_last_successful_stream_ordering( self._destination, self._last_successful_stream_order ) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 052dffcd1b7d..7b4a85c9ea9f 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -265,7 +265,7 @@ def _cleanup_transactions_txn(txn): "_cleanup_transactions", _cleanup_transactions_txn ) - async def get_last_successful_stream_ordering( + async def get_destination_last_successful_stream_ordering( self, destination: str ) -> Optional[int]: """ @@ -283,7 +283,7 @@ async def get_last_successful_stream_ordering( desc="get_last_successful_stream_ordering", ) - async def set_last_successful_stream_ordering( + async def set_destination_last_successful_stream_ordering( self, destination: str, last_successful_stream_ordering: int ) -> None: """ diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 1d4cbcfab1d0..573ca9974f1f 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -213,7 +213,7 @@ def test_catch_up_last_successful_stream_ordering_tracking(self): self.pump() lsso_1 = self.get_success( - self.hs.get_datastore().get_last_successful_stream_ordering("host2") + self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2") ) self.assertIsNone( 
@@ -227,7 +227,7 @@ def test_catch_up_last_successful_stream_ordering_tracking(self): event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] lsso_2 = self.get_success( - self.hs.get_datastore().get_last_successful_stream_ordering("host2") + self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2") ) row_2 = self.get_destination_room(room) @@ -286,7 +286,7 @@ def test_catch_up_loop_no_pdus_in_main_queue(self): event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2)) self.get_success( - self.hs.get_datastore().set_last_successful_stream_ordering( + self.hs.get_datastore().set_destination_last_successful_stream_ordering( "host2", event_2.internal_metadata.stream_ordering ) ) @@ -349,7 +349,7 @@ def test_catch_up_loop_with_pdus_in_main_queue(self): event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2)) self.get_success( - self.hs.get_datastore().set_last_successful_stream_ordering( + self.hs.get_datastore().set_destination_last_successful_stream_ordering( "host2", event_2.internal_metadata.stream_ordering ) ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 9b3d034e2d86..228f4e68fab4 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -161,7 +161,7 @@ def get_users_in_room(room_id): None ) - self.datastore.get_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed( + self.datastore.get_destination_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed( None ) From de5caf0928e3e8ead1806e6dea90705fa02b69ab Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 07:01:32 +0100 Subject: [PATCH 26/34] Catch-up on all cases except federation denial --- synapse/federation/sender/per_destination_queue.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 081e13f092da..be262e64df86 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -409,6 +409,7 @@ async def _transaction_transmission_loop(self) -> None: # reset max catch up since we have dropped PDUs here self._catch_up_max_stream_order = None except FederationDeniedError as e: + # remote server is not in our federation whitelist logger.info(e) except HttpResponseException as e: logger.warning( @@ -418,7 +419,9 @@ async def _transaction_transmission_loop(self) -> None: e, ) - # XXX REVIEW should we be catching up? 
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
             )
             for p, _ in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
             for p, _ in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._catching_up = True
+            # reset max catch up since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False

From 2c740a79d37b580f2dbbde378165d3383c4757ee Mon Sep 17 00:00:00 2001
From: "Olivier Wilkinson (reivilibre)"
Date: Thu, 27 Aug 2020 08:22:02 +0100
Subject: [PATCH 27/34] Don't explicitly store the event_id

---
 .../main/schema/delta/58/11recovery_after_outage.sql | 6 ++----
 synapse/storage/databases/main/transactions.py       | 1 +
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
index 00b2005c903c..4b6e49487a88 100644
--- a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
+++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
@@ -15,8 +15,8 @@
 -- This schema delta alters the schema to enable 'catching up' remote homeservers
 -- after there has been a connectivity problem for any reason.
 
--- This stores, for each (destination, room) pair, the event_id and stream_ordering
--- of the latest event for that destination.
+-- This stores, for each (destination, room) pair, the stream_ordering of the
+-- latest event for that destination.
 CREATE TABLE IF NOT EXISTS destination_rooms (
     -- the destination in question.
     -- Can not be a foreign key because rows in the `destinations` table will
@@ -27,8 +27,6 @@ CREATE TABLE IF NOT EXISTS destination_rooms (
     room_id TEXT NOT NULL,
     -- the stream_ordering of the event
     stream_ordering INTEGER NOT NULL,
-    -- the event_id of the event
-    event_id TEXT NOT NULL,
     PRIMARY KEY (destination, room_id),
     FOREIGN KEY (room_id) REFERENCES rooms (room_id)
         ON DELETE CASCADE,
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7b4a85c9ea9f..08ecf8ece5e1 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -340,6 +340,7 @@ def _get_catch_up_room_event_ids_txn(
     ) -> List[str]:
         q = """
             SELECT event_id FROM destination_rooms
+            JOIN events USING (stream_ordering)
             WHERE destination = ?
               AND stream_ordering > ? AND stream_ordering <= ?
ORDER BY stream_ordering From b0bdadd0bf8f5f4b68405b5ec5d200d8d3896e31 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 08:25:39 +0100 Subject: [PATCH 28/34] Antilint --- tests/federation/test_federation_catch_up.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 573ca9974f1f..11eb565b3b44 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -213,7 +213,9 @@ def test_catch_up_last_successful_stream_ordering_tracking(self): self.pump() lsso_1 = self.get_success( - self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2") + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) ) self.assertIsNone( @@ -227,7 +229,9 @@ def test_catch_up_last_successful_stream_ordering_tracking(self): event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] lsso_2 = self.get_success( - self.hs.get_datastore().get_destination_last_successful_stream_ordering("host2") + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) ) row_2 = self.get_destination_room(room) From 843403f2d41d21a908eba9fe5771f775c00207d6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 08:32:45 +0100 Subject: [PATCH 29/34] Remove review question --- synapse/federation/sender/per_destination_queue.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index be262e64df86..6eb4dc416c31 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -378,11 +378,6 @@ async def _transaction_transmission_loop(self) -> None: ), ) - # XXX REVIEW needs scrutiny - # to note: up to 50 pdus can be lost from the - # main queue by a transaction that triggers a backoff — do we - # clear the main queue now? I can see arguments for and against. - if e.retry_interval > 60 * 60 * 1000: # we won't retry for another hour! 
# (this suggests a significant outage) From b1fd67bdf018c32739e1ec2e44ebdc3fa8ffd251 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 08:53:23 +0100 Subject: [PATCH 30/34] =?UTF-8?q?Fix=20wrong=20type=20signatures=20(even?= =?UTF-8?q?=20if=20str=20is=20Iterable[str]=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- synapse/storage/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 181c3ec24994..207fa9d7df00 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1070,7 +1070,7 @@ async def simple_select_one_onecol( self, table: str, keyvalues: Dict[str, Any], - retcol: Iterable[str], + retcol: str, allow_none: bool = False, desc: str = "simple_select_one_onecol", ) -> Optional[Any]: @@ -1100,7 +1100,7 @@ def simple_select_one_onecol_txn( txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any], - retcol: Iterable[str], + retcol: str, allow_none: bool = False, ) -> Optional[Any]: ret = cls.simple_select_onecol_txn( From e6890c77dfaac97ecdb0a37a3caae64478bab5d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 09:00:47 +0100 Subject: [PATCH 31/34] Fix the tests after removing event_id column --- .../storage/databases/main/transactions.py | 4 ++-- tests/federation/test_federation_catch_up.py | 23 ++++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 08ecf8ece5e1..45eee028fbff 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -409,6 +409,6 @@ async def store_destination_rooms_entries( "destination_rooms", ["destination", "room_id"], rows, - ["event_id", "stream_ordering"], - [(event_id, stream_ordering)] * len(rows), + ["stream_ordering"], + [(stream_ordering,)] * len(rows), ) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 11eb565b3b44..84a01809114f 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -54,13 +54,24 @@ def get_destination_room(self, room: str, destination: str = "host2") -> dict: Returns: Dictionary of { event_id: str, stream_ordering: int } """ - return self.get_success( - self.hs.get_datastore().db_pool.simple_select_one( - table="destination_rooms", - keyvalues={"destination": destination, "room_id": room}, - retcols=["event_id", "stream_ordering"], + event_id, stream_ordering = self.get_success( + self.hs.get_datastore().db_pool.execute( + "test:get_destination_rooms", + None, + """ + SELECT event_id, stream_ordering + FROM destination_rooms dr + JOIN events USING (stream_ordering) + WHERE dr.destination = ? AND dr.room_id = ? 
+ """, + destination, + room ) - ) + )[0] + return { + "event_id": event_id, + "stream_ordering": stream_ordering + } def make_fake_destination_queue( self, destination: str = "host2" From 7cfecf3b17377c66762ca6d23addd5374206cb8f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 09:02:05 +0100 Subject: [PATCH 32/34] Antilint --- tests/federation/test_federation_catch_up.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 84a01809114f..117a69389177 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -65,13 +65,10 @@ def get_destination_room(self, room: str, destination: str = "host2") -> dict: WHERE dr.destination = ? AND dr.room_id = ? """, destination, - room + room, ) )[0] - return { - "event_id": event_id, - "stream_ordering": stream_ordering - } + return {"event_id": event_id, "stream_ordering": stream_ordering} def make_fake_destination_queue( self, destination: str = "host2" From bf51d2ffc7840af0f7f7d7c34fa469e2bff0ed33 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 09:10:07 +0100 Subject: [PATCH 33/34] Also fix `simple_select_onecol_txn` --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 207fa9d7df00..53858db9a33d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1120,7 +1120,7 @@ def simple_select_onecol_txn( txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any], - retcol: Iterable[str], + retcol: str, ) -> List[Any]: sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table} From 7589a0311cf8dce98cc09cb24bbf7f1a55bfbe79 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 27 Aug 2020 10:41:33 +0100 Subject: [PATCH 34/34] Antilint again :( --- synapse/storage/database.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 53858db9a33d..98aaf13945bc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1117,10 +1117,7 @@ def simple_select_one_onecol_txn( @staticmethod def simple_select_onecol_txn( - txn: LoggingTransaction, - table: str, - keyvalues: Dict[str, Any], - retcol: str, + txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any], retcol: str, ) -> List[Any]: sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
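
The test cleanup in patches 15 and 16 leans on `make_awaitable` from tests.test_utils in place of the hand-built Future. A plausible reconstruction of that helper, inferred from the Future code it replaces; the real implementation lives in tests/test_utils/__init__.py and may differ:

    from asyncio import Future
    from typing import Any, Awaitable
    from unittest.mock import Mock


    def make_awaitable(result: Any) -> "Awaitable[Any]":
        # A completed Future, unlike a coroutine, can be awaited more than
        # once, so a single return_value can serve every caller of the
        # mocked async method.
        future = Future()  # type: Future[Any]
        future.set_result(result)
        return future


    # usage mirroring the test: the mock returns an awaitable directly
    state_handler = Mock()
    state_handler.get_current_hosts_in_room = Mock(
        return_value=make_awaitable(["test", "host2"])
    )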
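The `is not None` change to send_pdu in patch 18 is load-bearing: `_catch_up_max_stream_order` holds an integer stream ordering, and a bare truthiness test would misfire if that bound were ever 0. A self-contained illustration of the pitfall (the variable name is reused for illustration only):

    catch_up_max_stream_order = 0  # falsy, but otherwise a valid bound

    if catch_up_max_stream_order:
        # the old guard: never reached when the bound is 0
        raise AssertionError("unreachable for 0")

    if catch_up_max_stream_order is not None:
        # the fixed guard: reached whenever a bound has been set at all
        print("PDU falls inside the catch-up window")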
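Patches 22 through 26 converge on one control flow for the catch-up path: read the last stream ordering the remote is known to have received, page through the missed events oldest-first in batches of up to 50, and advance the high-water mark only after a transaction succeeds. A minimal sketch of that flow, assuming a `Store` protocol and `send` callable that merely stand in for the real data store and TransactionManager:

    from typing import Awaitable, Callable, List, Optional, Protocol


    class Store(Protocol):
        # stand-ins for the storage methods added earlier in this series
        async def get_destination_last_successful_stream_ordering(
            self, destination: str
        ) -> Optional[int]: ...

        async def get_catch_up_room_event_ids(
            self, destination: str, last_order: int, max_order: int
        ) -> List[str]: ...

        async def set_destination_last_successful_stream_ordering(
            self, destination: str, order: int
        ) -> None: ...


    async def catch_up_sketch(
        destination: str,
        store: Store,
        max_order: int,
        # sends one transaction and returns the stream ordering of its
        # final PDU on success, or None on failure
        send: Callable[[str, List[str]], Awaitable[Optional[int]]],
    ) -> bool:
        """Page through missed events oldest-first; True once caught up."""
        last_order = await store.get_destination_last_successful_stream_ordering(
            destination
        )
        if last_order is None:
            # no successful send on record; this sketch simply declines to
            # replay anything (the real loop has more nuance here)
            return True
        while last_order < max_order:
            event_ids = await store.get_catch_up_room_event_ids(
                destination, last_order, max_order
            )
            if not event_ids:
                break
            sent_up_to = await send(destination, event_ids)
            if sent_up_to is None:
                # transaction failed: stay in catch-up mode, as the early
                # return added in patch 22 does, and let backoff apply
                return False
            # only advance the high-water mark after a confirmed success
            last_order = sent_up_to
            await store.set_destination_last_successful_stream_ordering(
                destination, last_order
            )
        return True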
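Patch 27 stops denormalising event_id into destination_rooms and instead recovers it at query time by joining events on stream_ordering. A toy sqlite3 demo of that query shape, using deliberately simplified copies of the two tables (no foreign keys, toy IDs):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE events (
            stream_ordering INTEGER PRIMARY KEY,
            event_id TEXT NOT NULL
        );
        CREATE TABLE destination_rooms (
            destination TEXT NOT NULL,
            room_id TEXT NOT NULL,
            stream_ordering INTEGER NOT NULL,
            PRIMARY KEY (destination, room_id)
        );
        INSERT INTO events VALUES (1, '$ev1'), (2, '$ev2'), (3, '$ev3');
        INSERT INTO destination_rooms VALUES
            ('host2', '!room_a', 2),
            ('host2', '!room_b', 3);
        """
    )
    # the same shape as _get_catch_up_room_event_ids_txn after patch 27
    rows = conn.execute(
        """
        SELECT event_id FROM destination_rooms
        JOIN events USING (stream_ordering)
        WHERE destination = ?
          AND stream_ordering > ? AND stream_ordering <= ?
        ORDER BY stream_ordering
        LIMIT 50
        """,
        ("host2", 1, 3),
    ).fetchall()
    print(rows)  # [('$ev2',), ('$ev3',)]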