From ce32956573443a2ea3084fd528abe233dee045d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 3 Sep 2020 19:41:50 +0100 Subject: [PATCH 1/8] Add last_successful_stream_ordering to destination table Signed-off-by: Olivier Wilkinson (reivilibre) --- .../delta/58/17_catchup_last_successful.sql | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql diff --git a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql new file mode 100644 index 000000000000..0598e8e251d3 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql @@ -0,0 +1,21 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This column tracks the stream_ordering of the event that was most recently +-- successfully transmitted to the destination. +-- A value of NULL means that we have not sent an event successfully yet +-- (at least, not since the introduction of this column). +ALTER TABLE destinations + ADD COLUMN last_successful_stream_ordering INTEGER; From 46ae4c85b4c0210a7ae43d32b029af23f4885198 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 3 Sep 2020 19:42:12 +0100 Subject: [PATCH 2/8] Add getters and setters for last_successful_stream_ordering Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/databases/main/transactions.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 5b31aab700f9..9fd0d2fb3d30 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -273,3 +273,40 @@ def _cleanup_transactions_txn(txn): await self.db_pool.runInteraction( "_cleanup_transactions", _cleanup_transactions_txn ) + + async def get_destination_last_successful_stream_ordering( + self, destination: str + ) -> Optional[int]: + """ + Gets the stream ordering of the PDU most-recently successfully sent + to the specified destination. + + Args: + destination: the destination we have successfully sent to + """ + return await self.db_pool.simple_select_one_onecol( + "destinations", + {"destination": destination}, + "last_successful_stream_ordering", + allow_none=True, + desc="get_last_successful_stream_ordering", + ) + + async def set_destination_last_successful_stream_ordering( + self, destination: str, last_successful_stream_ordering: int + ) -> None: + """ + Marks that we have successfully sent the PDUs up to and including the + one specified. + + Args: + destination: the destination we have successfully sent to + last_successful_stream_ordering: the stream_ordering of the most + recent successfully-sent PDU + """ + return await self.db_pool.simple_upsert( + "destinations", + keyvalues={"destination": destination}, + values={"last_successful_stream_ordering": last_successful_stream_ordering}, + desc="set_last_successful_stream_ordering", + ) From 52f56ca0e2e5e8ff7dbdc77d823cccde2ccb71c4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 3 Sep 2020 19:45:44 +0100 Subject: [PATCH 3/8] Update last_successful_stream_ordering when transmitting Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/federation/sender/per_destination_queue.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index f1534d431dcb..2280a579b1f3 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -325,6 +325,17 @@ async def _transaction_transmission_loop(self) -> None: self._last_device_stream_id = device_stream_id self._last_device_list_stream_id = dev_list_id + + if pending_pdus: + # we sent some PDUs and it was successful, so update our + # last_successful_stream_ordering in the destinations table. + final_pdu = pending_pdus[-1] + last_successful_stream_ordering = ( + final_pdu.internal_metadata.stream_ordering + ) + await self._store.set_destination_last_successful_stream_ordering( + self._destination, last_successful_stream_ordering + ) else: break except NotRetryingDestination as e: From 3eb290cd7d33685d7579f40c10391406d635ffe9 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 3 Sep 2020 19:47:19 +0100 Subject: [PATCH 4/8] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/8247.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8247.misc diff --git a/changelog.d/8247.misc b/changelog.d/8247.misc new file mode 100644 index 000000000000..3c27803be45f --- /dev/null +++ b/changelog.d/8247.misc @@ -0,0 +1 @@ +Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage. From ca7d6bba0ea96baf8e0ca6b0cdfe33d9881b57d8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 4 Sep 2020 09:41:30 +0100 Subject: [PATCH 5/8] Backport fix in get_destination_retry_timings Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/databases/main/transactions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 9fd0d2fb3d30..f81620a7afc3 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -164,7 +164,9 @@ def _get_destination_retry_timings(self, txn, destination): allow_none=True, ) - if result and result["retry_last_ts"] > 0: + # check we have a row and retry_last_ts is not null or zero + # (retry_last_ts can't be negative) + if result and result["retry_last_ts"]: return result else: return None From 8d4f0ad83d2af8f8c4ab7f9c2a0fe7941acf30a8 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 4 Sep 2020 14:39:40 +0100 Subject: [PATCH 6/8] Update synapse/storage/databases/main/transactions.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/databases/main/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 27d608d1f936..0b7b0eed12c8 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -342,7 +342,7 @@ async def get_destination_last_successful_stream_ordering( to the specified destination. Args: - destination: the destination we have successfully sent to + destination: the destination to query """ return await self.db_pool.simple_select_one_onecol( "destinations", From e5c50cf2d481ac92af9beced16904c73621179f0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 4 Sep 2020 14:36:29 +0100 Subject: [PATCH 7/8] In light of #8255, use BIGINTs for stream_orderings --- .../main/schema/delta/58/17_catchup_last_successful.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql index 0598e8e251d3..a67aa5e500ae 100644 --- a/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql +++ b/synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql @@ -18,4 +18,4 @@ -- A value of NULL means that we have not sent an event successfully yet -- (at least, not since the introduction of this column). ALTER TABLE destinations - ADD COLUMN last_successful_stream_ordering INTEGER; + ADD COLUMN last_successful_stream_ordering BIGINT; From 818c02b9282df75429829a7206efa638aed6e386 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 4 Sep 2020 14:40:12 +0100 Subject: [PATCH 8/8] Clarify what None means --- synapse/storage/databases/main/transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 0b7b0eed12c8..c0a958252e5e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -339,7 +339,8 @@ async def get_destination_last_successful_stream_ordering( ) -> Optional[int]: """ Gets the stream ordering of the PDU most-recently successfully sent - to the specified destination. + to the specified destination, or None if this information has not been + tracked yet. Args: destination: the destination to query