From 0469b0b0dbbce10d650a6ed0aaa238a6eac79496 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Sep 2023 13:50:29 +0100 Subject: [PATCH 1/3] Recheck if remote device is cached before requesting it This fixes a bug where we could get stuck re-requesting the device over replication again and again. --- synapse/handlers/device.py | 13 +++++++++--- synapse/replication/http/devices.py | 4 ++-- synapse/storage/databases/main/devices.py | 26 +++++++++++++++-------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 763f56dfc16a..6b17d9162210 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,6 +43,7 @@ ) from synapse.types import ( JsonDict, + JsonMapping, StrCollection, StreamKeyType, StreamToken, @@ -982,7 +983,7 @@ def __init__(self, hs: "HomeServer"): async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. @@ -1253,7 +1254,7 @@ async def _maybe_retry_device_resync(self) -> None: async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. @@ -1287,7 +1288,7 @@ async def multi_user_device_resync( async def _user_device_resync_returning_failed( self, user_id: str - ) -> Tuple[Optional[JsonDict], bool]: + ) -> Tuple[Optional[JsonMapping], bool]: """Fetches all devices for a user and updates the device cache with them. Args: @@ -1300,6 +1301,12 @@ async def _user_device_resync_returning_failed( e.g. due to a connection problem. - True iff the resync failed and the device list should be marked as stale. """ + # Check that we haven't gone and fetched the devices since we last + # checked if we needed to resync these device lists. + if await self.store.get_users_whose_devices_are_cached([user_id]): + cached = await self.store.get_cached_devices_for_user(user_id) + return cached, False + logger.debug("Attempting to resync the device list for %s", user_id) log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 209833d28753..b8198e059c97 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -20,7 +20,7 @@ from synapse.http.server import HttpServer from synapse.logging.opentracing import active_span from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import JsonDict +from synapse.types import JsonDict, JsonMapping if TYPE_CHECKING: from synapse.server import HomeServer @@ -82,7 +82,7 @@ async def _serialize_payload(user_ids: List[str]) -> JsonDict: # type: ignore[o async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict - ) -> Tuple[int, Dict[str, Optional[JsonDict]]]: + ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]: user_ids: List[str] = content["user_ids"] logger.info("Resync for %r", user_ids) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fa69a4a29852..2d0ae3b11a36 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -760,18 +760,10 @@ async def get_user_devices_from_cache( mapping of user_id -> device_id -> device_info. """ unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids} - user_map = await self.get_device_list_last_stream_id_for_remotes( - list(unique_user_ids) - ) - # We go and check if any of the users need to have their device lists - # resynced. If they do then we remove them from the cached list. - users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids_in_cache = await self.get_users_whose_devices_are_cached( unique_user_ids ) - user_ids_in_cache = { - user_id for user_id, stream_id in user_map.items() if stream_id - } - users_needing_resync user_ids_not_in_cache = unique_user_ids - user_ids_in_cache # First fetch all the users which all devices are to be returned. @@ -793,6 +785,22 @@ async def get_user_devices_from_cache( return user_ids_not_in_cache, results + async def get_users_whose_devices_are_cached( + self, user_ids: StrCollection + ) -> Set[str]: + """Checks which of the given users we have cached the devices for.""" + user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids) + + # We go and check if any of the users need to have their device lists + # resynced. If they do then we remove them from the cached list. + users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids + ) + user_ids_in_cache = { + user_id for user_id, stream_id in user_map.items() if stream_id + } - users_needing_resync + return user_ids_in_cache + @cached(num_args=2, tree=True) async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict: content = await self.db_pool.simple_select_one_onecol( From 94ca43fcc37a23d55263784e21384bc262c28861 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Sep 2023 13:53:06 +0100 Subject: [PATCH 2/3] Newsfile --- changelog.d/16252.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16252.bugfix diff --git a/changelog.d/16252.bugfix b/changelog.d/16252.bugfix new file mode 100644 index 000000000000..881bc00e6153 --- /dev/null +++ b/changelog.d/16252.bugfix @@ -0,0 +1 @@ +Fix bug when using workers where Synapse could end up re-requesting the same remote device repeatedly. From e05ccb51b369df7db640bea6403d9f2dd0d85a78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Sep 2023 13:55:31 +0100 Subject: [PATCH 3/3] Linearize requests to fetch remote devices --- synapse/handlers/device.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6b17d9162210..3dd5709aae4e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1012,6 +1012,7 @@ def __init__(self, hs: "HomeServer", device_handler: DeviceHandler): self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") + self._resync_linearizer = Linearizer(name="remote_device_resync") # user_id -> list of updates waiting to be handled. self._pending_updates: Dict[ @@ -1274,9 +1275,11 @@ async def multi_user_device_resync( failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: - user_result, user_failed = await self._user_device_resync_returning_failed( - user_id - ) + async with self._resync_linearizer.queue(user_id): + ( + user_result, + user_failed, + ) = await self._user_device_resync_returning_failed(user_id) result[user_id] = user_result if user_failed: failed.add(user_id)