From e0d7fab784b31a72fe5215ababbff325782ccef2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Aug 2022 20:13:28 -0500 Subject: [PATCH 01/27] Keep track when we tried to backfill an event --- .../delta/72/04event_backfill_access_time.sql | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql diff --git a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql new file mode 100644 index 000000000000..bd936e467851 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql @@ -0,0 +1,26 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Add a table that keeps track of when we last tried to backfill an event. This +-- allows us to be more intelligent when we decide to retry (we don't need to +-- fail over and over) and we can process that event in the background so we +-- don't block on it each time. 
+CREATE TABLE IF NOT EXISTS event_backfill_access_time( + event_id TEXT NOT NULL, + ts BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS event_backfill_access_time_event_id ON event_backfill_access_time(event_id); From b8d55d38d823ac040a6179091f345edaba38075e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 24 Aug 2022 22:56:50 -0500 Subject: [PATCH 02/27] Record in some fail spots --- synapse/handlers/federation_event.py | 5 +++++ .../storage/databases/main/event_federation.py | 16 ++++++++++++++++ .../delta/72/04event_backfill_access_time.sql | 7 ++++--- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 32326975a187..9309e4ae9b0b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -852,6 +852,8 @@ async def _process_pulled_event( self._sanity_check_event(event) except SynapseError as err: logger.warning("Event %s failed sanity check: %s", event_id, err) + if backfilled: + await self._store.record_event_backfill_attempt(event_id) return try: @@ -887,6 +889,9 @@ async def _process_pulled_event( backfilled=backfilled, ) except FederationError as e: + if backfilled: + await self._store.record_event_backfill_attempt(event_id) + if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) else: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index c836078da683..3c007a0ee9a1 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1292,6 +1292,22 @@ def _get_backfill_events( return event_id_results + @trace + async def record_event_backfill_attempt(self, event_id: str, room_id: str) -> None: + await self.db_pool.simple_upsert( + table="event_backfill_attempts", + keyvalues={"event_id": event_id}, + values={ + "event_id": event_id, + # TODO: This needs to 
increment + "num_attempts": 1, + "last_attempt_ts": self._clock.time_msec(), + }, + insertion_values={}, + desc="insert_event_backfill_attempt", + lock=False, + ) + async def get_missing_events( self, room_id: str, diff --git a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql index bd936e467851..71224c2b48c0 100644 --- a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql +++ b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql @@ -18,9 +18,10 @@ -- allows us to be more intelligent when we decide to retry (we don't need to -- fail over and over) and we can process that event in the background so we -- don't block on it each time. -CREATE TABLE IF NOT EXISTS event_backfill_access_time( +CREATE TABLE IF NOT EXISTS event_backfill_attempts( event_id TEXT NOT NULL, - ts BIGINT NOT NULL + num_attempts INT NOT NULL, + last_attempt_ts BIGINT NOT NULL ); -CREATE UNIQUE INDEX IF NOT EXISTS event_backfill_access_time_event_id ON event_backfill_access_time(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS event_backfill_attempts_event_id ON event_backfill_attempts(event_id); From bec26e23fe922835ac76b265a060015946008869 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 00:04:59 -0500 Subject: [PATCH 03/27] Record and clear attempts --- synapse/federation/federation_base.py | 7 +++++ .../databases/main/event_federation.py | 4 +-- synapse/storage/databases/main/events.py | 28 +++++++++++++------ 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 4269a98db2db..0ee222239939 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -129,6 +129,13 @@ async def _check_sigs_and_hash( "event_id": pdu.event_id, } ) + + # TODO: Is it okay to assume this is called from backfilling? 
+ # + # In any case, we definately don't want to keep fetching spam over + # and over and failing it. + await self._store.record_event_backfill_attempt(pdu.event_id) + # we redact (to save disk space) as well as soft-failing (to stop # using the event in prev_events). redacted_event = prune_event(pdu) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3c007a0ee9a1..627577dbafc5 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1293,7 +1293,7 @@ def _get_backfill_events( return event_id_results @trace - async def record_event_backfill_attempt(self, event_id: str, room_id: str) -> None: + async def record_event_backfill_attempt(self, event_id: str) -> None: await self.db_pool.simple_upsert( table="event_backfill_attempts", keyvalues={"event_id": event_id}, @@ -1304,7 +1304,7 @@ async def record_event_backfill_attempt(self, event_id: str, room_id: str) -> No "last_attempt_ts": self._clock.time_msec(), }, insertion_values={}, - desc="insert_event_backfill_attempt", + desc="record_event_backfill_attempt", lock=False, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..75254106a1ee 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2435,17 +2435,27 @@ def _update_backward_extremeties( "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) + backward_extremities_to_remove = [ + (ev.event_id, ev.room_id) + for ev in events + if not ev.internal_metadata.is_outlier() + # If we encountered an event with no prev_events, then we might + # as well remove it now because it won't ever have anything else + # to backfill from. 
+ or len(ev.prev_event_ids()) == 0 + ] txn.execute_batch( query, - [ - (ev.event_id, ev.room_id) - for ev in events - if not ev.internal_metadata.is_outlier() - # If we encountered an event with no prev_events, then we might - # as well remove it now because it won't ever have anything else - # to backfill from. - or len(ev.prev_event_ids()) == 0 - ], + backward_extremities_to_remove, + ) + + # Since we no longer need it as a backward extremity, it won't be + # backfilled again so we no longer need to store the backfill attempts + # around it. + query = "DELETE FROM event_backfill_attempts" " WHERE event_id = ?" + txn.execute_batch( + query, + backward_extremities_to_remove, ) From fee37c3a1c18fa8c50394916972e41e74ea12b8e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 00:13:33 -0500 Subject: [PATCH 04/27] Add changelog --- changelog.d/13589.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13589.feature diff --git a/changelog.d/13589.feature b/changelog.d/13589.feature new file mode 100644 index 000000000000..78fa1ddb5202 --- /dev/null +++ b/changelog.d/13589.feature @@ -0,0 +1 @@ +Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future. From d1290be6ade3dd095259595a46056351c01db18c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 00:26:57 -0500 Subject: [PATCH 05/27] Remove from when spam checker fails See https://github.com/matrix-org/synapse/pull/13589#discussion_r954515215 --- synapse/federation/federation_base.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0ee222239939..6326024f2072 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -130,12 +130,6 @@ async def _check_sigs_and_hash( } ) - # TODO: Is it okay to assume this is called from backfilling? 
- # - # In any case, we definately don't want to keep fetching spam over - # and over and failing it. - await self._store.record_event_backfill_attempt(pdu.event_id) - # we redact (to save disk space) as well as soft-failing (to stop # using the event in prev_events). redacted_event = prune_event(pdu) From f9119d0a9ea8f011babe7f6851a4033d3294f018 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:05:28 -0500 Subject: [PATCH 06/27] Custom upsert to increment See https://github.com/matrix-org/synapse/pull/13589#discussion_r954506714 --- synapse/handlers/pagination.py | 3 + .../databases/main/event_federation.py | 79 ++++++++++++++++--- synapse/storage/schema/__init__.py | 2 +- .../01event_backfill_access_time.sql} | 0 4 files changed, 73 insertions(+), 11 deletions(-) rename synapse/storage/schema/main/delta/{72/04event_backfill_access_time.sql => 73/01event_backfill_access_time.sql} (100%) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 74e944bce72d..b1f4db5584bc 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -589,6 +589,9 @@ async def get_messages( state, time_now, config=serialize_options ) + # TODO: Remove (just for testing) + await self.store.record_event_backfill_attempt(chunk["chunk"][0]["event_id"]) + return chunk async def _shutdown_and_purge_room( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 627577dbafc5..ab9cbe4e5578 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1294,20 +1294,79 @@ def _get_backfill_events( @trace async def record_event_backfill_attempt(self, event_id: str) -> None: - await self.db_pool.simple_upsert( + if self.database_engine.can_native_upsert: + await self.db_pool.runInteraction( + "record_event_backfill_attempt", + self._record_event_backfill_attempt_upsert_native_txn, + event_id, + 
db_autocommit=True, # Safe as its a single upsert + ) + else: + await self.db_pool.runInteraction( + "record_event_backfill_attempt", + self._record_event_backfill_attempt_upsert_emulated_txn, + event_id, + ) + + def _record_event_backfill_attempt_upsert_native_txn( + self, + txn: LoggingTransaction, + event_id: str, + ) -> None: + assert self.database_engine.can_native_upsert + + sql = """ + INSERT INTO event_backfill_attempts ( + event_id, num_attempts, last_attempt_ts + ) + VALUES (?, ?, ?) + ON CONFLICT (event_id) DO UPDATE SET + event_id=EXCLUDED.event_id, + num_attempts=event_backfill_attempts.num_attempts + 1, + last_attempt_ts=EXCLUDED.last_attempt_ts; + """ + + txn.execute( + sql, (event_id, 1, self._clock.time_msec()) # type: ignore[attr-defined] + ) + + def _record_event_backfill_attempt_upsert_emulated_txn( + self, + txn: LoggingTransaction, + event_id: str, + ) -> None: + self.database_engine.lock_table(txn, "event_backfill_attempts") + + prev_row = self.db_pool.simple_select_one_txn( + txn, table="event_backfill_attempts", keyvalues={"event_id": event_id}, - values={ - "event_id": event_id, - # TODO: This needs to increment - "num_attempts": 1, - "last_attempt_ts": self._clock.time_msec(), - }, - insertion_values={}, - desc="record_event_backfill_attempt", - lock=False, + retcols=("num_attempts"), + allow_none=True, ) + if not prev_row: + self.db_pool.simple_insert_txn( + txn, + table="event_backfill_attempts", + values={ + "event_id": event_id, + "num_attempts": 1, + "last_attempt_ts": self._clock.time_msec(), + }, + ) + else: + self.db_pool.simple_update_one_txn( + txn, + table="event_backfill_attempts", + keyvalues={"event_id": event_id}, + updatevalues={ + "event_id": event_id, + "num_attempts": prev_row["num_attempts"] + 1, + "last_attempt_ts": self._clock.time_msec(), + }, + ) + async def get_missing_events( self, room_id: str, diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index a9a88c8bfd8b..4bc8d280019f 
100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 72 # remember to update the list below when updating +SCHEMA_VERSION = 73 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the diff --git a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql b/synapse/storage/schema/main/delta/73/01event_backfill_access_time.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql rename to synapse/storage/schema/main/delta/73/01event_backfill_access_time.sql From f5c6fe7ef934ddef211afcc3b6b15fd3b4a39561 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:10:59 -0500 Subject: [PATCH 07/27] Fix lints --- synapse/handlers/pagination.py | 3 --- synapse/storage/databases/main/event_federation.py | 4 +--- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index b1f4db5584bc..74e944bce72d 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -589,9 +589,6 @@ async def get_messages( state, time_now, config=serialize_options ) - # TODO: Remove (just for testing) - await self.store.record_event_backfill_attempt(chunk["chunk"][0]["event_id"]) - return chunk async def _shutdown_and_purge_room( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ab9cbe4e5578..6e93a5af3921 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1326,9 +1326,7 @@ def _record_event_backfill_attempt_upsert_native_txn( last_attempt_ts=EXCLUDED.last_attempt_ts; 
""" - txn.execute( - sql, (event_id, 1, self._clock.time_msec()) # type: ignore[attr-defined] - ) + txn.execute(sql, (event_id, 1, self._clock.time_msec())) def _record_event_backfill_attempt_upsert_emulated_txn( self, From 16ebec01f03a6a3c701a171fb0c48cb590e23973 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:15:57 -0500 Subject: [PATCH 08/27] Remove extra whitespace --- synapse/federation/federation_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 6326024f2072..4269a98db2db 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -129,7 +129,6 @@ async def _check_sigs_and_hash( "event_id": pdu.event_id, } ) - # we redact (to save disk space) as well as soft-failing (to stop # using the event in prev_events). redacted_event = prune_event(pdu) From ce07aa1aaa9182ff2cecd774b88f7ee980f314f1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:48:38 -0500 Subject: [PATCH 09/27] Move to correct folder --- .../04event_backfill_access_time.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{73/01event_backfill_access_time.sql => 72/04event_backfill_access_time.sql} (100%) diff --git a/synapse/storage/schema/main/delta/73/01event_backfill_access_time.sql b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql similarity index 100% rename from synapse/storage/schema/main/delta/73/01event_backfill_access_time.sql rename to synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql From 5811ba135e3297f99f53b919ea948084b9f75afc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:48:53 -0500 Subject: [PATCH 10/27] Set the version back --- synapse/storage/schema/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 
4bc8d280019f..a9a88c8bfd8b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 73 # remember to update the list below when updating +SCHEMA_VERSION = 72 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the From cf2b0937fed0b9bd59124f9e110f51a5c8c96078 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 17:56:42 -0500 Subject: [PATCH 11/27] Fix `TypeError: not all arguments converted during string formatting` --- synapse/storage/databases/main/events.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 75254106a1ee..005ec1b22c00 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2435,7 +2435,7 @@ def _update_backward_extremeties( "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) - backward_extremities_to_remove = [ + backward_extremity_tuples_to_remove = [ (ev.event_id, ev.room_id) for ev in events if not ev.internal_metadata.is_outlier() @@ -2446,16 +2446,23 @@ def _update_backward_extremeties( ] txn.execute_batch( query, - backward_extremities_to_remove, + backward_extremity_tuples_to_remove, ) # Since we no longer need it as a backward extremity, it won't be # backfilled again so we no longer need to store the backfill attempts # around it. - query = "DELETE FROM event_backfill_attempts" " WHERE event_id = ?" + query = """ + DELETE FROM event_backfill_attempts + WHERE event_id = ? 
+ """ + backward_extremity_event_ids_to_remove = [ + (extremity_tuple[0],) + for extremity_tuple in backward_extremity_tuples_to_remove + ] txn.execute_batch( query, - backward_extremities_to_remove, + backward_extremity_event_ids_to_remove, ) From cbb4194da8fd327989b0d5b6e5bbfabe6272ad4c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 19:34:01 -0500 Subject: [PATCH 12/27] Add test to make sure failed backfill attempts are recorded --- tests/handlers/test_federation_event.py | 156 ++++++++++++++++++++++-- 1 file changed, 149 insertions(+), 7 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 51c8dd649822..3da00e8ea201 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import mock +from typing import ( + List, +) -from synapse.events import make_event_from_dict +from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext from synapse.federation.transport.client import StateRequestResponse from synapse.logging.context import LoggingContext @@ -51,7 +54,16 @@ def test_process_pulled_event_with_missing_state(self) -> None: We check that the pulled event is correctly persisted, and that the state is as we expect. 
""" - return self._test_process_pulled_event_with_missing_state(False) + results = self._test_process_pulled_event_with_missing_state( + prev_exists_as_outlier=False, + ) + + self._assert_pulled_event(results.get("pulled_event")) + self._assert_state_at_pulled_event( + pulled_event=results.get("pulled_event"), + state_at_pulled_event=results.get("state_at_pulled_event"), + prev_exists_as_outlier=False, + ) def test_process_pulled_event_with_missing_state_where_prev_is_outlier( self, @@ -63,14 +75,127 @@ def test_process_pulled_event_with_missing_state_where_prev_is_outlier( but in this case we already have the prev_event (as an outlier, obviously - if it were a regular event, we wouldn't need to request the state). """ - return self._test_process_pulled_event_with_missing_state(True) + results = self._test_process_pulled_event_with_missing_state( + prev_exists_as_outlier=True, + ) + + self._assert_pulled_event(results.get("pulled_event")) + self._assert_state_at_pulled_event( + pulled_event=results.get("pulled_event"), + state_at_pulled_event=results.get("state_at_pulled_event"), + prev_exists_as_outlier=True, + ) + + def test_process_pulled_event_records_failed_backfill_attempts( + self, + ) -> None: + """ """ + OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" + main_store = self.hs.get_datastores().main + + # Create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(main_store.get_room_version(room_id)) + + # We expect an outbound request to /state_ids, so stub that out + self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable( + { + # Mimic the other server not knowing about the state at all. + # We want to cause Synapse to throw an error (`Unable to get + # missing prev_event $fake_prev_event`) and fail to backfill + # the pulled event. 
+ "pdu_ids": [], + "auth_chain_ids": [], + } + ) + # We also expect an outbound request to /state + self.mock_federation_transport_client.get_room_state.return_value = make_awaitable( + StateRequestResponse( + # Mimic the other server not knowing about the state at all. + # We want to cause Synapse to throw an error (`Unable to get + # missing prev_event $fake_prev_event`) and fail to backfill + # the pulled event. + auth_events=[], + state=[], + ) + ) + + pulled_event = make_event_from_dict( + self.add_hashes_and_signatures_from_other_server( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [ + # The fake prev event will make the pulled event fail + # the history check (`Unable to get missing prev_event + # $fake_prev_event`) + "$fake_prev_event" + ], + "auth_events": [], + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled"}, + } + ), + room_version, + ) + + # The function under test: try to process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler()._process_pulled_event( + self.OTHER_SERVER_NAME, pulled_event, backfilled=True + ) + ) + + # Make sure our backfill attempt was recorded + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + self.assertEqual(backfill_num_attempts, 1) + + # The function under test: try to process the pulled event again + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler()._process_pulled_event( + self.OTHER_SERVER_NAME, pulled_event, backfilled=True + ) + ) + + # Make sure our second backfill attempt was recorded + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + 
self.assertEqual(backfill_num_attempts, 2) + + # And as a sanity check, make sure the event was not persisted through all of this. + persisted = self.get_success( + main_store.get_event(pulled_event.event_id, allow_none=True) + ) + self.assertIsNone( + persisted, + "pulled event with invalid signature should not be persisted at all", + ) def _test_process_pulled_event_with_missing_state( - self, prev_exists_as_outlier: bool + self, + *, + backfilled: bool = False, + prev_exists_as_outlier: bool = False, ) -> None: OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" main_store = self.hs.get_datastores().main - state_storage_controller = self.hs.get_storage_controllers().state # create the room user_id = self.register_user("kermit", "test") @@ -205,10 +330,18 @@ async def get_event(destination: str, event_id: str, timeout=None): with LoggingContext("test"): self.get_success( self.hs.get_federation_event_handler()._process_pulled_event( - self.OTHER_SERVER_NAME, pulled_event, backfilled=False + self.OTHER_SERVER_NAME, pulled_event, backfilled=backfilled ) ) + return { + "pulled_event": pulled_event, + "state_at_pulled_event": state_at_prev_event, + } + + def _assert_pulled_event(self, pulled_event: EventBase) -> None: + main_store = self.hs.get_datastores().main + # check that the event is correctly persisted persisted = self.get_success(main_store.get_event(pulled_event.event_id)) self.assertIsNotNone(persisted, "pulled event was not persisted at all") @@ -216,12 +349,21 @@ async def get_event(destination: str, event_id: str, timeout=None): persisted.internal_metadata.is_outlier(), "pulled event was an outlier" ) + def _assert_state_at_pulled_event( + self, + *, + pulled_event: EventBase, + state_at_pulled_event: List[EventBase], + prev_exists_as_outlier: bool, + ) -> None: + state_storage_controller = self.hs.get_storage_controllers().state + # check that the state at that event is as expected state = self.get_success( 
state_storage_controller.get_state_ids_for_event(pulled_event.event_id) ) expected_state = { - (e.type, e.state_key): e.event_id for e in state_at_prev_event + (e.type, e.state_key): e.event_id for e in state_at_pulled_event } self.assertEqual(state, expected_state) From 621c5d39c86185829012f6d2e2b4072221c7dd4f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 19:40:44 -0500 Subject: [PATCH 13/27] Clean up test --- tests/handlers/test_federation_event.py | 272 +++++++++++------------- 1 file changed, 123 insertions(+), 149 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 3da00e8ea201..beda5457dc35 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -12,11 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import mock -from typing import ( - List, -) -from synapse.events import EventBase, make_event_from_dict +from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext from synapse.federation.transport.client import StateRequestResponse from synapse.logging.context import LoggingContext @@ -54,16 +51,7 @@ def test_process_pulled_event_with_missing_state(self) -> None: We check that the pulled event is correctly persisted, and that the state is as we expect. 
""" - results = self._test_process_pulled_event_with_missing_state( - prev_exists_as_outlier=False, - ) - - self._assert_pulled_event(results.get("pulled_event")) - self._assert_state_at_pulled_event( - pulled_event=results.get("pulled_event"), - state_at_pulled_event=results.get("state_at_pulled_event"), - prev_exists_as_outlier=False, - ) + return self._test_process_pulled_event_with_missing_state(False) def test_process_pulled_event_with_missing_state_where_prev_is_outlier( self, @@ -75,127 +63,14 @@ def test_process_pulled_event_with_missing_state_where_prev_is_outlier( but in this case we already have the prev_event (as an outlier, obviously - if it were a regular event, we wouldn't need to request the state). """ - results = self._test_process_pulled_event_with_missing_state( - prev_exists_as_outlier=True, - ) - - self._assert_pulled_event(results.get("pulled_event")) - self._assert_state_at_pulled_event( - pulled_event=results.get("pulled_event"), - state_at_pulled_event=results.get("state_at_pulled_event"), - prev_exists_as_outlier=True, - ) - - def test_process_pulled_event_records_failed_backfill_attempts( - self, - ) -> None: - """ """ - OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" - main_store = self.hs.get_datastores().main - - # Create the room - user_id = self.register_user("kermit", "test") - tok = self.login("kermit", "test") - room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) - room_version = self.get_success(main_store.get_room_version(room_id)) - - # We expect an outbound request to /state_ids, so stub that out - self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable( - { - # Mimic the other server not knowing about the state at all. - # We want to cause Synapse to throw an error (`Unable to get - # missing prev_event $fake_prev_event`) and fail to backfill - # the pulled event. 
- "pdu_ids": [], - "auth_chain_ids": [], - } - ) - # We also expect an outbound request to /state - self.mock_federation_transport_client.get_room_state.return_value = make_awaitable( - StateRequestResponse( - # Mimic the other server not knowing about the state at all. - # We want to cause Synapse to throw an error (`Unable to get - # missing prev_event $fake_prev_event`) and fail to backfill - # the pulled event. - auth_events=[], - state=[], - ) - ) - - pulled_event = make_event_from_dict( - self.add_hashes_and_signatures_from_other_server( - { - "type": "test_regular_type", - "room_id": room_id, - "sender": OTHER_USER, - "prev_events": [ - # The fake prev event will make the pulled event fail - # the history check (`Unable to get missing prev_event - # $fake_prev_event`) - "$fake_prev_event" - ], - "auth_events": [], - "origin_server_ts": 1, - "depth": 12, - "content": {"body": "pulled"}, - } - ), - room_version, - ) - - # The function under test: try to process the pulled event - with LoggingContext("test"): - self.get_success( - self.hs.get_federation_event_handler()._process_pulled_event( - self.OTHER_SERVER_NAME, pulled_event, backfilled=True - ) - ) - - # Make sure our backfill attempt was recorded - backfill_num_attempts = self.get_success( - main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", - keyvalues={"event_id": pulled_event.event_id}, - retcol="num_attempts", - ) - ) - self.assertEqual(backfill_num_attempts, 1) - - # The function under test: try to process the pulled event again - with LoggingContext("test"): - self.get_success( - self.hs.get_federation_event_handler()._process_pulled_event( - self.OTHER_SERVER_NAME, pulled_event, backfilled=True - ) - ) - - # Make sure our second backfill attempt was recorded - backfill_num_attempts = self.get_success( - main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", - keyvalues={"event_id": pulled_event.event_id}, - retcol="num_attempts", - ) - ) - 
self.assertEqual(backfill_num_attempts, 2) - - # And as a sanity check, make sure the event was not persisted through all of this. - persisted = self.get_success( - main_store.get_event(pulled_event.event_id, allow_none=True) - ) - self.assertIsNone( - persisted, - "pulled event with invalid signature should not be persisted at all", - ) + return self._test_process_pulled_event_with_missing_state(True) def _test_process_pulled_event_with_missing_state( - self, - *, - backfilled: bool = False, - prev_exists_as_outlier: bool = False, + self, prev_exists_as_outlier: bool ) -> None: OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" main_store = self.hs.get_datastores().main + state_storage_controller = self.hs.get_storage_controllers().state # create the room user_id = self.register_user("kermit", "test") @@ -330,18 +205,10 @@ async def get_event(destination: str, event_id: str, timeout=None): with LoggingContext("test"): self.get_success( self.hs.get_federation_event_handler()._process_pulled_event( - self.OTHER_SERVER_NAME, pulled_event, backfilled=backfilled + self.OTHER_SERVER_NAME, pulled_event, backfilled=False ) ) - return { - "pulled_event": pulled_event, - "state_at_pulled_event": state_at_prev_event, - } - - def _assert_pulled_event(self, pulled_event: EventBase) -> None: - main_store = self.hs.get_datastores().main - # check that the event is correctly persisted persisted = self.get_success(main_store.get_event(pulled_event.event_id)) self.assertIsNotNone(persisted, "pulled event was not persisted at all") @@ -349,23 +216,130 @@ def _assert_pulled_event(self, pulled_event: EventBase) -> None: persisted.internal_metadata.is_outlier(), "pulled event was an outlier" ) - def _assert_state_at_pulled_event( - self, - *, - pulled_event: EventBase, - state_at_pulled_event: List[EventBase], - prev_exists_as_outlier: bool, - ) -> None: - state_storage_controller = self.hs.get_storage_controllers().state - # check that the state at that event is as expected state = 
self.get_success(
             state_storage_controller.get_state_ids_for_event(pulled_event.event_id)
         )
         expected_state = {
-            (e.type, e.state_key): e.event_id for e in state_at_pulled_event
+            (e.type, e.state_key): e.event_id for e in state_at_prev_event
         }
         self.assertEqual(state, expected_state)
 
         if prev_exists_as_outlier:
             self.mock_federation_transport_client.get_event.assert_not_called()
+
+    def test_process_pulled_event_records_failed_backfill_attempts(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed backfill attempts for an event are
+        recorded in the `event_backfill_attempts` table.
+
+        In this test, we pretend we are processing a "pulled" event (eg, via
+        backfill). The pulled event has a fake `prev_event` which our server has
+        obviously never seen before so it attempts to request the state at that
+        `prev_event` which expectedly fails because it's a fake event. Because
+        the server can't fetch the state at the missing `prev_event`, the
+        "pulled" event fails the history check and fails to process.
+
+        We check that we correctly record the number of failed backfill attempts
+        to the pulled event and as a sanity check, that the "pulled" event isn't
+        persisted.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # We expect an outbound request to /state_ids, so stub that out
+        self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
+            {
+                # Mimic the other server not knowing about the state at all.
+                # We want to cause Synapse to throw an error (`Unable to get
+                # missing prev_event $fake_prev_event`) and fail to backfill
+                # the pulled event.
+ "pdu_ids": [], + "auth_chain_ids": [], + } + ) + # We also expect an outbound request to /state + self.mock_federation_transport_client.get_room_state.return_value = make_awaitable( + StateRequestResponse( + # Mimic the other server not knowing about the state at all. + # We want to cause Synapse to throw an error (`Unable to get + # missing prev_event $fake_prev_event`) and fail to backfill + # the pulled event. + auth_events=[], + state=[], + ) + ) + + pulled_event = make_event_from_dict( + self.add_hashes_and_signatures_from_other_server( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [ + # The fake prev event will make the pulled event fail + # the history check (`Unable to get missing prev_event + # $fake_prev_event`) + "$fake_prev_event" + ], + "auth_events": [], + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled"}, + } + ), + room_version, + ) + + # The function under test: try to process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler()._process_pulled_event( + self.OTHER_SERVER_NAME, pulled_event, backfilled=True + ) + ) + + # Make sure our backfill attempt was recorded + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + self.assertEqual(backfill_num_attempts, 1) + + # The function under test: try to process the pulled event again + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler()._process_pulled_event( + self.OTHER_SERVER_NAME, pulled_event, backfilled=True + ) + ) + + # Make sure our second backfill attempt was recorded + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + 
self.assertEqual(backfill_num_attempts, 2) + + # And as a sanity check, make sure the event was not persisted through all of this. + persisted = self.get_success( + main_store.get_event(pulled_event.event_id, allow_none=True) + ) + self.assertIsNone( + persisted, + "pulled event with invalid signature should not be persisted at all", + ) From 75c07bbb5f116800fca164a9ff68bf3c1a94d000 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 19:42:23 -0500 Subject: [PATCH 14/27] Fix comments --- tests/handlers/test_federation_event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index beda5457dc35..6e1f80a10c51 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -325,7 +325,7 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) ) - # Make sure our second backfill attempt was recorded + # Make sure our second backfill attempt was recorded (`num_attempts` was incremented) backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( table="event_backfill_attempts", @@ -341,5 +341,5 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) self.assertIsNone( persisted, - "pulled event with invalid signature should not be persisted at all", + "pulled event that fails the history check should not be persisted at all", ) From 783cce5271f490f7d27255a6a09520030e653096 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 20:08:46 -0500 Subject: [PATCH 15/27] Add test for clearing backfill attempts --- tests/handlers/test_federation_event.py | 108 +++++++++++++++++++++++- 1 file changed, 106 insertions(+), 2 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 6e1f80a10c51..6461efbe4536 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ 
-235,8 +235,8 @@ def test_process_pulled_event_records_failed_backfill_attempts(
         Test to make sure that failed backfill attempts for an event are
         recorded in the `event_backfill_attempts` table.
 
-        In this test, we pretend we are processing a "pulled" event (eg, via
-        backfill). The pulled event has a fake `prev_event` which our server has
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event has a fake `prev_event` which our server has
         obviously never seen before so it attempts to request the state at that
         `prev_event` which expectedly fails because it's a fake event. Because
         the server can't fetch the state at the missing `prev_event`, the
@@ -343,3 +343,107 @@ def test_process_pulled_event_records_failed_backfill_attempts(
             persisted,
             "pulled event that fails the history check should not be persisted at all",
         )
+
+    def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed backfill attempts
+        (`event_backfill_attempts` table) for an event are cleared after the
+        event is successfully persisted.
+
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event successfully processes and the backward
+        extremities are updated along with clearing out any backfill attempts
+        for those old extremities.
+
+        We check that we correctly cleared failed backfill attempts of the
+        pulled event.
+ """ + OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" + main_store = self.hs.get_datastores().main + + # Create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(main_store.get_room_version(room_id)) + + # allow the remote user to send state events + self.helper.send_state( + room_id, + "m.room.power_levels", + {"events_default": 0, "state_default": 0}, + tok=tok, + ) + + # add the remote user to the room + member_event = self.get_success( + event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join") + ) + + initial_state_map = self.get_success( + main_store.get_partial_current_state_ids(room_id) + ) + + auth_event_ids = [ + initial_state_map[("m.room.create", "")], + initial_state_map[("m.room.power_levels", "")], + member_event.event_id, + ] + + pulled_event = make_event_from_dict( + self.add_hashes_and_signatures_from_other_server( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [member_event.event_id], + "auth_events": auth_event_ids, + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled"}, + } + ), + room_version, + ) + + # Fake the "pulled" event failing to backfill once so we can test + # if it's cleared out later on. 
+ self.get_success( + main_store.record_event_backfill_attempt(pulled_event.event_id) + ) + # Make sure we have a backfill attempt recorded for the pulled event + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + self.assertEqual(backfill_num_attempts, 1) + + # The function under test: try to process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler()._process_pulled_event( + self.OTHER_SERVER_NAME, pulled_event, backfilled=True + ) + ) + + # Make sure the backfill attempt for the pulled event are cleared + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_backfill_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + allow_none=True, + ) + ) + self.assertIsNone(backfill_num_attempts) + + # And as a sanity check, make sure the "pulled" event was persisted. 
+ persisted = self.get_success( + main_store.get_event(pulled_event.event_id, allow_none=True) + ) + self.assertIsNotNone(persisted, "pulled event was not persisted at all") From 54ef84bd5c27ab2f36bedfd6e5413a5363ad9e40 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 25 Aug 2022 20:11:31 -0500 Subject: [PATCH 16/27] Maybe a better comment --- synapse/storage/databases/main/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 005ec1b22c00..4ca326cad8ed 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2449,9 +2449,9 @@ def _update_backward_extremeties( backward_extremity_tuples_to_remove, ) - # Since we no longer need it as a backward extremity, it won't be - # backfilled again so we no longer need to store the backfill attempts - # around it. + # Since we no longer need these backward extremities, it also means that + # they won't be backfilled from again so we no longer need to store the + # backfill attempts around it. query = """ DELETE FROM event_backfill_attempts WHERE event_id = ? 
From e4192d73e794380b08744922d55ac2d114c01d71 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 31 Aug 2022 14:35:28 -0500 Subject: [PATCH 17/27] Update table name with "failed" and include room_id in the primary key See: - https://github.com/matrix-org/synapse/pull/13589#discussion_r956531173 - https://github.com/matrix-org/synapse/pull/13589#discussion_r959836863 - https://github.com/matrix-org/synapse/pull/13589#discussion_r959836134 - https://github.com/matrix-org/synapse/pull/13589#discussion_r959838812 --- .../databases/main/event_federation.py | 33 ++++++++++--------- synapse/storage/databases/main/events.py | 17 ++++------ .../delta/72/04event_backfill_access_time.sql | 10 +++--- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 6e93a5af3921..b314bf557b4f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1293,11 +1293,12 @@ def _get_backfill_events( return event_id_results @trace - async def record_event_backfill_attempt(self, event_id: str) -> None: + async def record_event_backfill_attempt(self, room_id: str, event_id: str) -> None: if self.database_engine.can_native_upsert: await self.db_pool.runInteraction( "record_event_backfill_attempt", self._record_event_backfill_attempt_upsert_native_txn, + room_id, event_id, db_autocommit=True, # Safe as its a single upsert ) @@ -1305,40 +1306,42 @@ async def record_event_backfill_attempt(self, event_id: str) -> None: await self.db_pool.runInteraction( "record_event_backfill_attempt", self._record_event_backfill_attempt_upsert_emulated_txn, + room_id, event_id, ) def _record_event_backfill_attempt_upsert_native_txn( self, txn: LoggingTransaction, + room_id: str, event_id: str, ) -> None: assert self.database_engine.can_native_upsert sql = """ - INSERT INTO event_backfill_attempts ( - event_id, num_attempts, 
last_attempt_ts + INSERT INTO event_failed_backfill_attempts ( + room_id, event_id, num_attempts, last_attempt_ts ) - VALUES (?, ?, ?) - ON CONFLICT (event_id) DO UPDATE SET - event_id=EXCLUDED.event_id, - num_attempts=event_backfill_attempts.num_attempts + 1, + VALUES (?, ?, ?, ?) + ON CONFLICT (room_id, event_id) DO UPDATE SET + num_attempts=event_failed_backfill_attempts.num_attempts + 1, last_attempt_ts=EXCLUDED.last_attempt_ts; """ - txn.execute(sql, (event_id, 1, self._clock.time_msec())) + txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec())) def _record_event_backfill_attempt_upsert_emulated_txn( self, txn: LoggingTransaction, + room_id: str, event_id: str, ) -> None: - self.database_engine.lock_table(txn, "event_backfill_attempts") + self.database_engine.lock_table(txn, "event_failed_backfill_attempts") prev_row = self.db_pool.simple_select_one_txn( txn, - table="event_backfill_attempts", - keyvalues={"event_id": event_id}, + table="event_failed_backfill_attempts", + keyvalues={"room_id": room_id, "event_id": event_id}, retcols=("num_attempts"), allow_none=True, ) @@ -1346,8 +1349,9 @@ def _record_event_backfill_attempt_upsert_emulated_txn( if not prev_row: self.db_pool.simple_insert_txn( txn, - table="event_backfill_attempts", + table="event_failed_backfill_attempts", values={ + "room_id": room_id, "event_id": event_id, "num_attempts": 1, "last_attempt_ts": self._clock.time_msec(), @@ -1356,10 +1360,9 @@ def _record_event_backfill_attempt_upsert_emulated_txn( else: self.db_pool.simple_update_one_txn( txn, - table="event_backfill_attempts", - keyvalues={"event_id": event_id}, + table="event_failed_backfill_attempts", + keyvalues={"room_id": room_id, "event_id": event_id}, updatevalues={ - "event_id": event_id, "num_attempts": prev_row["num_attempts"] + 1, "last_attempt_ts": self._clock.time_msec(), }, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 4ca326cad8ed..881819fc5774 100644 --- 
a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2449,20 +2449,17 @@ def _update_backward_extremeties( backward_extremity_tuples_to_remove, ) - # Since we no longer need these backward extremities, it also means that - # they won't be backfilled from again so we no longer need to store the - # backfill attempts around it. + # Clear out the failed backfill attempts after we successfully pulled + # the event. Since we no longer need these events as backward + # extremities, it also means that they won't be backfilled from again so + # we no longer need to store the backfill attempts around it. query = """ - DELETE FROM event_backfill_attempts - WHERE event_id = ? + DELETE FROM event_failed_backfill_attempts + WHERE event_id = ? and room_id = ? """ - backward_extremity_event_ids_to_remove = [ - (extremity_tuple[0],) - for extremity_tuple in backward_extremity_tuples_to_remove - ] txn.execute_batch( query, - backward_extremity_event_ids_to_remove, + backward_extremity_tuples_to_remove, ) diff --git a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql index 71224c2b48c0..fbd366809e7e 100644 --- a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql +++ b/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql @@ -14,14 +14,14 @@ */ --- Add a table that keeps track of when we last tried to backfill an event. This +-- Add a table that keeps track of when we failed to backfill an event. This -- allows us to be more intelligent when we decide to retry (we don't need to -- fail over and over) and we can process that event in the background so we -- don't block on it each time. 
-CREATE TABLE IF NOT EXISTS event_backfill_attempts( +CREATE TABLE IF NOT EXISTS event_failed_backfill_attempts( + room_id TEXT NOT NULL, event_id TEXT NOT NULL, num_attempts INT NOT NULL, - last_attempt_ts BIGINT NOT NULL + last_attempt_ts BIGINT NOT NULL, + PRIMARY KEY (room_id, event_id) ); - -CREATE UNIQUE INDEX IF NOT EXISTS event_backfill_attempts_event_id ON event_backfill_attempts(event_id); From 7a449326afd72a639dc122bddb5e4d042d6b5158 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 31 Aug 2022 14:45:51 -0500 Subject: [PATCH 18/27] Rename to record_event_failed_backfill_attempt See https://github.com/matrix-org/synapse/pull/13589#discussion_r959838055 --- synapse/handlers/federation_event.py | 8 ++++-- .../databases/main/event_federation.py | 27 ++++++++++++++----- ...l => 04event_failed_backfill_attempts.sql} | 0 tests/handlers/test_federation_event.py | 16 ++++++----- 4 files changed, 35 insertions(+), 16 deletions(-) rename synapse/storage/schema/main/delta/72/{04event_backfill_access_time.sql => 04event_failed_backfill_attempts.sql} (100%) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 695dde417dfc..60dd024cb154 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -863,7 +863,9 @@ async def _process_pulled_event( except SynapseError as err: logger.warning("Event %s failed sanity check: %s", event_id, err) if backfilled: - await self._store.record_event_backfill_attempt(event_id) + await self._store.record_event_failed_backfill_attempt( + event.room_id, event_id + ) return try: @@ -900,7 +902,9 @@ async def _process_pulled_event( ) except FederationError as e: if backfilled: - await self._store.record_event_backfill_attempt(event_id) + await self._store.record_event_failed_backfill_attempt( + event.room_id, event_id + ) if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) diff --git 
a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index b314bf557b4f..e4fd9b1a0c56 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1293,24 +1293,37 @@ def _get_backfill_events( return event_id_results @trace - async def record_event_backfill_attempt(self, room_id: str, event_id: str) -> None: + async def record_event_failed_backfill_attempt( + self, room_id: str, event_id: str + ) -> None: + """ + Record when we fail to backfill an event. + + This information allows us to be more intelligent when we decide to + retry (we don't need to fail over and over) and we can process that + event in the background so we don't block on it each time. + + Args: + room_id: The room where the event failed to backfill from + event_id: The event that failed to be backfilled + """ if self.database_engine.can_native_upsert: await self.db_pool.runInteraction( - "record_event_backfill_attempt", - self._record_event_backfill_attempt_upsert_native_txn, + "record_event_failed_backfill_attempt", + self._record_event_failed_backfill_attempt_upsert_native_txn, room_id, event_id, db_autocommit=True, # Safe as its a single upsert ) else: await self.db_pool.runInteraction( - "record_event_backfill_attempt", - self._record_event_backfill_attempt_upsert_emulated_txn, + "record_event_failed_backfill_attempt", + self._record_event_failed_backfill_attempt_upsert_emulated_txn, room_id, event_id, ) - def _record_event_backfill_attempt_upsert_native_txn( + def _record_event_failed_backfill_attempt_upsert_native_txn( self, txn: LoggingTransaction, room_id: str, @@ -1330,7 +1343,7 @@ def _record_event_backfill_attempt_upsert_native_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec())) - def _record_event_backfill_attempt_upsert_emulated_txn( + def _record_event_failed_backfill_attempt_upsert_emulated_txn( self, txn: LoggingTransaction, room_id: str, diff 
--git a/synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql b/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/04event_backfill_access_time.sql rename to synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 6461efbe4536..102129025b2f 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -233,7 +233,7 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) -> None: """ Test to make sure that failed backfill attempts for an event are - recorded in the `event_backfill_attempts` table. + recorded in the `event_failed_backfill_attempts` table. In this test, we pretend we are processing a "pulled" event via backfill. The pulled event has a fake `prev_event` which our server has @@ -310,7 +310,7 @@ def test_process_pulled_event_records_failed_backfill_attempts( # Make sure our backfill attempt was recorded backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", + table="event_failed_backfill_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -328,7 +328,7 @@ def test_process_pulled_event_records_failed_backfill_attempts( # Make sure our second backfill attempt was recorded (`num_attempts` was incremented) backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", + table="event_failed_backfill_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -349,7 +349,7 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ ) -> None: """ Test to make sure that failed backfill attempts - (`event_backfill_attempts` table) for an event are cleared after the + 
(`event_failed_backfill_attempts` table) for an event are cleared after the event is successfully persisted. In this test, we pretend we are processing a "pulled" event via @@ -411,12 +411,14 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ # Fake the "pulled" event failing to backfill once so we can test # if it's cleared out later on. self.get_success( - main_store.record_event_backfill_attempt(pulled_event.event_id) + main_store.record_event_failed_backfill_attempt( + pulled_event.room_id, pulled_event.event_id + ) ) # Make sure we have a backfill attempt recorded for the pulled event backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", + table="event_failed_backfill_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -434,7 +436,7 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ # Make sure the backfill attempt for the pulled event are cleared backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_backfill_attempts", + table="event_failed_backfill_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", allow_none=True, From 1464101c468484cee27882698a5eb32e1d4a5fc0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 1 Sep 2022 14:56:58 -0500 Subject: [PATCH 19/27] Add _unsafe_to_upsert_tables check See https://github.com/matrix-org/synapse/pull/13589#discussion_r959840527 --- synapse/storage/databases/main/event_federation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index e4fd9b1a0c56..95e58480e4fd 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1307,7 +1307,11 @@ async def record_event_failed_backfill_attempt( 
room_id: The room where the event failed to backfill from event_id: The event that failed to be backfilled """ - if self.database_engine.can_native_upsert: + if ( + self.database_engine.can_native_upsert + and "event_failed_backfill_attempts" + not in self.db_pool._unsafe_to_upsert_tables + ): await self.db_pool.runInteraction( "record_event_failed_backfill_attempt", self._record_event_failed_backfill_attempt_upsert_native_txn, From 71c77382f7181e989e1bc51ab1134979884cb9e1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 1 Sep 2022 15:20:09 -0500 Subject: [PATCH 20/27] Add foreign key references See https://github.com/matrix-org/synapse/pull/13589#discussion_r960568864 --- .../schema/main/delta/72/04event_failed_backfill_attempts.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql b/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql index fbd366809e7e..f39fff0d60c7 100644 --- a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql +++ b/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql @@ -19,8 +19,8 @@ -- fail over and over) and we can process that event in the background so we -- don't block on it each time. 
CREATE TABLE IF NOT EXISTS event_failed_backfill_attempts( - room_id TEXT NOT NULL, - event_id TEXT NOT NULL, + room_id TEXT NOT NULL REFERENCES rooms (room_id), + event_id TEXT NOT NULL REFERENCES events (event_id), num_attempts INT NOT NULL, last_attempt_ts BIGINT NOT NULL, PRIMARY KEY (room_id, event_id) From d45b0783557eb3b0f60393cbb6b9d9fd6769f1e3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 1 Sep 2022 16:11:49 -0500 Subject: [PATCH 21/27] Remove reference to event that won't be in the events table See https://github.com/matrix-org/synapse/pull/13589#discussion_r961104120 --- .../schema/main/delta/72/04event_failed_backfill_attempts.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql b/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql index f39fff0d60c7..e3c557540cdc 100644 --- a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql +++ b/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql @@ -20,7 +20,7 @@ -- don't block on it each time. 
CREATE TABLE IF NOT EXISTS event_failed_backfill_attempts( room_id TEXT NOT NULL REFERENCES rooms (room_id), - event_id TEXT NOT NULL REFERENCES events (event_id), + event_id TEXT NOT NULL, num_attempts INT NOT NULL, last_attempt_ts BIGINT NOT NULL, PRIMARY KEY (room_id, event_id) From 63bec99dff683e68ae38837ef4bee837a90e1b22 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 9 Sep 2022 15:32:41 -0500 Subject: [PATCH 22/27] Remove emulated upsert code (all of our dbs no support it) We now specify a SQLite version that supports upserts natively so we no longer need the emulated version, https://github.com/matrix-org/synapse/pull/13760 See: - https://github.com/matrix-org/synapse/pull/13589#discussion_r961101746 - https://github.com/matrix-org/synapse/pull/13589#discussion_r961102072 --- .../databases/main/event_federation.py | 66 +++---------------- 1 file changed, 8 insertions(+), 58 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 3324a894e2c8..885ff189bcad 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1309,27 +1309,15 @@ async def record_event_failed_backfill_attempt( room_id: The room where the event failed to backfill from event_id: The event that failed to be backfilled """ - if ( - self.database_engine.can_native_upsert - and "event_failed_backfill_attempts" - not in self.db_pool._unsafe_to_upsert_tables - ): - await self.db_pool.runInteraction( - "record_event_failed_backfill_attempt", - self._record_event_failed_backfill_attempt_upsert_native_txn, - room_id, - event_id, - db_autocommit=True, # Safe as its a single upsert - ) - else: - await self.db_pool.runInteraction( - "record_event_failed_backfill_attempt", - self._record_event_failed_backfill_attempt_upsert_emulated_txn, - room_id, - event_id, - ) + await self.db_pool.runInteraction( + "record_event_failed_backfill_attempt", + 
self._record_event_failed_backfill_attempt_upsert_txn, + room_id, + event_id, + db_autocommit=True, # Safe as it's a single upsert + ) - def _record_event_failed_backfill_attempt_upsert_native_txn( + def _record_event_failed_backfill_attempt_upsert_txn( self, txn: LoggingTransaction, room_id: str, @@ -1349,44 +1337,6 @@ def _record_event_failed_backfill_attempt_upsert_native_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec())) - def _record_event_failed_backfill_attempt_upsert_emulated_txn( - self, - txn: LoggingTransaction, - room_id: str, - event_id: str, - ) -> None: - self.database_engine.lock_table(txn, "event_failed_backfill_attempts") - - prev_row = self.db_pool.simple_select_one_txn( - txn, - table="event_failed_backfill_attempts", - keyvalues={"room_id": room_id, "event_id": event_id}, - retcols=("num_attempts"), - allow_none=True, - ) - - if not prev_row: - self.db_pool.simple_insert_txn( - txn, - table="event_failed_backfill_attempts", - values={ - "room_id": room_id, - "event_id": event_id, - "num_attempts": 1, - "last_attempt_ts": self._clock.time_msec(), - }, - ) - else: - self.db_pool.simple_update_one_txn( - txn, - table="event_failed_backfill_attempts", - keyvalues={"room_id": room_id, "event_id": event_id}, - updatevalues={ - "num_attempts": prev_row["num_attempts"] + 1, - "last_attempt_ts": self._clock.time_msec(), - }, - ) - async def get_missing_events( self, room_id: str, From 31d7502dba2f2365557c7a0b7647ace6e926a004 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 9 Sep 2022 15:51:51 -0500 Subject: [PATCH 23/27] Table rename `event_failed_pull_attempts` See https://github.com/matrix-org/synapse/pull/13589#discussion_r967238724 --- synapse/handlers/federation_event.py | 4 +-- .../databases/main/event_federation.py | 20 +++++------ synapse/storage/databases/main/events.py | 2 +- ...s.sql => 07event_failed_pull_attempts.sql} | 11 +++--- tests/handlers/test_federation_event.py | 34 +++++++++---------- 5 files changed, 35 
insertions(+), 36 deletions(-) rename synapse/storage/schema/main/delta/72/{04event_failed_backfill_attempts.sql => 07event_failed_pull_attempts.sql} (67%) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 60dd024cb154..cffbf68432c1 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -863,7 +863,7 @@ async def _process_pulled_event( except SynapseError as err: logger.warning("Event %s failed sanity check: %s", event_id, err) if backfilled: - await self._store.record_event_failed_backfill_attempt( + await self._store.record_event_failed_pull_attempt( event.room_id, event_id ) return @@ -902,7 +902,7 @@ async def _process_pulled_event( ) except FederationError as e: if backfilled: - await self._store.record_event_failed_backfill_attempt( + await self._store.record_event_failed_pull_attempt( event.room_id, event_id ) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 885ff189bcad..7459f5d9841f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1295,43 +1295,41 @@ def _get_backfill_events( return event_id_results @trace - async def record_event_failed_backfill_attempt( + async def record_event_failed_pull_attempt( self, room_id: str, event_id: str ) -> None: """ - Record when we fail to backfill an event. + Record when we fail to pull an event over federation. This information allows us to be more intelligent when we decide to retry (we don't need to fail over and over) and we can process that event in the background so we don't block on it each time. 
Args: - room_id: The room where the event failed to backfill from - event_id: The event that failed to be backfilled + room_id: The room where the event failed to pull from + event_id: The event that failed to be fetched or processed """ await self.db_pool.runInteraction( - "record_event_failed_backfill_attempt", - self._record_event_failed_backfill_attempt_upsert_txn, + "record_event_failed_pull_attempt", + self._record_event_failed_pull_attempt_upsert_txn, room_id, event_id, db_autocommit=True, # Safe as it's a single upsert ) - def _record_event_failed_backfill_attempt_upsert_txn( + def _record_event_failed_pull_attempt_upsert_txn( self, txn: LoggingTransaction, room_id: str, event_id: str, ) -> None: - assert self.database_engine.can_native_upsert - sql = """ - INSERT INTO event_failed_backfill_attempts ( + INSERT INTO event_failed_pull_attempts ( room_id, event_id, num_attempts, last_attempt_ts ) VALUES (?, ?, ?, ?) ON CONFLICT (room_id, event_id) DO UPDATE SET - num_attempts=event_failed_backfill_attempts.num_attempts + 1, + num_attempts=event_failed_pull_attempts.num_attempts + 1, last_attempt_ts=EXCLUDED.last_attempt_ts; """ diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 881819fc5774..6fc2fbcc53fe 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2454,7 +2454,7 @@ def _update_backward_extremeties( # extremities, it also means that they won't be backfilled from again so # we no longer need to store the backfill attempts around it. query = """ - DELETE FROM event_failed_backfill_attempts + DELETE FROM event_failed_pull_attempts WHERE event_id = ? and room_id = ? 
""" txn.execute_batch( diff --git a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql b/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql similarity index 67% rename from synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql rename to synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql index e3c557540cdc..9ed9e38e6dca 100644 --- a/synapse/storage/schema/main/delta/72/04event_failed_backfill_attempts.sql +++ b/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql @@ -14,11 +14,12 @@ */ --- Add a table that keeps track of when we failed to backfill an event. This --- allows us to be more intelligent when we decide to retry (we don't need to --- fail over and over) and we can process that event in the background so we --- don't block on it each time. -CREATE TABLE IF NOT EXISTS event_failed_backfill_attempts( +-- Add a table that keeps track of when we failed to pull an event over +-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows +-- us to be more intelligent when we decide to retry (we don't need to fail over +-- and over) and we can process that event in the background so we don't block +-- on it each time. +CREATE TABLE IF NOT EXISTS event_failed_pull_attempts( room_id TEXT NOT NULL REFERENCES rooms (room_id), event_id TEXT NOT NULL, num_attempts INT NOT NULL, diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 102129025b2f..ad33144ade34 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -233,7 +233,7 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) -> None: """ Test to make sure that failed backfill attempts for an event are - recorded in the `event_failed_backfill_attempts` table. + recorded in the `event_failed_pull_attempts` table. In this test, we pretend we are processing a "pulled" event via backfill. 
The pulled event has a fake `prev_event` which our server has @@ -242,9 +242,9 @@ def test_process_pulled_event_records_failed_backfill_attempts( the server can't fetch the state at the missing `prev_event`, the "pulled" event fails the history check and is fails to process. - We check that we correctly record the number of failed backfill attempts - to the pulled event and as a sanity check, that the "pulled" event isn't - persisted. expect. + We check that we correctly record the number of failed pull attempts + of the pulled event and as a sanity check, that the "pulled" event isn't + persisted. """ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" main_store = self.hs.get_datastores().main @@ -307,10 +307,10 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) ) - # Make sure our backfill attempt was recorded + # Make sure our failed pull attempt was recorded backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_failed_backfill_attempts", + table="event_failed_pull_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -325,10 +325,10 @@ def test_process_pulled_event_records_failed_backfill_attempts( ) ) - # Make sure our second backfill attempt was recorded (`num_attempts` was incremented) + # Make sure our second failed pull attempt was recorded (`num_attempts` was incremented) backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_failed_backfill_attempts", + table="event_failed_pull_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -348,16 +348,16 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ self, ) -> None: """ - Test to make sure that failed backfill attempts - (`event_failed_backfill_attempts` table) for an event are cleared after the + Test to make sure that failed pull attempts + (`event_failed_pull_attempts` table) for an event are cleared 
after the event is successfully persisted. In this test, we pretend we are processing a "pulled" event via backfill. The pulled event succesfully processes and the backward - extremeties are updated along with clearing out any backfill attempts + extremities are updated along with clearing out any failed pull attempts for those old extremities. - We check that we correctly cleared failed backfill attempts of the + We check that we correctly cleared failed pull attempts of the pulled event. """ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" @@ -411,14 +411,14 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ # Fake the "pulled" event failing to backfill once so we can test # if it's cleared out later on. self.get_success( - main_store.record_event_failed_backfill_attempt( + main_store.record_event_failed_pull_attempt( pulled_event.room_id, pulled_event.event_id ) ) - # Make sure we have a backfill attempt recorded for the pulled event + # Make sure we have a failed pull attempt recorded for the pulled event backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_failed_backfill_attempts", + table="event_failed_pull_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ) @@ -433,10 +433,10 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ ) ) - # Make sure the backfill attempt for the pulled event are cleared + # Make sure the failed pull attempts for the pulled event are cleared backfill_num_attempts = self.get_success( main_store.db_pool.simple_select_one_onecol( - table="event_failed_backfill_attempts", + table="event_failed_pull_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", allow_none=True, From 0b5f1db174287c7a09f1d632d2f71aec49b9fd91 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 9 Sep 2022 16:03:15 -0500 Subject: [PATCH 24/27] Add `last_cause` column See
https://github.com/matrix-org/synapse/pull/13589#discussion_r967235752 --- synapse/handlers/federation_event.py | 4 ++-- synapse/storage/databases/main/event_federation.py | 14 +++++++++----- .../main/delta/72/07event_failed_pull_attempts.sql | 1 + tests/handlers/test_federation_event.py | 2 +- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index cffbf68432c1..3f5530963693 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -864,7 +864,7 @@ async def _process_pulled_event( logger.warning("Event %s failed sanity check: %s", event_id, err) if backfilled: await self._store.record_event_failed_pull_attempt( - event.room_id, event_id + event.room_id, event_id, str(err) ) return @@ -903,7 +903,7 @@ async def _process_pulled_event( except FederationError as e: if backfilled: await self._store.record_event_failed_pull_attempt( - event.room_id, event_id + event.room_id, event_id, str(e) ) if e.code == 403: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 7459f5d9841f..ef477978ed63 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1296,7 +1296,7 @@ def _get_backfill_events( @trace async def record_event_failed_pull_attempt( - self, room_id: str, event_id: str + self, room_id: str, event_id: str, cause: str ) -> None: """ Record when we fail to pull an event over federation. 
@@ -1308,12 +1308,14 @@ async def record_event_failed_pull_attempt( Args: room_id: The room where the event failed to pull from event_id: The event that failed to be fetched or processed + cause: The error message or reason that we failed to pull the event """ await self.db_pool.runInteraction( "record_event_failed_pull_attempt", self._record_event_failed_pull_attempt_upsert_txn, room_id, event_id, + cause, db_autocommit=True, # Safe as it's a single upsert ) @@ -1322,18 +1324,20 @@ def _record_event_failed_pull_attempt_upsert_txn( txn: LoggingTransaction, room_id: str, event_id: str, + cause: str, ) -> None: sql = """ INSERT INTO event_failed_pull_attempts ( - room_id, event_id, num_attempts, last_attempt_ts + room_id, event_id, num_attempts, last_attempt_ts, last_cause ) - VALUES (?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?) ON CONFLICT (room_id, event_id) DO UPDATE SET num_attempts=event_failed_pull_attempts.num_attempts + 1, - last_attempt_ts=EXCLUDED.last_attempt_ts; + last_attempt_ts=EXCLUDED.last_attempt_ts, + last_cause=EXCLUDED.last_cause; """ - txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec())) + txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) async def get_missing_events( self, diff --git a/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql b/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql index 9ed9e38e6dca..d397ee10826d 100644 --- a/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql +++ b/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql @@ -24,5 +24,6 @@ CREATE TABLE IF NOT EXISTS event_failed_pull_attempts( event_id TEXT NOT NULL, num_attempts INT NOT NULL, last_attempt_ts BIGINT NOT NULL, + last_cause TEXT NOT NULL, PRIMARY KEY (room_id, event_id) ); diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index ad33144ade34..b5b89405a4f2 100644 --- a/tests/handlers/test_federation_event.py +++ 
b/tests/handlers/test_federation_event.py @@ -412,7 +412,7 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ # if it's cleared out later on. self.get_success( main_store.record_event_failed_pull_attempt( - pulled_event.room_id, pulled_event.event_id + pulled_event.room_id, pulled_event.event_id, "fake cause" ) ) # Make sure we have a failed pull attempt recorded for the pulled event From 1347686828b019e4e704d8d4deba107acec6c07d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 13 Sep 2022 16:06:20 -0500 Subject: [PATCH 25/27] Update schema version summary --- synapse/storage/schema/__init__.py | 2 ++ ...ailed_pull_attempts.sql => 09event_failed_pull_attempts.sql} | 0 2 files changed, 2 insertions(+) rename synapse/storage/schema/main/delta/72/{07event_failed_pull_attempts.sql => 09event_failed_pull_attempts.sql} (100%) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 32cda5e3ba2b..76ec65731c12 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -77,6 +77,8 @@ - Tables related to groups are dropped. - Unused column application_services_state.last_txn is dropped - Cache invalidation stream id sequence now begins at 2 to match code expectation. + - Add table `event_failed_pull_attempts` to keep track when we fail to pull + events over federation. 
""" diff --git a/synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql b/synapse/storage/schema/main/delta/72/09event_failed_pull_attempts.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/07event_failed_pull_attempts.sql rename to synapse/storage/schema/main/delta/72/09event_failed_pull_attempts.sql From 57182dce9487eefb928acf2a25fd91b1ce9e83c7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 12:45:24 -0500 Subject: [PATCH 26/27] Remove backfilled check since we plan to go general anyway See https://github.com/matrix-org/synapse/pull/13589#discussion_r969250906 --- synapse/handlers/federation_event.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3f5530963693..9e065e1116b5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -862,10 +862,9 @@ async def _process_pulled_event( self._sanity_check_event(event) except SynapseError as err: logger.warning("Event %s failed sanity check: %s", event_id, err) - if backfilled: - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(err) - ) + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(err) + ) return try: @@ -901,10 +900,9 @@ async def _process_pulled_event( backfilled=backfilled, ) except FederationError as e: - if backfilled: - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(e) - ) + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(e) + ) if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) From 70019d2cffdc14ca5f486474019b22f8f479ee12 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 12:53:44 -0500 Subject: [PATCH 27/27] Move change to latest delta 73 --- synapse/storage/schema/__init__.py | 4 ++-- 
.../01event_failed_pull_attempts.sql} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename synapse/storage/schema/main/delta/{72/09event_failed_pull_attempts.sql => 73/01event_failed_pull_attempts.sql} (100%) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index a1951c4ec164..68e055c66471 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -77,12 +77,12 @@ - Tables related to groups are dropped. - Unused column application_services_state.last_txn is dropped - Cache invalidation stream id sequence now begins at 2 to match code expectation. - - Add table `event_failed_pull_attempts` to keep track when we fail to pull - events over federation. Changes in SCHEMA_VERSION = 73; - thread_id column is added to event_push_actions, event_push_actions_staging event_push_summary, receipts_linearized, and receipts_graph. + - Add table `event_failed_pull_attempts` to keep track when we fail to pull + events over federation. """ diff --git a/synapse/storage/schema/main/delta/72/09event_failed_pull_attempts.sql b/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/09event_failed_pull_attempts.sql rename to synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql