From 19e82e0fa648d80c54e510b66b18bb31fed2fabe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2022 11:39:57 +0100 Subject: [PATCH 01/19] Refactor get_last_receipt_event_id_for_user --- synapse/storage/databases/main/receipts.py | 72 ++++++++++++---------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index b6106affa644..9caaae50f856 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -118,7 +118,7 @@ def get_max_receipt_stream_id(self) -> int: return self._receipts_id_gen.get_current_token() async def get_last_receipt_event_id_for_user( - self, user_id: str, room_id: str, receipt_types: Iterable[str] + self, user_id: str, room_id: str, receipt_types: Collection[str] ) -> Optional[str]: """ Fetch the event ID for the latest receipt in a room with one of the given receipt types. @@ -132,52 +132,59 @@ async def get_last_receipt_event_id_for_user( Returns: The latest receipt, if one exists. """ - latest_event_id: Optional[str] = None - latest_stream_ordering = 0 - for receipt_type in receipt_types: - result = await self._get_last_receipt_event_id_for_user( - user_id, room_id, receipt_type - ) - if result is None: - continue - event_id, stream_ordering = result - - if latest_event_id is None or latest_stream_ordering < stream_ordering: - latest_event_id = event_id - latest_stream_ordering = stream_ordering + result = await self.db_pool.runInteraction( + "get_last_receipt_event_id_for_user", + self.get_last_receipt_for_user_txn, + user_id, + room_id, + receipt_types, + ) + if not result: + return None - return latest_event_id + event_id, _ = result + return event_id - @cached() - async def _get_last_receipt_event_id_for_user( - self, user_id: str, room_id: str, receipt_type: str + def get_last_receipt_for_user_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_id: str, + receipt_types: Collection[str], ) -> Optional[Tuple[str, int]]: """ - Fetch the event ID and stream ordering for the latest receipt. + Fetch the event ID and stream_ordering for the latest receipt in a room + with one of the given receipt types. Args: - user_id: The user to fetch receipts for. - room_id: The room ID to fetch the receipt for. - receipt_type: The receipt type to fetch. + user_id: The user to fetch receipts for. room_id: The room ID to + fetch the receipt for. receipt_type: The receipt types to fetch. + Earlier receipt types + are given priority if multiple receipts point to the same event. Returns: - The event ID and stream ordering of the latest receipt, if one exists; - otherwise `None`. + The latest receipt, if one exists. """ - sql = """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "receipt_type", receipt_types + ) + + sql = f""" SELECT event_id, stream_ordering FROM receipts_linearized INNER JOIN events USING (room_id, event_id) - WHERE user_id = ? + WHERE {clause} + AND user_id = ? AND room_id = ? - AND receipt_type = ? 
+ ORDER BY stream_ordering DESC + LIMIT 1 """ - def f(txn: LoggingTransaction) -> Optional[Tuple[str, int]]: - txn.execute(sql, (user_id, room_id, receipt_type)) - return cast(Optional[Tuple[str, int]], txn.fetchone()) + args.extend((user_id, room_id)) + txn.execute(sql, args) - return await self.db_pool.runInteraction("get_own_receipt_for_user", f) + return cast(Optional[Tuple[str, int]], txn.fetchone()) async def get_receipts_for_user( self, user_id: str, receipt_types: Iterable[str] @@ -577,9 +584,6 @@ def invalidate_caches_for_receipt( ) -> None: self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) self._get_linearized_receipts_for_room.invalidate((room_id,)) - self._get_last_receipt_event_id_for_user.invalidate( - (user_id, room_id, receipt_type) - ) def process_replication_rows( self, From 794d69153be5b196e16c383b18934c9b0a8df45c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2022 12:10:09 +0100 Subject: [PATCH 02/19] Fetch latest receipt inside get_unread_counts --- synapse/handlers/sync.py | 10 ++---- synapse/push/push_tools.py | 33 ++++++++----------- synapse/storage/databases/main/__init__.py | 4 +-- .../databases/main/event_push_actions.py | 27 ++++++++------- synapse/storage/databases/main/push_rule.py | 2 +- synapse/storage/databases/main/receipts.py | 6 ++++ .../replication/slave/storage/test_events.py | 14 ++++++-- 7 files changed, 49 insertions(+), 47 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b4ead79f9733..53fc42bda261 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ import attr from prometheus_client import Counter -from synapse.api.constants import EventTypes, Membership, ReceiptTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS @@ -1054,14 +1054,10 @@ async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig ) -> NotifCounts: with Measure(self.clock, "unread_notifs_for_room_id"): - last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( - user_id=sync_config.user.to_string(), - room_id=room_id, - receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), - ) return await self.store.get_unread_event_push_actions_by_room_for_user( - room_id, sync_config.user.to_string(), last_unread_event_id + room_id, + sync_config.user.to_string(), ) async def generate_sync_result( diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 8397229ccb72..6661887d9f92 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -13,7 +13,6 @@ # limitations under the License. 
from typing import Dict -from synapse.api.constants import ReceiptTypes from synapse.events import EventBase from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage.controllers import StorageControllers @@ -24,30 +23,24 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - invites = await store.get_invited_rooms_for_local_user(user_id) joins = await store.get_rooms_for_user(user_id) - my_receipts_by_room = await store.get_receipts_for_user( - user_id, (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) - ) - badge = len(invites) for room_id in joins: - if room_id in my_receipts_by_room: - last_unread_event_id = my_receipts_by_room[room_id] - - notifs = await ( - store.get_unread_event_push_actions_by_room_for_user( - room_id, user_id, last_unread_event_id - ) + notifs = await ( + store.get_unread_event_push_actions_by_room_for_user( + room_id, + user_id, ) - if notifs.notify_count == 0: - continue + ) + if notifs.notify_count == 0: + continue - if group_by_room: - # return one badge count per conversation - badge += 1 - else: - # increment the badge count by the number of unread messages in the room - badge += notifs.notify_count + if group_by_room: + # return one badge count per conversation + badge += 1 + else: + # increment the badge count by the number of unread messages in the room + badge += notifs.notify_count return badge diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 11d9d16c1900..e8f53f1e88c6 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -105,13 +105,14 @@ class DataStore( PusherStore, PushRuleStore, ApplicationServiceTransactionStore, + EventPushActionsStore, + ServerMetricsStore, ReceiptsStore, EndToEndKeyStore, EndToEndRoomKeyStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore, OpenIdStore, ClientIpWorkerStore, DeviceStore, @@ -126,7 +127,6 @@ class DataStore( UIAuthStore, EventForwardExtremitiesStore, CacheInvalidationWorkerStore, - ServerMetricsStore, LockStore, SessionStore, ): diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b019979350e3..b27d99463b3a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -17,6 +17,7 @@ import attr +from synapse.api.constants import ReceiptTypes from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -24,6 +25,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -119,7 +121,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st return DEFAULT_NOTIF_ACTION -class EventPushActionsWorkerStore(SQLBaseStore): +class EventPushActionsWorkerStore(ReceiptsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -148,12 +150,11 @@ def __init__( self._rotate_notifs, 30 * 60 * 1000 ) - @cached(num_args=3, tree=True, max_entries=5000) + @cached(tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( self, room_id: str, user_id: str, - last_read_event_id: Optional[str], ) -> NotifCounts: """Get the notification count, the 
highlight count and the unread message count for a given user in a given room after the given read receipt. @@ -165,8 +166,6 @@ async def get_unread_event_push_actions_by_room_for_user( Args: room_id: The room to retrieve the counts in. user_id: The user to retrieve the counts for. - last_read_event_id: The event associated with the latest read receipt for - this user in this room. None if no receipt for this user in this room. Returns A dict containing the counts mentioned earlier in this docstring, @@ -178,7 +177,6 @@ async def get_unread_event_push_actions_by_room_for_user( self._get_unread_counts_by_receipt_txn, room_id, user_id, - last_read_event_id, ) def _get_unread_counts_by_receipt_txn( @@ -186,16 +184,17 @@ def _get_unread_counts_by_receipt_txn( txn: LoggingTransaction, room_id: str, user_id: str, - last_read_event_id: Optional[str], ) -> NotifCounts: - stream_ordering = None + result = self.get_last_receipt_for_user_txn( + txn, + user_id, + room_id, + receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) - if last_read_event_id is not None: - stream_ordering = self.get_stream_id_for_event_txn( # type: ignore[attr-defined] - txn, - last_read_event_id, - allow_none=True, - ) + stream_ordering = None + if result: + _, stream_ordering = result if stream_ordering is None: # Either last_read_event_id is None, or it's an event we don't have (e.g. diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index d5aefe02b6dc..86649c1e6cd6 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -110,9 +110,9 @@ def _load_rules( # the abstract methods being implemented. class PushRulesWorkerStore( ApplicationServiceWorkerStore, - ReceiptsWorkerStore, PusherWorkerStore, RoomMemberWorkerStore, + ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 9caaae50f856..f78e11717427 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -585,6 +585,12 @@ def invalidate_caches_for_receipt( self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) self._get_linearized_receipts_for_room.invalidate((room_id,)) + # We use this method to invalidate so that we don't end up with circular + # dependencies between the receipts and push action stores. 
+ self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) + def process_replication_rows( self, stream_name: str, diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 6d3d4afe52c7..9fc7d0a33d3c 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -16,6 +16,7 @@ from canonicaljson import encode_canonical_json +from synapse.api.constants import ReceiptTypes from synapse.api.room_versions import RoomVersions from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.handlers.room import RoomEventSource @@ -164,9 +165,16 @@ def test_push_actions_for_user(self): ) event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello") self.replicate() + + self.get_success( + self.master_store.insert_receipt( + ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {} + ) + ) + self.check( "get_unread_event_push_actions_by_room_for_user", - [ROOM_ID, USER_ID_2, event1.event_id], + [ROOM_ID, USER_ID_2], NotifCounts(highlight_count=0, unread_count=0, notify_count=0), ) @@ -179,7 +187,7 @@ def test_push_actions_for_user(self): self.replicate() self.check( "get_unread_event_push_actions_by_room_for_user", - [ROOM_ID, USER_ID_2, event1.event_id], + [ROOM_ID, USER_ID_2], NotifCounts(highlight_count=0, unread_count=0, notify_count=1), ) @@ -194,7 +202,7 @@ def test_push_actions_for_user(self): self.replicate() self.check( "get_unread_event_push_actions_by_room_for_user", - [ROOM_ID, USER_ID_2, event1.event_id], + [ROOM_ID, USER_ID_2], NotifCounts(highlight_count=1, unread_count=0, notify_count=2), ) From dd249c722a241bcfabf0c9e2cfc27541c228dbdb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 10:18:35 +0100 Subject: [PATCH 03/19] Ignore rotated push actions when counting notifs --- .../databases/main/event_push_actions.py | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b27d99463b3a..88c5764cf7b0 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -217,42 +217,52 @@ def _get_unread_counts_by_receipt_txn( def _get_unread_counts_by_pos_txn( self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int ) -> NotifCounts: - sql = ( - "SELECT" - " COUNT(CASE WHEN notif = 1 THEN 1 END)," - " COUNT(CASE WHEN highlight = 1 THEN 1 END)," - " COUNT(CASE WHEN unread = 1 THEN 1 END)" - " FROM event_push_actions ea" - " WHERE user_id = ?" - " AND room_id = ?" - " AND stream_ordering > ?" - ) - - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() + """Get the number of unread messages for a user/room that have happened + since the given stream ordering. + """ (notif_count, highlight_count, unread_count) = (0, 0, 0) - if row: - (notif_count, highlight_count, unread_count) = row - + # First we pull the counts from the summary table txn.execute( """ - SELECT notif_count, unread_count FROM event_push_summary + SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) + FROM event_push_summary WHERE room_id = ? AND user_id = ? AND stream_ordering > ? 
""", (room_id, user_id, stream_ordering), ) row = txn.fetchone() + summary_stream_ordering = 0 if row: - notif_count += row[0] + summary_stream_ordering = row[0] + notif_count += row[1] + unread_count += row[2] - if row[1] is not None: - # The unread_count column of event_push_summary is NULLable, so we need - # to make sure we don't try increasing the unread counts if it's NULL - # for this row. - unread_count += row[1] + # And then we need to count push actions that haven't been summarized + # yet. + sql = """ + SELECT + COUNT(CASE WHEN notif = 1 THEN 1 END), + COUNT(CASE WHEN highlight = 1 THEN 1 END), + COUNT(CASE WHEN unread = 1 THEN 1 END) + FROM event_push_actions ea + WHERE user_id = ? + AND room_id = ? + AND ea.stream_ordering > ? + """ + + # We only want to pull out push actions that we haven't summarized yet. + stream_ordering = max(summary_stream_ordering, stream_ordering) + + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + + if row: + notif_count += row[0] + highlight_count += row[1] + unread_count += row[2] return NotifCounts( notify_count=notif_count, From ef5ac19b130651f0cf0314376decef736c843b93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 10:23:31 +0100 Subject: [PATCH 04/19] Split up summarizing and deleting old push actions --- .../databases/main/event_push_actions.py | 70 +++++++++++++++++-- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 88c5764cf7b0..31358e3033fd 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -763,6 +763,8 @@ async def _rotate_notifs(self) -> None: if caught_up: break await self.hs.get_clock().sleep(self._rotate_delay) + + await self._remove_old_push_actions_that_have_rotated() finally: self._doing_notif_rotation = False @@ -924,18 +926,72 @@ def _rotate_notifs_before_txn( ) txn.execute( - "DELETE FROM event_push_actions" - " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0", - (old_rotate_stream_ordering, rotate_to_stream_ordering), + "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", + (rotate_to_stream_ordering,), ) - logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) + async def _remove_old_push_actions_that_have_rotated( + self, + ) -> None: + """Clear out old push actions that rotated been summarized.""" - txn.execute( - "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", - (rotate_to_stream_ordering,), + # We want to clear out anything that older than a day that *has* already + # been rotated. + rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol( + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + max_stream_ordering_to_delete = min( + rotated_upto_stream_ordering, self.stream_ordering_day_ago ) + def remove_old_push_actions_that_have_rotated_txn( + txn: LoggingTransaction, + ) -> bool: + # We don't want to clear out too much at a time, so we bound our + # deletes. + batch_size = 10000 + + txn.execute( + """ + SELECT stream_ordering FROM event_push_actions + WHERE stream_ordering < ? AND highlight = 0 + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? 
+ """, + ( + max_stream_ordering_to_delete, + batch_size, + ), + ) + stream_row = txn.fetchone() + + if stream_row: + (stream_ordering,) = stream_row + else: + stream_ordering = max_stream_ordering_to_delete + + txn.execute( + """ + DELETE FROM event_push_actions + WHERE stream_ordering < ? AND highlight = 0 + """, + (stream_ordering,), + ) + + logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) + + return txn.rowcount < batch_size + + while True: + done = await self.db_pool.runInteraction( + "_remove_old_push_actions_that_have_rotated", + remove_old_push_actions_that_have_rotated_txn, + ) + if done: + break + def _remove_old_push_actions_before_txn( self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int ) -> None: From 6ff878df506a242bb9a1e98e4c3ae41e7dd83459 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 10:32:21 +0100 Subject: [PATCH 05/19] Make push action summaries be up to date --- .../storage/databases/main/event_push_actions.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 31358e3033fd..8003ee91aa8d 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -25,6 +25,7 @@ LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -121,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st return DEFAULT_NOTIF_ACTION -class EventPushActionsWorkerStore(ReceiptsWorkerStore, SQLBaseStore): +class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -793,20 +794,16 @@ def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool: stream_row = txn.fetchone() if stream_row: (offset_stream_ordering,) = stream_row - assert self.stream_ordering_day_ago is not None - rotate_to_stream_ordering = min( - self.stream_ordering_day_ago, offset_stream_ordering - ) - caught_up = offset_stream_ordering >= self.stream_ordering_day_ago + rotate_to_stream_ordering = offset_stream_ordering + caught_up = False else: - rotate_to_stream_ordering = self.stream_ordering_day_ago + rotate_to_stream_ordering = self._stream_id_gen.get_current_token() caught_up = True logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) - # We have caught up iff we were limited by `stream_ordering_day_ago` return caught_up def _rotate_notifs_before_txn( From 009307c3f325ea18fdbd8d62cd8b7d53e2056b1d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 10:59:43 +0100 Subject: [PATCH 06/19] Add unique index to 'event_push_summary' table --- synapse/_scripts/synapse_port_db.py | 4 ++++ synapse/storage/database.py | 1 + .../databases/main/event_push_actions.py | 9 +++++++++ .../delta/70/02event_push_summary_unique.sql | 18 ++++++++++++++++++ 4 files changed, 32 insertions(+) create mode 100644 synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index c753dfa7cb79..191735fcf5ea 100755 --- 
a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -58,6 +58,9 @@ from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackgroundStore +from synapse.storage.databases.main.event_push_actions import ( + EventPushActionsWorkerStore, +) from synapse.storage.databases.main.events_bg_updates import ( EventsBackgroundUpdatesStore, ) @@ -200,6 +203,7 @@ class Store( + EventPushActionsWorkerStore, ClientIpBackgroundUpdateStore, DeviceInboxBackgroundUpdateStore, DeviceBackgroundUpdateStore, diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a78d68a9d7fe..e8c63cf56779 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -92,6 +92,7 @@ "event_search": "event_search_event_id_idx", "local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx", "remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx", + "event_push_summary": "event_push_summary_unique_index", } diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 8003ee91aa8d..05126fb7b626 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -151,6 +151,15 @@ def __init__( self._rotate_notifs, 30 * 60 * 1000 ) + self.db_pool.updates.register_background_index_update( + "event_push_summary_unique_index", + index_name="event_push_summary_unique_index", + table="event_push_summary", + columns=["user_id", "room_id"], + unique=True, + replaces_index="event_push_summary_user_rm", + ) + @cached(tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( self, diff --git a/synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql b/synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql new file mode 100644 index 000000000000..9cdcea21aec5 --- /dev/null +++ b/synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql @@ -0,0 +1,18 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a unique index to `event_push_summary` +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7002, 'event_push_summary_unique_index', '{}'); From b16d1c29ddb84db60bead0b371693fc932e4ad8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 15:06:17 +0100 Subject: [PATCH 07/19] Correctly handle highlights We need to count highlights separately from notifications as we don't summarize them. 
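
For context, the per-room counts are now assembled from three sources. The
sketch below is illustrative only and is not part of the patch: `Counts`,
`assemble_counts` and the in-memory tuples are made-up stand-ins for the
real `NotifCounts` and the SQL that `_get_unread_counts_by_pos_txn` runs
against `event_push_summary` and `event_push_actions`.

    from dataclasses import dataclass
    from typing import List, Optional, Tuple

    @dataclass
    class Counts:
        # Stand-in for NotifCounts.
        notify_count: int = 0
        unread_count: int = 0
        highlight_count: int = 0

    def assemble_counts(
        receipt_stream_ordering: int,
        summary: Optional[Tuple[int, int, int]],      # (stream_ordering, notif, unread)
        actions: List[Tuple[int, bool, bool, bool]],  # (stream_ordering, notif, unread, highlight)
    ) -> Counts:
        counts = Counts()
        summary_stream_ordering = 0

        # 1. Counts already rotated into event_push_summary, if the summary
        #    row is newer than the user's read receipt.
        if summary is not None and summary[0] > receipt_stream_ordering:
            summary_stream_ordering = summary[0]
            counts.notify_count += summary[1]
            counts.unread_count += summary[2]

        # 2. Highlights are never rotated into the summary, so they are
        #    counted directly from the push actions after the receipt.
        counts.highlight_count = sum(
            1
            for so, _, _, highlight in actions
            if highlight and so > receipt_stream_ordering
        )

        # 3. Push actions that have not been rotated yet: everything after
        #    both the receipt and the summary position.
        threshold = max(receipt_stream_ordering, summary_stream_ordering)
        for so, notif, unread, _ in actions:
            if so > threshold:
                counts.notify_count += int(notif)
                counts.unread_count += int(unread)

        return counts

The diff below implements the same three steps against the real tables.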
--- .../databases/main/event_push_actions.py | 44 +++++++++++-------- tests/storage/test_event_push_actions.py | 8 +++- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 05126fb7b626..346d40374ab8 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -82,15 +82,15 @@ class UserPushAction(EmailPushAction): profile_tag: str -@attr.s(slots=True, frozen=True, auto_attribs=True) +@attr.s(slots=True, auto_attribs=True) class NotifCounts: """ The per-user, per-room count of notifications. Used by sync and push. """ - notify_count: int - unread_count: int - highlight_count: int + notify_count: int = 0 + unread_count: int = 0 + highlight_count: int = 0 def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str: @@ -231,7 +231,7 @@ def _get_unread_counts_by_pos_txn( since the given stream ordering. """ - (notif_count, highlight_count, unread_count) = (0, 0, 0) + counts = NotifCounts() # First we pull the counts from the summary table txn.execute( @@ -247,15 +247,27 @@ def _get_unread_counts_by_pos_txn( summary_stream_ordering = 0 if row: summary_stream_ordering = row[0] - notif_count += row[1] - unread_count += row[2] + counts.notify_count += row[1] + counts.unread_count += row[2] - # And then we need to count push actions that haven't been summarized + # Next we need to count highlights, which aren't summarized + sql = """ + SELECT COUNT(*) FROM event_push_actions + WHERE user_id = ? + AND room_id = ? + AND stream_ordering > ? + AND highlight = 1 + """ + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + if row: + counts.highlight_count += row[0] + + # Finally we need to count push actions that haven't been summarized # yet. sql = """ SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), - COUNT(CASE WHEN highlight = 1 THEN 1 END), COUNT(CASE WHEN unread = 1 THEN 1 END) FROM event_push_actions ea WHERE user_id = ? @@ -270,15 +282,10 @@ def _get_unread_counts_by_pos_txn( row = txn.fetchone() if row: - notif_count += row[0] - highlight_count += row[1] - unread_count += row[2] - - return NotifCounts( - notify_count=notif_count, - unread_count=unread_count, - highlight_count=highlight_count, - ) + counts.notify_count += row[0] + counts.unread_count += row[1] + + return counts async def get_push_action_users_in_range( self, min_stream_ordering: int, max_stream_ordering: int @@ -836,7 +843,6 @@ def _rotate_notifs_before_txn( max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? - AND highlight = 0 AND %s = 1 GROUP BY user_id, room_id ) AS upd diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 0f9add48417a..a8b3a14c3d0b 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -144,8 +144,12 @@ def _mark_read(stream, depth): _assert_counts(1, 1) _rotate(9) _assert_counts(1, 1) - _rotate(10) - _assert_counts(1, 1) + + # Check that adding another notification and rotating after highlight + # works. 
+ _inject_actions(10, PlAIN_NOTIF) + _rotate(11) + _assert_counts(2, 1) def test_find_first_stream_ordering_after_ts(self): def add_event(so, ts): From ba5cac5d9d83c29afd38c2c3ef97e619c2dc2ea8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jun 2022 16:29:09 +0100 Subject: [PATCH 08/19] Recalculate summary counts on new receipt --- .../databases/main/event_push_actions.py | 66 +++++++++++++++---- tests/storage/test_event_push_actions.py | 7 ++ 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 346d40374ab8..7cd4268c8176 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -265,7 +265,34 @@ def _get_unread_counts_by_pos_txn( # Finally we need to count push actions that haven't been summarized # yet. - sql = """ + # We only want to pull out push actions that we haven't summarized yet. + stream_ordering = max(stream_ordering, summary_stream_ordering) + notify_count, unread_count = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, stream_ordering + ) + + counts.notify_count += notify_count + counts.unread_count += unread_count + + return counts + + def _get_notif_unread_count_for_user_room( + self, + txn: LoggingTransaction, + room_id: str, + user_id: str, + stream_ordering: int, + max_stream_ordering: Optional[int] = None, + ) -> Tuple[int, int]: + """Returns the notify and unread counts for the given user/room.""" + + clause = "" + args = [user_id, room_id, stream_ordering] + if max_stream_ordering is not None: + clause = "AND ea.stream_ordering <= ?" + args.append(max_stream_ordering) + + sql = f""" SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), COUNT(CASE WHEN unread = 1 THEN 1 END) @@ -273,19 +300,16 @@ def _get_unread_counts_by_pos_txn( WHERE user_id = ? AND room_id = ? AND ea.stream_ordering > ? + {clause} """ - # We only want to pull out push actions that we haven't summarized yet. - stream_ordering = max(summary_stream_ordering, stream_ordering) - - txn.execute(sql, (user_id, room_id, stream_ordering)) + txn.execute(sql, args) row = txn.fetchone() if row: - counts.notify_count += row[0] - counts.unread_count += row[1] + return cast(Tuple[int, int], row) - return counts + return 0, 0 async def get_push_action_users_in_range( self, min_stream_ordering: int, max_stream_ordering: int @@ -1042,12 +1066,26 @@ def _remove_old_push_actions_before_txn( (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), ) - txn.execute( - """ - DELETE FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? 
- """, - (room_id, user_id, stream_ordering), + old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + notif_count, unread_count = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering + ) + + self.db_pool.simple_upsert_txn( + txn, + table="event_push_summary", + keyvalues={"room_id": room_id, "user_id": user_id}, + values={ + "notif_count": notif_count, + "unread_count": unread_count, + "stream_ordering": stream_ordering, + }, ) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index a8b3a14c3d0b..d0fcd8309797 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -151,6 +151,13 @@ def _mark_read(stream, depth): _rotate(11) _assert_counts(2, 1) + # Check that sending read receipts at different points results in the + # right counts. + _mark_read(8, 8) + _assert_counts(1, 0) + _mark_read(10, 10) + _assert_counts(0, 0) + def test_find_first_stream_ordering_after_ts(self): def add_event(so, ts): self.get_success( From 599ec64bc04fc8f9abf41fdfa3df0320f3178f83 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jun 2022 09:41:56 +0100 Subject: [PATCH 09/19] Fix tests --- tests/storage/test_event_push_actions.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index d0fcd8309797..4273524c4c3f 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -51,10 +51,16 @@ def test_count_aggregation(self): room_id = "!foo:example.com" user_id = "@user1235:example.com" + last_read_stream_ordering = [0] + def _assert_counts(noitf_count, highlight_count): counts = self.get_success( self.store.db_pool.runInteraction( - "", self.store._get_unread_counts_by_pos_txn, room_id, user_id, 0 + "", + self.store._get_unread_counts_by_pos_txn, + room_id, + user_id, + last_read_stream_ordering[0], ) ) self.assertEqual( @@ -98,6 +104,7 @@ def _rotate(stream): ) def _mark_read(stream, depth): + last_read_stream_ordering[0] = stream self.get_success( self.store.db_pool.runInteraction( "", From 6d69eb72df8277011e682dd377317ddb86b0ce61 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jun 2022 09:57:45 +0100 Subject: [PATCH 10/19] Newsfile --- changelog.d/13005.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13005.misc diff --git a/changelog.d/13005.misc b/changelog.d/13005.misc new file mode 100644 index 000000000000..3bb51962e79e --- /dev/null +++ b/changelog.d/13005.misc @@ -0,0 +1 @@ +Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room. 
From f72464eac2ab5bc9807c09e894ed58c42e55a0aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jun 2022 13:54:05 +0100 Subject: [PATCH 11/19] Change tests to expect badge count when no read receipt --- tests/push/test_http.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index ba158f5d93e4..d9c68cdd2d22 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -577,7 +577,7 @@ def test_push_unread_count_group_by_room(self) -> None: # Carry out our option-value specific test # # This push should still only contain an unread count of 1 (for 1 unread room) - self._check_push_attempt(6, 1) + self._check_push_attempt(7, 1) @override_config({"push": {"group_unread_count_by_room": False}}) def test_push_unread_count_message_count(self) -> None: @@ -591,7 +591,7 @@ def test_push_unread_count_message_count(self) -> None: # # We're counting every unread message, so there should now be 3 since the # last read receipt - self._check_push_attempt(6, 3) + self._check_push_attempt(7, 3) def _test_push_unread_count(self) -> None: """ @@ -641,18 +641,18 @@ def _test_push_unread_count(self) -> None: response = self.helper.send( room_id, body="Hello there!", tok=other_access_token ) - # To get an unread count, the user who is getting notified has to have a read - # position in the room. We'll set the read position to this event in a moment + first_message_event_id = response["event_id"] expected_push_attempts = 1 - self._check_push_attempt(expected_push_attempts, 0) + self._check_push_attempt(expected_push_attempts, 1) self._send_read_request(access_token, first_message_event_id, room_id) - # Unread count has not changed. Therefore, ensure that read request does not - # trigger a push notification. - self.assertEqual(len(self.push_attempts), 1) + # Unread count has changed. Therefore, ensure that read request triggers + # a push notification. + expected_push_attempts += 1 + self.assertEqual(len(self.push_attempts), expected_push_attempts) # Send another message response2 = self.helper.send( From d5bc282d224342ead8398a2c77329f0bb35ad553 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Jun 2022 16:15:17 +0100 Subject: [PATCH 12/19] Apply suggestions from code review Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> --- synapse/storage/databases/main/event_push_actions.py | 2 +- synapse/storage/databases/main/receipts.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 7cd4268c8176..6626d609d1f0 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -969,7 +969,7 @@ def _rotate_notifs_before_txn( async def _remove_old_push_actions_that_have_rotated( self, ) -> None: - """Clear out old push actions that rotated been summarized.""" + """Clear out old push actions that have been summarized.""" # We want to clear out anything that older than a day that *has* already # been rotated. diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index f78e11717427..f5b81bb4c401 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -157,9 +157,9 @@ def get_last_receipt_for_user_txn( with one of the given receipt types. Args: - user_id: The user to fetch receipts for. 
room_id: The room ID to - fetch the receipt for. receipt_type: The receipt types to fetch. - Earlier receipt types + user_id: The user to fetch receipts for. + room_id: The room ID to fetch the receipt for. + receipt_type: The receipt types to fetch. Earlier receipt types are given priority if multiple receipts point to the same event. Returns: From 1d5410265dd17cad2a0574eac4e8d88a93c5b63b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 09:49:33 +0100 Subject: [PATCH 13/19] Update 'event_push_summary' comment --- .../storage/schema/main/delta/40/event_push_summary.sql | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/storage/schema/main/delta/40/event_push_summary.sql b/synapse/storage/schema/main/delta/40/event_push_summary.sql index 3918f0b79494..499bf6017860 100644 --- a/synapse/storage/schema/main/delta/40/event_push_summary.sql +++ b/synapse/storage/schema/main/delta/40/event_push_summary.sql @@ -13,9 +13,10 @@ * limitations under the License. */ --- Aggregate of old notification counts that have been deleted out of the --- main event_push_actions table. This count does not include those that were --- highlights, as they remain in the event_push_actions table. +-- Aggregate of notification counts up to `stream_ordering`, including those +-- that may have been deleted out of the main event_push_actions table. This +-- count does not include those that were highlights, as they remain in the +-- event_push_actions table. CREATE TABLE event_push_summary ( user_id TEXT NOT NULL, room_id TEXT NOT NULL, From cff602748e2c80423b95603add8fc87aba52ee33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 09:51:40 +0100 Subject: [PATCH 14/19] Better comment --- synapse/storage/databases/main/event_push_actions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 6626d609d1f0..1305cf114293 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -284,7 +284,12 @@ def _get_notif_unread_count_for_user_room( stream_ordering: int, max_stream_ordering: Optional[int] = None, ) -> Tuple[int, int]: - """Returns the notify and unread counts for the given user/room.""" + """Returns the notify and unread counts from `event_push_actions` for + the given user/room in the given range. + + Does not consult `event_push_summary` table, which may include push + actions that have been deleted from `event_push_actions` table. + """ clause = "" args = [user_id, room_id, stream_ordering] From ecaafd0db8afd6d9f822c00fa72092cc1998d347 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 09:55:01 +0100 Subject: [PATCH 15/19] Fixup comment --- synapse/storage/databases/main/receipts.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index f5b81bb4c401..06879ef6ff20 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -159,8 +159,7 @@ def get_last_receipt_for_user_txn( Args: user_id: The user to fetch receipts for. room_id: The room ID to fetch the receipt for. - receipt_type: The receipt types to fetch. Earlier receipt types - are given priority if multiple receipts point to the same event. + receipt_type: The receipt types to fetch. Returns: The latest receipt, if one exists. 
From 6e8f49d70dbf51129570e88c39fe2577ac1ec597 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 10:22:15 +0100 Subject: [PATCH 16/19] Add test with and without receipt --- tests/replication/slave/storage/test_events.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 9fc7d0a33d3c..531a0db2d071 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -15,6 +15,7 @@ from typing import Iterable, Optional from canonicaljson import encode_canonical_json +from parameterized import parameterized from synapse.api.constants import ReceiptTypes from synapse.api.room_versions import RoomVersions @@ -157,20 +158,22 @@ def test_invites(self): ], ) - def test_push_actions_for_user(self): + @parameterized.expand([(True,), (False,)]) + def test_push_actions_for_user(self, send_receipt: bool): self.persist(type="m.room.create", key="", creator=USER_ID) - self.persist(type="m.room.join", key=USER_ID, membership="join") + self.persist(type="m.room.member", key=USER_ID, membership="join") self.persist( - type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join" + type="m.room.member", sender=USER_ID, key=USER_ID_2, membership="join" ) event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello") self.replicate() - self.get_success( - self.master_store.insert_receipt( - ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {} + if send_receipt: + self.get_success( + self.master_store.insert_receipt( + ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {} + ) ) - ) self.check( "get_unread_event_push_actions_by_room_for_user", From 840bc9161c59af632e1c690a88502c4e7f64118a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 13:52:56 +0100 Subject: [PATCH 17/19] Use old_rotate_stream_ordering --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1305cf114293..ae705889a5ad 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1089,7 +1089,7 @@ def _remove_old_push_actions_before_txn( values={ "notif_count": notif_count, "unread_count": unread_count, - "stream_ordering": stream_ordering, + "stream_ordering": old_rotate_stream_ordering, }, ) From 5e066158c65b297570e997f3fe19d47a6caec66d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 15:22:04 +0100 Subject: [PATCH 18/19] Update docstring --- synapse/storage/databases/main/receipts.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 06879ef6ff20..bec6d60577b5 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -126,8 +126,7 @@ async def get_last_receipt_event_id_for_user( Args: user_id: The user to fetch receipts for. room_id: The room ID to fetch the receipt for. - receipt_type: The receipt types to fetch. Earlier receipt types - are given priority if multiple receipts point to the same event. + receipt_type: The receipt types to fetch. Returns: The latest receipt, if one exists. 
From 01bdf3a91251c0aea91af505a3c57a97b365a05e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Jun 2022 15:22:36 +0100 Subject: [PATCH 19/19] Move delta file to correct location --- .../schema/main/delta/{70 => 71}/02event_push_summary_unique.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename synapse/storage/schema/main/delta/{70 => 71}/02event_push_summary_unique.sql (100%) diff --git a/synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql b/synapse/storage/schema/main/delta/71/02event_push_summary_unique.sql similarity index 100% rename from synapse/storage/schema/main/delta/70/02event_push_summary_unique.sql rename to synapse/storage/schema/main/delta/71/02event_push_summary_unique.sql