From 2d7bc1738168e1da2c31c9a06fd2b21b481f7708 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 4 Oct 2023 16:44:35 -0400 Subject: [PATCH 1/4] Convert simple_select_list_paginate_txn to return tuples. --- synapse/api/presence.py | 16 ++++++-- synapse/federation/send_queue.py | 2 +- synapse/rest/admin/federation.py | 8 +++- synapse/storage/database.py | 4 +- synapse/storage/databases/main/presence.py | 39 ++++++++++--------- .../storage/databases/main/transactions.py | 27 +++++++------ 6 files changed, 60 insertions(+), 36 deletions(-) diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b78f41999456..0a5f58c0a954 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Optional +from typing import Any, Optional, Tuple import attr @@ -81,8 +81,18 @@ def as_dict(self) -> JsonDict: return attr.asdict(self) @staticmethod - def from_dict(d: JsonDict) -> "UserPresenceState": - return UserPresenceState(**d) + def from_db( + row: Tuple[str, str, int, int, int, Optional[str], bool] + ) -> "UserPresenceState": + return UserPresenceState( + user_id=row[0], + state=row[1], + last_active_ts=row[2], + last_federation_update_ts=row[3], + last_user_sync_ts=row[4], + status_msg=row[5], + currently_active=bool(row[6]), + ) def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": return attr.evolve(self, **kwargs) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 652079563557..525968bcba71 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow): @staticmethod def from_data(data: JsonDict) -> "PresenceDestinationsRow": return PresenceDestinationsRow( - state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] + state=UserPresenceState(**data["state"]), destinations=data["dests"] ) def to_data(self) -> JsonDict: diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index e0ee55bd0eb6..8a617af599bb 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -198,7 +198,13 @@ async def on_GET( rooms, total = await self._store.get_destination_rooms_paginate( destination, start, limit, direction ) - response = {"rooms": rooms, "total": total} + response = { + "rooms": [ + {"room_id": room_id, "stream_ordering": stream_ordering} + for room_id, stream_ordering in rooms + ], + "total": total, + } if (start + limit) < total: response["next_token"] = str(start + len(rooms)) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ca894edd5ad3..0f151e7228e1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2418,7 +2418,7 @@ def simple_select_list_paginate_txn( keyvalues: Optional[Dict[str, Any]] = None, exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """ Executes a SELECT query on the named table with start and limit, of row numbers, which may return zero or number of rows from start to limit, @@ -2474,7 +2474,7 @@ def simple_select_list_paginate_txn( ) txn.execute(sql, arg_list + [limit, start]) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_search_list( self, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 194b4e031f73..137f6a4bc4bf 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -385,28 +385,31 @@ async def get_presence_for_all_users( limit = 100 offset = 0 while True: - rows = await self.db_pool.runInteraction( - "get_presence_for_all_users", - self.db_pool.simple_select_list_paginate_txn, - "presence_stream", - orderby="stream_id", - start=offset, - limit=limit, - exclude_keyvalues=exclude_keyvalues, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], bool]], + await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", ), - order_direction="ASC", ) for row in rows: - users_to_state[row["user_id"]] = UserPresenceState(**row) + users_to_state[row[0]] = UserPresenceState.from_db(row) # We've run out of updates to query if len(rows) < limit: diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 8f70eff80916..f35757280dda 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -526,7 +526,7 @@ async def get_destination_rooms_paginate( start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +537,14 @@ async def get_destination_rooms_paginate( limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +558,17 @@ def get_destination_rooms_paginate_txn( txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count From 994ea9d8db089d8c1f2e0030b277cf83c5e67abc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 5 Oct 2023 10:06:33 -0400 Subject: [PATCH 2/4] Newsfragment --- changelog.d/16433.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16433.misc diff --git a/changelog.d/16433.misc b/changelog.d/16433.misc new file mode 100644 index 000000000000..bd7cdd42af84 --- /dev/null +++ b/changelog.d/16433.misc @@ -0,0 +1 @@ +Reduce memory allocations. From b082d1a5f70664a500646ca62f06577547261529 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 5 Oct 2023 10:56:10 -0400 Subject: [PATCH 3/4] Fix docstring. --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0f151e7228e1..7d8af5c61075 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2447,7 +2447,7 @@ def simple_select_list_paginate_txn( order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: - The result as a list of dictionaries. + The result as a list of tuples. """ if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") From 3124e53de65c25a73eb88ea054c009f7498702d0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 6 Oct 2023 07:44:12 -0400 Subject: [PATCH 4/4] Inline creating the UserPresenceState. --- synapse/api/presence.py | 16 +-------------- synapse/storage/databases/main/presence.py | 23 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/synapse/api/presence.py b/synapse/api/presence.py index 0a5f58c0a954..afef6712e129 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Optional, Tuple +from typing import Any, Optional import attr @@ -80,20 +80,6 @@ class UserPresenceState: def as_dict(self) -> JsonDict: return attr.asdict(self) - @staticmethod - def from_db( - row: Tuple[str, str, int, int, int, Optional[str], bool] - ) -> "UserPresenceState": - return UserPresenceState( - user_id=row[0], - state=row[1], - last_active_ts=row[2], - last_federation_update_ts=row[3], - last_user_sync_ts=row[4], - status_msg=row[5], - currently_active=bool(row[6]), - ) - def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": return attr.evolve(self, **kwargs) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 64abcd06260b..519f05fb6074 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -20,6 +20,7 @@ Mapping, Optional, Tuple, + Union, cast, ) @@ -386,7 +387,7 @@ async def get_presence_for_all_users( offset = 0 while True: rows = cast( - List[Tuple[str, str, int, int, int, Optional[str], bool]], + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], await self.db_pool.runInteraction( "get_presence_for_all_users", self.db_pool.simple_select_list_paginate_txn, @@ -408,8 +409,24 @@ async def get_presence_for_all_users( ), ) - for row in rows: - users_to_state[row[0]] = UserPresenceState.from_db(row) + for ( + user_id, + state, + last_active_ts, + last_federation_update_ts, + last_user_sync_ts, + status_msg, + currently_active, + ) in rows: + users_to_state[user_id] = UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) # We've run out of updates to query if len(rows) < limit: