From 763a65e97e25945c33631caadabbce6cf7ae45ec Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 16:49:49 +0100 Subject: [PATCH 1/6] Add txn to get receipts and stream ordering by room for user --- .../databases/main/event_push_actions.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index eabf9c973964..892bdfa5c78d 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -459,6 +459,32 @@ def f(txn: LoggingTransaction) -> List[str]: return await self.db_pool.runInteraction("get_push_action_users_in_range", f) + def _get_receipts_by_room_txn( + self, txn: LoggingTransaction, user_id: str + ) -> List[Tuple[str, int]]: + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), + ) + + sql = f""" + SELECT room_id, MAX(stream_ordering) + FROM receipts_linearized + INNER JOIN events USING (room_id, event_id) + WHERE {receipt_types_clause} + AND user_id = ? + GROUP BY room_id + """ + + args.extend((user_id,)) + txn.execute(sql, args) + return cast(List[Tuple[str, int]], txn.fetchall()) + async def get_unread_push_actions_for_user_in_range_for_http( self, user_id: str, @@ -482,6 +508,14 @@ async def get_unread_push_actions_for_user_in_range_for_http( The list will have between 0~limit entries. """ + receipts_by_room = dict( + await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, + ), + ) + # find rooms that have a read receipt in them and return the next # push actions def get_after_receipt( @@ -617,6 +651,14 @@ async def get_unread_push_actions_for_user_in_range_for_email( The list will have between 0~limit entries. """ + receipts_by_room = dict( + await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, + ), + ) + # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt( From 520843ba1f90dbdabb21812ab2a71c988ea1f258 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 16:50:19 +0100 Subject: [PATCH 2/6] Get push actions in single query using new receipts by room --- .../databases/main/event_push_actions.py | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 892bdfa5c78d..4eaffc9638d5 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -516,6 +516,26 @@ async def get_unread_push_actions_for_user_in_range_for_http( ), ) + def get_push_actions( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, int, str, bool]]: + sql = """ + SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight + FROM event_push_actions AS ep + WHERE + ep.user_id = ? + AND ep.stream_ordering > ? + AND ep.stream_ordering <= ? + AND ep.notif = 1 + ORDER BY ep.stream_ordering ASC LIMIT ? + """ + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) + return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) + + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http", get_push_actions + ) + # find rooms that have a read receipt in them and return the next # push actions def get_after_receipt( @@ -615,7 +635,10 @@ def get_no_receipt( stream_ordering=row[2], actions=_deserialize_action(row[3], row[4]), ) - for row in after_read_receipt + no_read_receipt + for row in push_actions + # Only include push actions with a stream ordering after any receipt, or without any + # receipt present (invited to but never read rooms). + if row[2] > receipts_by_room.get(row[1], 0) ] # Now sort it so it's ordered correctly, since currently it will @@ -659,6 +682,28 @@ async def get_unread_push_actions_for_user_in_range_for_email( ), ) + def get_push_actions( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, int, str, bool, int]]: + sql = """ + SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, + ep.highlight, e.received_ts + FROM event_push_actions AS ep + INNER JOIN events AS e USING (room_id, event_id) + WHERE + ep.user_id = ? + AND ep.stream_ordering > ? + AND ep.stream_ordering <= ? + AND ep.notif = 1 + ORDER BY ep.stream_ordering DESC LIMIT ? + """ + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) + return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) + + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email", get_push_actions + ) + # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt( @@ -758,7 +803,10 @@ def get_no_receipt( actions=_deserialize_action(row[3], row[4]), received_ts=row[5], ) - for row in after_read_receipt + no_read_receipt + for row in push_actions + # Only include push actions with a stream ordering after any receipt, or without any + # receipt present (invited to but never read rooms). + if row[2] > receipts_by_room.get(row[1], 0) ] # Now sort it so it's ordered correctly, since currently it will From 5c6dce67d40262e19b7fc550c238aff45e8b8039 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 16:51:18 +0100 Subject: [PATCH 3/6] Remove old get push actions queries --- .../databases/main/event_push_actions.py | 182 ------------------ 1 file changed, 182 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 4eaffc9638d5..751270f3dafd 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -536,98 +536,6 @@ def get_push_actions( "get_unread_push_actions_for_user_in_range_http", get_push_actions ) - # find rooms that have a read receipt in them and return the next - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool]]: - # find rooms that have a read receipt in them and return the next - # push actions - - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight - FROM ( - SELECT room_id, - MAX(stream_ordering) as stream_ordering - FROM events - INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) AS rl, - event_push_actions AS ep - WHERE - ep.room_id = rl.room_id - AND ep.stream_ordering > rl.stream_ordering - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering ASC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) - - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt - ) - - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight - FROM event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) - WHERE - ep.room_id NOT IN ( - SELECT room_id FROM receipts_linearized - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering ASC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) - - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt - ) - notifs = [ HttpPushAction( event_id=row[0], @@ -704,96 +612,6 @@ def get_push_actions( "get_unread_push_actions_for_user_in_range_email", get_push_actions ) - # find rooms that have a read receipt in them and return the most recent - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool, int]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight, e.received_ts - FROM ( - SELECT room_id, - MAX(stream_ordering) as stream_ordering - FROM events - INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) AS rl, - event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) - WHERE - ep.room_id = rl.room_id - AND ep.stream_ordering > rl.stream_ordering - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering DESC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) - - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt - ) - - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool, int]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight, e.received_ts - FROM event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) - WHERE - ep.room_id NOT IN ( - SELECT room_id FROM receipts_linearized - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering DESC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) - - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt - ) - # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( From 2531eee928dcba40c1c7a1c8c84f4f3662594c65 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 17:16:35 +0100 Subject: [PATCH 4/6] Add changelog file --- changelog.d/13597.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13597.misc diff --git a/changelog.d/13597.misc b/changelog.d/13597.misc new file mode 100644 index 000000000000..eb5e97100809 --- /dev/null +++ b/changelog.d/13597.misc @@ -0,0 +1 @@ + Optimise push action fetching queries. Contributed by Nick @ Beeper (@fizzadar). From 55f587c6be3ad4acdb96c1b5746522830fea27b2 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 17:41:15 +0100 Subject: [PATCH 5/6] Rename txn functions with `_txn` suffix --- synapse/storage/databases/main/event_push_actions.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 751270f3dafd..16fa53838535 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -516,7 +516,7 @@ async def get_unread_push_actions_for_user_in_range_for_http( ), ) - def get_push_actions( + def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool]]: sql = """ @@ -533,7 +533,7 @@ def get_push_actions( return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) push_actions = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http", get_push_actions + "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn ) notifs = [ @@ -590,7 +590,7 @@ async def get_unread_push_actions_for_user_in_range_for_email( ), ) - def get_push_actions( + def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool, int]]: sql = """ @@ -609,7 +609,7 @@ def get_push_actions( return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) push_actions = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email", get_push_actions + "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn ) # Make a list of dicts from the two sets of results. From b7a66e330afa97d686a0f38e12caaf1308f583d6 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 23 Aug 2022 17:41:35 +0100 Subject: [PATCH 6/6] Destructure output tuples into named variables --- .../databases/main/event_push_actions.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 16fa53838535..8dfa545c2771 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -538,15 +538,15 @@ def get_push_actions_txn( notifs = [ HttpPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), ) - for row in push_actions + for event_id, room_id, stream_ordering, actions, highlight in push_actions # Only include push actions with a stream ordering after any receipt, or without any # receipt present (invited to but never read rooms). - if row[2] > receipts_by_room.get(row[1], 0) + if stream_ordering > receipts_by_room.get(room_id, 0) ] # Now sort it so it's ordered correctly, since currently it will @@ -615,16 +615,16 @@ def get_push_actions_txn( # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), - received_ts=row[5], + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), + received_ts=received_ts, ) - for row in push_actions + for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions # Only include push actions with a stream ordering after any receipt, or without any # receipt present (invited to but never read rooms). - if row[2] > receipts_by_room.get(row[1], 0) + if stream_ordering > receipts_by_room.get(room_id, 0) ] # Now sort it so it's ordered correctly, since currently it will