From 07c7ddda03433fc920b412da66a90b603bdcd3dc Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:08:48 +0000 Subject: [PATCH 1/7] Add tables for un-partial-stated events stream --- .../73/22_un_partial_stated_event_stream.sql | 34 +++++++++++++++++++ ...artial_stated_room_stream_seq.sql.postgres | 20 +++++++++++ 2 files changed, 54 insertions(+) create mode 100644 synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql create mode 100644 synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres diff --git a/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql new file mode 100644 index 000000000000..0e571f78c30f --- /dev/null +++ b/synapse/storage/schema/main/delta/73/22_un_partial_stated_event_stream.sql @@ -0,0 +1,34 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Stream for notifying that an event has become un-partial-stated. +CREATE TABLE un_partial_stated_event_stream( + -- Position in the stream + stream_id BIGINT PRIMARY KEY NOT NULL, + + -- Which instance wrote this entry. + instance_name TEXT NOT NULL, + + -- Which event has been un-partial-stated. + event_id TEXT NOT NULL REFERENCES events(event_id) ON DELETE CASCADE, + + -- true iff the `rejected` status of the event changed when it became + -- un-partial-stated. + rejection_status_changed BOOLEAN NOT NULL +); + +-- We want an index here because of the foreign key constraint: +-- upon deleting an event, the database needs to be able to check here. +CREATE UNIQUE INDEX un_partial_stated_event_stream_room_id ON un_partial_stated_event_stream (event_id); diff --git a/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres new file mode 100644 index 000000000000..1ec24702f39a --- /dev/null +++ b/synapse/storage/schema/main/delta/73/23_un_partial_stated_room_stream_seq.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS un_partial_stated_event_stream_sequence; + +SELECT setval('un_partial_stated_event_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_event_stream +)); From 19765306d75a244b4bc80aaabbabfa384b67ce85 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:15:08 +0000 Subject: [PATCH 2/7] Add an ID generator for un-partial-stated events stream --- .../storage/databases/main/events_worker.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 01e935edef53..c6bbe60d4ddb 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -70,6 +70,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, AbstractStreamIdTracker, MultiWriterIdGenerator, StreamIdGenerator, @@ -291,6 +292,32 @@ def get_chain_id_txn(txn: Cursor) -> int: id_column="chain_id", ) + self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator + + if isinstance(database.engine, PostgresEngine): + self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="un_partial_stated_event_stream", + instance_name=hs.get_instance_name(), + tables=[ + ("un_partial_stated_event_stream", "instance_name", "stream_id") + ], + sequence_name="un_partial_stated_event_stream_sequence", + # TODO(faster_joins, multiple writers) Support multiple writers. + writers=["master"], + ) + else: + self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( + db_conn, "un_partial_stated_event_stream", "stream_id" + ) + + def get_un_partial_stated_events_token(self) -> int: + # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple + # writers because workers that don't write often will hold all + # readers up. + return self._un_partial_stated_events_stream_id_gen.get_current_token() + def process_replication_rows( self, stream_name: str, From 4f48193bd35b70e14d5b0e507eae85e1d8d2f2be Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:19:04 +0000 Subject: [PATCH 3/7] Add update fetch txn for unPS-event stream --- .../storage/databases/main/events_worker.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c6bbe60d4ddb..cdeaaf141008 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -318,6 +318,67 @@ def get_un_partial_stated_events_token(self) -> int: # readers up. return self._un_partial_stated_events_stream_id_gen.get_current_token() + async def get_un_partial_stated_events_from_stream( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + """Get updates for the un-partial-stated events replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_un_partial_stated_events_from_stream_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: + sql = """ + SELECT stream_id, event_id, rejection_status_changed + FROM un_partial_stated_event_stream + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, instance_name, limit)) + updates = [ + ( + row[0], + ( + row[1], + bool(row[2]), + ), + ) + for row in txn + ] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db_pool.runInteraction( + "get_un_partial_stated_events_from_stream", + get_un_partial_stated_events_from_stream_txn, + ) + def process_replication_rows( self, stream_name: str, From 1d31fa160ee705e90740703fa3d5cc2ea6154913 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:20:41 +0000 Subject: [PATCH 4/7] Add un-partial-stated events stream --- synapse/replication/tcp/streams/__init__.py | 7 ++++- .../replication/tcp/streams/partial_state.py | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 8575666d9ce1..110f10aab9a5 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -42,7 +42,10 @@ ) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.federation import FederationStream -from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream +from synapse.replication.tcp.streams.partial_state import ( + UnPartialStatedEventStream, + UnPartialStatedRoomStream, +) STREAMS_MAP = { stream.NAME: stream @@ -63,6 +66,7 @@ AccountDataStream, UserSignatureStream, UnPartialStatedRoomStream, + UnPartialStatedEventStream, ) } @@ -83,4 +87,5 @@ "AccountDataStream", "UserSignatureStream", "UnPartialStatedRoomStream", + "UnPartialStatedEventStream", ] diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py index 18f087ffa251..b5a2ae74b685 100644 --- a/synapse/replication/tcp/streams/partial_state.py +++ b/synapse/replication/tcp/streams/partial_state.py @@ -46,3 +46,31 @@ def __init__(self, hs: "HomeServer"): current_token_without_instance(store.get_un_partial_stated_rooms_token), store.get_un_partial_stated_rooms_from_stream, ) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnPartialStatedEventStreamRow: + # ID of the event that has been un-partial-stated. + event_id: str + + # True iff the rejection status of the event changed as a result of being + # un-partial-stated. + rejection_status_changed: bool + + +class UnPartialStatedEventStream(Stream): + """ + Stream to notify about events becoming un-partial-stated. + """ + + NAME = "un_partial_stated_event" + ROW_TYPE = UnPartialStatedEventStreamRow + + def __init__(self, hs: "HomeServer"): + store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + # TODO(faster_joins, multiple writers): we need to account for instance names + current_token_without_instance(store.get_un_partial_stated_events_token), + store.get_un_partial_stated_events_from_stream, + ) From 6ed52536e586c4dbf1527a3f25231848d8103c25 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:48:18 +0000 Subject: [PATCH 5/7] Write to the un-partial-stated events stream when the event is un-partial-stated --- synapse/storage/databases/main/state.py | 34 ++++++++++++++++++------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index af7bebee8030..ba16f2dfdf98 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -80,6 +80,7 @@ def __init__( hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self._instance_name: str = hs.get_instance_name() async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room @@ -404,18 +405,21 @@ async def update_state_for_partial_state_event( context: EventContext, ) -> None: """Update the state group for a partial state event""" - await self.db_pool.runInteraction( - "update_state_for_partial_state_event", - self._update_state_for_partial_state_event_txn, - event, - context, - ) + async with self._un_partial_stated_events_stream_id_gen.get_next() as un_partial_state_event_stream_id: + await self.db_pool.runInteraction( + "update_state_for_partial_state_event", + self._update_state_for_partial_state_event_txn, + event, + context, + un_partial_state_event_stream_id, + ) def _update_state_for_partial_state_event_txn( self, txn: LoggingTransaction, event: EventBase, context: EventContext, + un_partial_state_event_stream_id: int, ) -> None: # we shouldn't have any outliers here assert not event.internal_metadata.is_outlier() @@ -436,7 +440,10 @@ def _update_state_for_partial_state_event_txn( # the event may now be rejected where it was not before, or vice versa, # in which case we need to update the rejected flags. - if bool(context.rejected) != (event.rejected_reason is not None): + rejection_status_changed = bool(context.rejected) != ( + event.rejected_reason is not None + ) + if rejection_status_changed: self.mark_event_rejected_txn(txn, event.event_id, context.rejected) self.db_pool.simple_delete_one_txn( @@ -445,8 +452,6 @@ def _update_state_for_partial_state_event_txn( keyvalues={"event_id": event.event_id}, ) - # TODO(faster_joins): need to do something about workers here - # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, @@ -454,6 +459,17 @@ def _update_state_for_partial_state_event_txn( state_group, ) + self.db_pool.simple_insert_txn( + txn, + "un_partial_stated_event_stream", + { + "stream_id": un_partial_state_event_stream_id, + "instance_name": self._instance_name, + "event_id": event.event_id, + "rejection_status_changed": rejection_status_changed, + }, + ) + class MainStateBackgroundUpdateStore(RoomMemberWorkerStore): From b62e941ef5eb2b758115d979bfea70580c9bcd7a Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Nov 2022 19:53:49 +0000 Subject: [PATCH 6/7] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/14545.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14545.misc diff --git a/changelog.d/14545.misc b/changelog.d/14545.misc new file mode 100644 index 000000000000..60b6761a51b3 --- /dev/null +++ b/changelog.d/14545.misc @@ -0,0 +1 @@ +Faster remote room joins: stream the un-partial-stating of events over replication. \ No newline at end of file From b5410012c0c6284a69ae04566442dc77933281bb Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 13 Dec 2022 14:44:16 +0000 Subject: [PATCH 7/7] Add call to notify_replication --- synapse/handlers/federation_event.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f7223b03c364..8e6da3dafae3 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -609,6 +609,8 @@ async def update_state_for_partial_state_event( self._state_storage_controller.notify_event_un_partial_stated( event.event_id ) + # Notify that there's a new row in the un_partial_stated_events stream. + self._notifier.notify_replication() @trace async def backfill(