From e57023337272660369094c308b0b28d597a0b84d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 7 Sep 2022 13:31:35 -0400 Subject: [PATCH 1/8] Update event push action and receipt tables to support threads. --- changelog.d/13753.misc | 1 + .../databases/main/event_push_actions.py | 8 +++ synapse/storage/databases/main/receipts.py | 18 +++++ .../main/delta/72/06thread_notifications.sql | 27 ++++++++ .../delta/72/07thread_receipts.sql.postgres | 30 ++++++++ .../delta/72/07thread_receipts.sql.sqlite | 69 +++++++++++++++++++ .../main/delta/72/08thread_receipts.sql | 20 ++++++ 7 files changed, 173 insertions(+) create mode 100644 changelog.d/13753.misc create mode 100644 synapse/storage/schema/main/delta/72/06thread_notifications.sql create mode 100644 synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres create mode 100644 synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite create mode 100644 synapse/storage/schema/main/delta/72/08thread_receipts.sql diff --git a/changelog.d/13753.misc b/changelog.d/13753.misc new file mode 100644 index 000000000000..63de2eb9f91e --- /dev/null +++ b/changelog.d/13753.misc @@ -0,0 +1 @@ +Prepatory work for storing thread IDs for notifications and receipts. diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index f4a07de2a354..fd80ec1e4988 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -232,6 +232,14 @@ def __init__( replaces_index="event_push_summary_user_rm", ) + self.db_pool.updates.register_background_index_update( + "event_push_summary_unique_index2", + index_name="event_push_summary_unique_index2", + table="event_push_summary", + columns=["user_id", "room_id", "thread_id"], + unique=True, + ) + @cached(tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( self, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3838409519c3..a7d2a88dfb24 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -113,6 +113,24 @@ def __init__( prefilled_cache=receipts_stream_prefill, ) + self.db_pool.updates.register_background_index_update( + "receipts_linearized_unique_index", + index_name="receipts_linearized_unique_index", + table="receipts_linearized", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, + ) + + self.db_pool.updates.register_background_index_update( + "receipts_graph_unique_index", + index_name="receipts_graph_unique_index", + table="receipts_graph", + columns=["room_id", "receipt_type", "user_id"], + where_clause="thread_id IS NULL", + unique=True, + ) + def get_max_receipt_stream_id(self) -> int: """Get the current max stream ID for receipts stream""" return self._receipts_id_gen.get_current_token() diff --git a/synapse/storage/schema/main/delta/72/06thread_notifications.sql b/synapse/storage/schema/main/delta/72/06thread_notifications.sql new file mode 100644 index 000000000000..0dfbcdc730ff --- /dev/null +++ b/synapse/storage/schema/main/delta/72/06thread_notifications.sql @@ -0,0 +1,27 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a nullable column for thread ID to the event push actions tables; this +-- will be filled in with a default value for any previously existing rows. +-- +-- After migration this can be made non-nullable. + +ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT; +ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT; +ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT; + +-- Update the unique index for `event_push_summary`. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7006, 'event_push_summary_unique_index2', '{}'); diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres new file mode 100644 index 000000000000..55fff9e278e7 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres @@ -0,0 +1,30 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a nullable column for thread ID to the receipts table; this allows a +-- receipt per user, per room, as well as an unthreaded receipt (corresponding +-- to a null thread ID). + +ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT; +ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT; + +-- Rebuild the unique constraint with the thread_id. +ALTER TABLE receipts_linearized + ADD CONSTRAINT receipts_linearized_uniqueness_thread + UNIQUE (room_id, receipt_type, user_id, thread_id); + +ALTER TABLE receipts_graph + ADD CONSTRAINT receipts_graph_uniqueness_thread + UNIQUE (room_id, receipt_type, user_id, thread_id); diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite new file mode 100644 index 000000000000..226710122f70 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite @@ -0,0 +1,69 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Allow multiple receipts per user per room via a nullable thread_id column. +-- +-- SQLite doesn't support modifying constraints to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE receipts_linearized_new ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + thread_id TEXT, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id), + CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +CREATE TABLE receipts_graph_new ( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_ids TEXT NOT NULL, + thread_id TEXT, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id), + CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +-- Drop the old indexes. +DROP INDEX IF EXISTS receipts_linearized_id; +DROP INDEX IF EXISTS receipts_linearized_room_stream; +DROP INDEX IF EXISTS receipts_linearized_user; + +-- Copy the data. +INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data) + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized; +INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data) + SELECT room_id, receipt_type, user_id, event_ids, data + FROM receipts_graph; + +-- Drop the old tables. +DROP TABLE receipts_linearized; +DROP TABLE receipts_graph; + +-- Rename the tables. +ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized; +ALTER TABLE receipts_graph_new RENAME TO receipts_graph; + +-- Create the indices. +CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id ); +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); +CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); diff --git a/synapse/storage/schema/main/delta/72/08thread_receipts.sql b/synapse/storage/schema/main/delta/72/08thread_receipts.sql new file mode 100644 index 000000000000..e35b021f316a --- /dev/null +++ b/synapse/storage/schema/main/delta/72/08thread_receipts.sql @@ -0,0 +1,20 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7007, 'receipts_linearized_unique_index', '{}'); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7007, 'receipts_graph_unique_index', '{}'); From 59e2789080d0ef4da4b568d34a8c4040f2be0d5b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 7 Sep 2022 14:12:41 -0400 Subject: [PATCH 2/8] Backfill the thread_id column for event_push_actions and event_push_summary. --- .../databases/main/event_push_actions.py | 81 +++++++++++++++++++ .../main/delta/72/06thread_notifications.sql | 3 + 2 files changed, 84 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index fd80ec1e4988..9c6deb56b95e 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -98,6 +98,7 @@ ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -240,6 +241,86 @@ def __init__( unique=True, ) + self.db_pool.updates.register_background_update_handler( + "event_push_backfill_thread_id", + self._background_backfill_thread_id, + ) + + async def _background_backfill_thread_id( + self, progress: JsonDict, batch_size: int + ) -> int: + event_push_actions_done = progress.get("event_push_actions_done", False) + + def add_thread_id_txn( + txn: LoggingTransaction, table_name: str, start_stream_ordering: int + ) -> int: + sql = f""" + SELECT stream_ordering + FROM {table_name} + WHERE + thread_id IS NULL + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT ? + """ + txn.execute(sql, (start_stream_ordering, batch_size)) + + # No more rows to process. + rows = txn.fetchall() + if not rows: + progress[f"{table_name}_done"] = True + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + return 0 + + # Update the thread ID for any of those rows. + max_stream_ordering = rows[-1][0] + + sql = f""" + UPDATE {table_name} + SET thread_id = 'main' + WHERE stream_ordering <= ? AND thread_id IS NULL + """ + txn.execute(sql, (max_stream_ordering,)) + + # Update progress. + processed_rows = txn.rowcount + progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + + # First update the event_push_actions table, then the event_push_summary table. + # + # Note that the event_push_actions_staging table is ignored since it is + # assumed that items in that table will only exist for a short period of + # time. + if not event_push_actions_done: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_txn, + "event_push_actions", + progress.get("max_event_push_actions_stream_ordering", 0), + ) + else: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_txn, + "event_push_summary", + progress.get("max_event_push_summary_stream_ordering", 0), + ) + + # Only done after the event_push_summary table is done. + if not result: + await self.db_pool.updates._end_background_update( + "event_push_backfill_thread_id" + ) + + return result + @cached(tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( self, diff --git a/synapse/storage/schema/main/delta/72/06thread_notifications.sql b/synapse/storage/schema/main/delta/72/06thread_notifications.sql index 0dfbcdc730ff..2f4f5dac7a04 100644 --- a/synapse/storage/schema/main/delta/72/06thread_notifications.sql +++ b/synapse/storage/schema/main/delta/72/06thread_notifications.sql @@ -25,3 +25,6 @@ ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT; -- Update the unique index for `event_push_summary`. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7006, 'event_push_summary_unique_index2', '{}'); + +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2'); From fc8e0fa6812bd4c528f6a57a42545582da65254c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 8 Sep 2022 10:40:17 -0400 Subject: [PATCH 3/8] Insert a thread_id when inserting new data. --- synapse/push/bulk_push_rule_evaluator.py | 29 ++++++++++--------- .../databases/main/event_push_actions.py | 22 ++++++++++++-- synapse/storage/databases/main/events.py | 4 +-- synapse/storage/databases/main/receipts.py | 2 ++ .../replication/slave/storage/test_events.py | 1 + 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index d1caf8a0f7a0..3846fbc5f042 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -198,7 +198,7 @@ async def _get_power_levels_and_sender_level( return pl_event.content if pl_event else {}, sender_level async def _get_mutual_relations( - self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]] + self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]] ) -> Dict[str, Set[Tuple[str, str]]]: """ Fetch event metadata for events which related to the same event as the given event. @@ -206,7 +206,7 @@ async def _get_mutual_relations( If the given event has no relation information, returns an empty dictionary. Args: - event_id: The event ID which is targeted by relations. + parent_id: The event ID which is targeted by relations. rules: The push rules which will be processed for this event. Returns: @@ -220,12 +220,6 @@ async def _get_mutual_relations( if not self._relations_match_enabled: return {} - # If the event does not have a relation, then cannot have any mutual - # relations. - relation = relation_from_event(event) - if not relation: - return {} - # Pre-filter to figure out which relation types are interesting. rel_types = set() for rule, enabled in rules: @@ -246,9 +240,7 @@ async def _get_mutual_relations( return {} # If any valid rules were found, fetch the mutual relations. - return await self.store.get_mutual_event_relations( - relation.parent_id, rel_types - ) + return await self.store.get_mutual_event_relations(parent_id, rel_types) @measure_func("action_for_event_by_user") async def action_for_event_by_user( @@ -281,9 +273,17 @@ async def action_for_event_by_user( sender_power_level, ) = await self._get_power_levels_and_sender_level(event, context) - relations = await self._get_mutual_relations( - event, itertools.chain(*rules_by_user.values()) - ) + relation = relation_from_event(event) + # If the event does not have a relation, then cannot have any mutual + # relations or thread ID. + relations = {} + thread_id = "main" + if relation: + relations = await self._get_mutual_relations( + relation.parent_id, itertools.chain(*rules_by_user.values()) + ) + if relation.rel_type == RelationTypes.THREAD: + thread_id = relation.parent_id evaluator = PushRuleEvaluatorForEvent( event, @@ -352,6 +352,7 @@ async def action_for_event_by_user( event.event_id, actions_by_user, count_as_unread, + thread_id, ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 9c6deb56b95e..ca51d0dab34a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -759,6 +759,7 @@ async def add_push_actions_to_staging( event_id: str, user_id_actions: Dict[str, Collection[Union[Mapping, str]]], count_as_unread: bool, + thread_id: str, ) -> None: """Add the push actions for the event to the push action staging area. @@ -767,6 +768,7 @@ async def add_push_actions_to_staging( user_id_actions: A mapping of user_id to list of push actions, where an action can either be a string or dict. count_as_unread: Whether this event should increment unread counts. + thread_id: The thread this event is parent of, if applicable. """ if not user_id_actions: return @@ -775,7 +777,7 @@ async def add_push_actions_to_staging( # can be used to insert into the `event_push_actions_staging` table. def _gen_entry( user_id: str, actions: Collection[Union[Mapping, str]] - ) -> Tuple[str, str, str, int, int, int]: + ) -> Tuple[str, str, str, int, int, int, str]: is_highlight = 1 if _action_has_highlight(actions) else 0 notif = 1 if "notify" in actions else 0 return ( @@ -785,11 +787,20 @@ def _gen_entry( notif, # notif column is_highlight, # highlight column int(count_as_unread), # unread column + thread_id, # thread_id column ) await self.db_pool.simple_insert_many( "event_push_actions_staging", - keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"), + keys=( + "event_id", + "user_id", + "actions", + "notif", + "highlight", + "unread", + "thread_id", + ), values=[ _gen_entry(user_id, actions) for user_id, actions in user_id_actions.items() @@ -1070,6 +1081,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) # Replace the previous summary with the new counts. + # + # TODO(threads): Upsert per-thread instead of setting them all to main. self.db_pool.simple_upsert_txn( txn, table="event_push_summary", @@ -1079,6 +1092,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: "unread_count": unread_count, "stream_ordering": old_rotate_stream_ordering, "last_receipt_stream_ordering": stream_ordering, + "thread_id": "main", }, ) @@ -1227,17 +1241,19 @@ def _rotate_notifs_before_txn( logger.info("Rotating notifications, handling %d rows", len(summaries)) + # TODO(threads): Update on a per-thread basis. self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", key_names=("user_id", "room_id"), key_values=[(user_id, room_id) for user_id, room_id in summaries], - value_names=("notif_count", "unread_count", "stream_ordering"), + value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"), value_values=[ ( summary.notif_count, summary.unread_count, summary.stream_ordering, + "main", ) for summary in summaries.values() ], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..c0b4080e4b3d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn( sql = """ INSERT INTO event_push_actions ( room_id, event_id, user_id, actions, stream_ordering, - topological_ordering, notif, highlight, unread + topological_ordering, notif, highlight, unread, thread_id ) - SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread + SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id FROM event_push_actions_staging WHERE event_id = ? """ diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index a7d2a88dfb24..d1fdc387915b 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -694,6 +694,7 @@ def _insert_linearized_receipt_txn( "stream_id": stream_id, "event_id": event_id, "data": json_encoder.encode(data), + "thread_id": None, }, # receipts_linearized has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock @@ -841,6 +842,7 @@ def _insert_graph_receipt_txn( values={ "event_ids": json_encoder.encode(event_ids), "data": json_encoder.encode(data), + "thread_id": None, }, # receipts_graph has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 531a0db2d071..49a21e2e8581 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -404,6 +404,7 @@ def build_event( event.event_id, {user_id: actions for user_id, actions in push_actions}, False, + "main", ) ) return event, context From b7a7d6d4e5b3c328a54a8e3ce189ca87bec0ead6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 14 Sep 2022 09:43:51 -0400 Subject: [PATCH 4/8] Add docstring to background update. --- synapse/storage/databases/main/event_push_actions.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index ca51d0dab34a..863308fbd861 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -249,6 +249,16 @@ def __init__( async def _background_backfill_thread_id( self, progress: JsonDict, batch_size: int ) -> int: + """ + Fill in the thread_id field for event_push_actions and event_push_summary. + + This is preparatory so that it can be made non-nullable in the future. + + Because all current (null) data is done in an unthreaded manner this + simply assumes it is on the "main" timeline. Since event_push_actions + are periodically cleared it is not possible to correctly re-calculate + the thread_id. + """ event_push_actions_done = progress.get("event_push_actions_done", False) def add_thread_id_txn( From 8bce65056a42eb652776186877ad0304ad56437a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 14 Sep 2022 10:11:31 -0400 Subject: [PATCH 5/8] Bump the schema version. --- synapse/storage/schema/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 256f745dc0e8..19d473b73fc1 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 72 # remember to update the list below when updating +SCHEMA_VERSION = 73 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -76,6 +76,10 @@ - event_edges.(room_id, is_state) are no longer written to. - Tables related to groups are dropped. - Unused column application_services_state.last_txn is dropped + +Changes in SCHEMA_VERSION = 73; + - thread_id column is added to event_push_actions, event_push_actions_staging + event_push_summary, receipts_linearized, and receipts_graph. """ From 0514093673fc7d05b274312e5c5540df6f4ca370 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 14 Sep 2022 10:23:00 -0400 Subject: [PATCH 6/8] Rename deltas from 72 -> 73. --- .../06thread_notifications.sql => 73/01thread_notifications.sql} | 0 .../02thread_receipts.sql.postgres} | 0 .../02thread_receipts.sql.sqlite} | 0 .../delta/{72/08thread_receipts.sql => 73/03thread_receipts.sql} | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{72/06thread_notifications.sql => 73/01thread_notifications.sql} (100%) rename synapse/storage/schema/main/delta/{72/07thread_receipts.sql.postgres => 73/02thread_receipts.sql.postgres} (100%) rename synapse/storage/schema/main/delta/{72/07thread_receipts.sql.sqlite => 73/02thread_receipts.sql.sqlite} (100%) rename synapse/storage/schema/main/delta/{72/08thread_receipts.sql => 73/03thread_receipts.sql} (100%) diff --git a/synapse/storage/schema/main/delta/72/06thread_notifications.sql b/synapse/storage/schema/main/delta/73/01thread_notifications.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/06thread_notifications.sql rename to synapse/storage/schema/main/delta/73/01thread_notifications.sql diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres b/synapse/storage/schema/main/delta/73/02thread_receipts.sql.postgres similarity index 100% rename from synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres rename to synapse/storage/schema/main/delta/73/02thread_receipts.sql.postgres diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite b/synapse/storage/schema/main/delta/73/02thread_receipts.sql.sqlite similarity index 100% rename from synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite rename to synapse/storage/schema/main/delta/73/02thread_receipts.sql.sqlite diff --git a/synapse/storage/schema/main/delta/72/08thread_receipts.sql b/synapse/storage/schema/main/delta/73/03thread_receipts.sql similarity index 100% rename from synapse/storage/schema/main/delta/72/08thread_receipts.sql rename to synapse/storage/schema/main/delta/73/03thread_receipts.sql From 8b937e527eba014a3b65d7b14ac4c1aeb71fb87f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 14 Sep 2022 10:58:53 -0400 Subject: [PATCH 7/8] Revert "Rename deltas from 72 -> 73." This reverts commit 0514093673fc7d05b274312e5c5540df6f4ca370. --- .../01thread_notifications.sql => 72/06thread_notifications.sql} | 0 .../07thread_receipts.sql.postgres} | 0 .../07thread_receipts.sql.sqlite} | 0 .../delta/{73/03thread_receipts.sql => 72/08thread_receipts.sql} | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{73/01thread_notifications.sql => 72/06thread_notifications.sql} (100%) rename synapse/storage/schema/main/delta/{73/02thread_receipts.sql.postgres => 72/07thread_receipts.sql.postgres} (100%) rename synapse/storage/schema/main/delta/{73/02thread_receipts.sql.sqlite => 72/07thread_receipts.sql.sqlite} (100%) rename synapse/storage/schema/main/delta/{73/03thread_receipts.sql => 72/08thread_receipts.sql} (100%) diff --git a/synapse/storage/schema/main/delta/73/01thread_notifications.sql b/synapse/storage/schema/main/delta/72/06thread_notifications.sql similarity index 100% rename from synapse/storage/schema/main/delta/73/01thread_notifications.sql rename to synapse/storage/schema/main/delta/72/06thread_notifications.sql diff --git a/synapse/storage/schema/main/delta/73/02thread_receipts.sql.postgres b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres similarity index 100% rename from synapse/storage/schema/main/delta/73/02thread_receipts.sql.postgres rename to synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres diff --git a/synapse/storage/schema/main/delta/73/02thread_receipts.sql.sqlite b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite similarity index 100% rename from synapse/storage/schema/main/delta/73/02thread_receipts.sql.sqlite rename to synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite diff --git a/synapse/storage/schema/main/delta/73/03thread_receipts.sql b/synapse/storage/schema/main/delta/72/08thread_receipts.sql similarity index 100% rename from synapse/storage/schema/main/delta/73/03thread_receipts.sql rename to synapse/storage/schema/main/delta/72/08thread_receipts.sql From 9312d1a28290e08a2ecff81a6b1838783d62b202 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 14 Sep 2022 11:54:32 -0400 Subject: [PATCH 8/8] Add event_stream_ordering column to receipts_linearized. --- .../schema/main/delta/72/07thread_receipts.sql.sqlite | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite index 226710122f70..232f67deb4ac 100644 --- a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite +++ b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite @@ -26,6 +26,7 @@ CREATE TABLE receipts_linearized_new ( user_id TEXT NOT NULL, event_id TEXT NOT NULL, thread_id TEXT, + event_stream_ordering BIGINT, data TEXT NOT NULL, CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id), CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id) @@ -48,8 +49,8 @@ DROP INDEX IF EXISTS receipts_linearized_room_stream; DROP INDEX IF EXISTS receipts_linearized_user; -- Copy the data. -INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data) - SELECT stream_id, room_id, receipt_type, user_id, event_id, data +INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data) + SELECT stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data FROM receipts_linearized; INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data) SELECT room_id, receipt_type, user_id, event_ids, data