From 205aa5c40226b51541bf2e511fa5ecde39947440 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Jul 2020 12:40:30 -0400 Subject: [PATCH 01/10] Convert crypto keyring to async. --- synapse/crypto/keyring.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 443cde0b6d09..154940d8af61 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -223,8 +223,7 @@ def process(verify_request): return results - @defer.inlineCallbacks - def _start_key_lookups(self, verify_requests): + async def _start_key_lookups(self, verify_requests): """Sets off the key fetches for each verify request Once each fetch completes, verify_request.key_ready will be resolved. @@ -245,7 +244,7 @@ def _start_key_lookups(self, verify_requests): server_to_request_ids.setdefault(server_name, set()).add(request_id) # Wait for any previous lookups to complete before proceeding. - yield self.wait_for_previous_lookups(server_to_request_ids.keys()) + await self.wait_for_previous_lookups(server_to_request_ids.keys()) # take out a lock on each of the servers by sticking a Deferred in # key_downloads From 50c31cad1d772b2a7ffde94078f99fd9816ffde2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Jul 2020 12:41:31 -0400 Subject: [PATCH 02/10] Convert wait_for_previous_lookups --- synapse/crypto/keyring.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 154940d8af61..80c21462eeda 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -282,15 +282,14 @@ def lookup_done(res, verify_request): except Exception: logger.exception("Error starting key lookups") - @defer.inlineCallbacks - def wait_for_previous_lookups(self, server_names): + async def wait_for_previous_lookups(self, server_names) -> None: """Waits for any previous key lookups for the given servers to finish. Args: server_names (Iterable[str]): list of servers which we want to look up Returns: - Deferred[None]: resolves once all key lookups for the given servers have + Resolves once all key lookups for the given servers have completed. Follows the synapse rules of logcontext preservation. """ loop_count = 1 @@ -308,7 +307,7 @@ def wait_for_previous_lookups(self, server_names): loop_count, ) with PreserveLoggingContext(): - yield defer.DeferredList((w[1] for w in wait_on)) + await defer.DeferredList((w[1] for w in wait_on)) loop_count += 1 From f4c6b1d43298219445b337c6409ef2c2d7fce821 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Jul 2020 12:43:07 -0400 Subject: [PATCH 03/10] Convert get_keys --- synapse/crypto/keyring.py | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 80c21462eeda..991f3eadf1aa 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -388,7 +388,7 @@ def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): verify_request.minimum_valid_until_ts, ) - results = yield fetcher.get_keys(missing_keys) + results = yield defer.ensureDeferred(fetcher.get_keys(missing_keys)) completed = [] for verify_request in remaining_requests: @@ -421,7 +421,7 @@ def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): class KeyFetcher(object): - def get_keys(self, keys_to_fetch): + async def get_keys(self, keys_to_fetch): """ Args: keys_to_fetch (dict[str, dict[str, int]]): @@ -440,8 +440,7 @@ class StoreKeyFetcher(KeyFetcher): def __init__(self, hs): self.store = hs.get_datastore() - @defer.inlineCallbacks - def get_keys(self, keys_to_fetch): + async def get_keys(self, keys_to_fetch): """see KeyFetcher.get_keys""" keys_to_fetch = ( @@ -450,7 +449,7 @@ def get_keys(self, keys_to_fetch): for key_id in keys_for_server.keys() ) - res = yield self.store.get_server_verify_keys(keys_to_fetch) + res = await self.store.get_server_verify_keys(keys_to_fetch) keys = {} for (server_name, key_id), key in res.items(): keys.setdefault(server_name, {})[key_id] = key @@ -565,14 +564,12 @@ def __init__(self, hs): self.client = hs.get_http_client() self.key_servers = self.config.key_servers - @defer.inlineCallbacks - def get_keys(self, keys_to_fetch): + async def get_keys(self, keys_to_fetch): """see KeyFetcher.get_keys""" - @defer.inlineCallbacks - def get_key(key_server): + async def get_key(key_server): try: - result = yield self.get_server_verify_key_v2_indirect( + result = await self.get_server_verify_key_v2_indirect( keys_to_fetch, key_server ) return result @@ -590,7 +587,7 @@ def get_key(key_server): return {} - results = yield make_deferred_yieldable( + results = await make_deferred_yieldable( defer.gatherResults( [run_in_background(get_key, server) for server in self.key_servers], consumeErrors=True, @@ -739,24 +736,23 @@ def __init__(self, hs): self.clock = hs.get_clock() self.client = hs.get_http_client() - def get_keys(self, keys_to_fetch): + async def get_keys(self, keys_to_fetch): """ Args: keys_to_fetch (dict[str, iterable[str]]): the keys to be fetched. server_name -> key_ids Returns: - Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]: + dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]: map from server_name -> key_id -> FetchKeyResult """ results = {} - @defer.inlineCallbacks - def get_key(key_to_fetch_item): + async def get_key(key_to_fetch_item): server_name, key_ids = key_to_fetch_item try: - keys = yield self.get_server_verify_key_v2_direct(server_name, key_ids) + keys = await self.get_server_verify_key_v2_direct(server_name, key_ids) results[server_name] = keys except KeyLookupError as e: logger.warning( @@ -765,9 +761,9 @@ def get_key(key_to_fetch_item): except Exception: logger.exception("Error getting keys %s from %s", key_ids, server_name) - return yieldable_gather_results(get_key, keys_to_fetch.items()).addCallback( - lambda _: results - ) + return await yieldable_gather_results( + get_key, keys_to_fetch.items() + ).addCallback(lambda _: results) @defer.inlineCallbacks def get_server_verify_key_v2_direct(self, server_name, key_ids): From 49e5971849b5d023c8650fa6420218d94b0a1a35 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 29 Jul 2020 12:46:19 -0400 Subject: [PATCH 04/10] Convert get_server_* --- synapse/crypto/keyring.py | 81 ++++++++++++++++-------------------- tests/crypto/test_keyring.py | 39 +++++++---------- 2 files changed, 53 insertions(+), 67 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 991f3eadf1aa..1a5b614cf622 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -461,8 +461,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.config = hs.get_config() - @defer.inlineCallbacks - def process_v2_response(self, from_server, response_json, time_added_ms): + async def process_v2_response(self, from_server, response_json, time_added_ms): """Parse a 'Server Keys' structure from the result of a /key request This is used to parse either the entirety of the response from @@ -534,7 +533,7 @@ def process_v2_response(self, from_server, response_json, time_added_ms): key_json_bytes = encode_canonical_json(response_json) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background( @@ -601,8 +600,7 @@ async def get_key(key_server): return union_of_keys - @defer.inlineCallbacks - def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): + async def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): """ Args: keys_to_fetch (dict[str, dict[str, int]]): @@ -612,7 +610,7 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): the keys Returns: - Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map + dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]: map from server_name -> key_id -> FetchKeyResult Raises: @@ -627,20 +625,18 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): ) try: - query_response = yield defer.ensureDeferred( - self.client.post_json( - destination=perspective_name, - path="/_matrix/key/v2/query", - data={ - "server_keys": { - server_name: { - key_id: {"minimum_valid_until_ts": min_valid_ts} - for key_id, min_valid_ts in server_keys.items() - } - for server_name, server_keys in keys_to_fetch.items() + query_response = await self.client.post_json( + destination=perspective_name, + path="/_matrix/key/v2/query", + data={ + "server_keys": { + server_name: { + key_id: {"minimum_valid_until_ts": min_valid_ts} + for key_id, min_valid_ts in server_keys.items() } - }, - ) + for server_name, server_keys in keys_to_fetch.items() + } + }, ) except (NotRetryingDestination, RequestSendFailed) as e: # these both have str() representations which we can't really improve upon @@ -665,7 +661,7 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): try: self._validate_perspectives_response(key_server, response) - processed_response = yield self.process_v2_response( + processed_response = await self.process_v2_response( perspective_name, response, time_added_ms=time_now_ms ) except KeyLookupError as e: @@ -684,7 +680,7 @@ def get_server_verify_key_v2_indirect(self, keys_to_fetch, key_server): ) keys.setdefault(server_name, {}).update(processed_response) - yield self.store.store_server_verify_keys( + await self.store.store_server_verify_keys( perspective_name, time_now_ms, added_keys ) @@ -765,8 +761,7 @@ async def get_key(key_to_fetch_item): get_key, keys_to_fetch.items() ).addCallback(lambda _: results) - @defer.inlineCallbacks - def get_server_verify_key_v2_direct(self, server_name, key_ids): + async def get_server_verify_key_v2_direct(self, server_name, key_ids): """ Args: @@ -788,25 +783,23 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids): time_now_ms = self.clock.time_msec() try: - response = yield defer.ensureDeferred( - self.client.get_json( - destination=server_name, - path="/_matrix/key/v2/server/" - + urllib.parse.quote(requested_key_id), - ignore_backoff=True, - # we only give the remote server 10s to respond. It should be an - # easy request to handle, so if it doesn't reply within 10s, it's - # probably not going to. - # - # Furthermore, when we are acting as a notary server, we cannot - # wait all day for all of the origin servers, as the requesting - # server will otherwise time out before we can respond. - # - # (Note that get_json may make 4 attempts, so this can still take - # almost 45 seconds to fetch the headers, plus up to another 60s to - # read the response). - timeout=10000, - ) + response = await self.client.get_json( + destination=server_name, + path="/_matrix/key/v2/server/" + + urllib.parse.quote(requested_key_id), + ignore_backoff=True, + # we only give the remote server 10s to respond. It should be an + # easy request to handle, so if it doesn't reply within 10s, it's + # probably not going to. + # + # Furthermore, when we are acting as a notary server, we cannot + # wait all day for all of the origin servers, as the requesting + # server will otherwise time out before we can respond. + # + # (Note that get_json may make 4 attempts, so this can still take + # almost 45 seconds to fetch the headers, plus up to another 60s to + # read the response). + timeout=10000, ) except (NotRetryingDestination, RequestSendFailed) as e: # these both have str() representations which we can't really improve @@ -821,12 +814,12 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids): % (server_name, response["server_name"]) ) - response_keys = yield self.process_v2_response( + response_keys = await self.process_v2_response( from_server=server_name, response_json=response, time_added_ms=time_now_ms, ) - yield self.store.store_server_verify_keys( + await self.store.store_server_verify_keys( server_name, time_now_ms, ((server_name, key_id, key) for key_id, key in response_keys.items()), diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index e0ad8e8a773b..0d4b05304b2c 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -40,6 +40,7 @@ from synapse.storage.keys import FetchKeyResult from tests import unittest +from tests.test_utils import make_awaitable class MockPerspectiveServer(object): @@ -201,7 +202,7 @@ def test_verify_json_for_server_with_null_valid_until_ms(self): with a null `ts_valid_until_ms` """ mock_fetcher = keyring.KeyFetcher() - mock_fetcher.get_keys = Mock(return_value=defer.succeed({})) + mock_fetcher.get_keys = Mock(return_value=make_awaitable({})) kr = keyring.Keyring( self.hs, key_fetchers=(StoreKeyFetcher(self.hs), mock_fetcher) @@ -244,17 +245,15 @@ def test_verify_json_dedupes_key_requests(self): """Two requests for the same key should be deduped.""" key1 = signedjson.key.generate_signing_key(1) - def get_keys(keys_to_fetch): + async def get_keys(keys_to_fetch): # there should only be one request object (with the max validity) self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}}) - return defer.succeed( - { - "server1": { - get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200) - } + return { + "server1": { + get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200) } - ) + } mock_fetcher = keyring.KeyFetcher() mock_fetcher.get_keys = Mock(side_effect=get_keys) @@ -281,25 +280,19 @@ def test_verify_json_falls_back_to_other_fetchers(self): """If the first fetcher cannot provide a recent enough key, we fall back""" key1 = signedjson.key.generate_signing_key(1) - def get_keys1(keys_to_fetch): + async def get_keys1(keys_to_fetch): self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}}) - return defer.succeed( - { - "server1": { - get_key_id(key1): FetchKeyResult(get_verify_key(key1), 800) - } - } - ) + return { + "server1": {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 800)} + } - def get_keys2(keys_to_fetch): + async def get_keys2(keys_to_fetch): self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}}) - return defer.succeed( - { - "server1": { - get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200) - } + return { + "server1": { + get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200) } - ) + } mock_fetcher1 = keyring.KeyFetcher() mock_fetcher1.get_keys = Mock(side_effect=get_keys1) From a94773a1b4f11287fa6b84a1701e3174dbb6608b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:09:43 -0400 Subject: [PATCH 05/10] Convert the background function. --- synapse/crypto/keyring.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1a5b614cf622..0bef7dc76be7 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -829,22 +829,18 @@ async def get_server_verify_key_v2_direct(self, server_name, key_ids): return keys -@defer.inlineCallbacks -def _handle_key_deferred(verify_request): +async def _handle_key_deferred(verify_request) -> None: """Waits for the key to become available, and then performs a verification Args: verify_request (VerifyJsonRequest): - Returns: - Deferred[None] - Raises: SynapseError if there was a problem performing the verification """ server_name = verify_request.server_name with PreserveLoggingContext(): - _, key_id, verify_key = yield verify_request.key_ready + _, key_id, verify_key = await verify_request.key_ready json_object = verify_request.json_object From 4dfa676ac29b713b2300392f91522f36c7c1049b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:13:36 -0400 Subject: [PATCH 06/10] Convert _attempt_key_fetches_with_fetcher --- synapse/crypto/keyring.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 0bef7dc76be7..dc1b72c2bacd 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -330,7 +330,9 @@ def do_iterations(): for f in self._key_fetchers: if not remaining_requests: return - yield self._attempt_key_fetches_with_fetcher(f, remaining_requests) + yield defer.ensureDeferred( + self._attempt_key_fetches_with_fetcher(f, remaining_requests) + ) # look for any requests which weren't satisfied with PreserveLoggingContext(): @@ -360,8 +362,7 @@ def on_err(err): run_in_background(do_iterations).addErrback(on_err) - @defer.inlineCallbacks - def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): + async def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): """Use a key fetcher to attempt to satisfy some key requests Args: @@ -388,7 +389,7 @@ def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): verify_request.minimum_valid_until_ts, ) - results = yield defer.ensureDeferred(fetcher.get_keys(missing_keys)) + results = await fetcher.get_keys(missing_keys) completed = [] for verify_request in remaining_requests: From 8ebefdd6505e28d1c2d276527720ccd2ae738786 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:14:57 -0400 Subject: [PATCH 07/10] Convert do_iterations. --- synapse/crypto/keyring.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index dc1b72c2bacd..f18a154d368e 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -324,15 +324,12 @@ def _get_server_verify_keys(self, verify_requests): remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called} - @defer.inlineCallbacks - def do_iterations(): + async def do_iterations(): with Measure(self.clock, "get_server_verify_keys"): for f in self._key_fetchers: if not remaining_requests: return - yield defer.ensureDeferred( - self._attempt_key_fetches_with_fetcher(f, remaining_requests) - ) + await self._attempt_key_fetches_with_fetcher(f, remaining_requests) # look for any requests which weren't satisfied with PreserveLoggingContext(): From 2e40256a20a0ce1913aaa5596d04fe4aad2e27e0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:16:28 -0400 Subject: [PATCH 08/10] Convert error handling. --- synapse/crypto/keyring.py | 60 +++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index f18a154d368e..1f5f84053b39 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -325,39 +325,39 @@ def _get_server_verify_keys(self, verify_requests): remaining_requests = {rq for rq in verify_requests if not rq.key_ready.called} async def do_iterations(): - with Measure(self.clock, "get_server_verify_keys"): - for f in self._key_fetchers: - if not remaining_requests: - return - await self._attempt_key_fetches_with_fetcher(f, remaining_requests) - - # look for any requests which weren't satisfied + try: + with Measure(self.clock, "get_server_verify_keys"): + for f in self._key_fetchers: + if not remaining_requests: + return + await self._attempt_key_fetches_with_fetcher(f, remaining_requests) + + # look for any requests which weren't satisfied + with PreserveLoggingContext(): + for verify_request in remaining_requests: + verify_request.key_ready.errback( + SynapseError( + 401, + "No key for %s with ids in %s (min_validity %i)" + % ( + verify_request.server_name, + verify_request.key_ids, + verify_request.minimum_valid_until_ts, + ), + Codes.UNAUTHORIZED, + ) + ) + except Exception as err: + # we don't really expect to get here, because any errors should already + # have been caught and logged. But if we do, let's log the error and make + # sure that all of the deferreds are resolved. + logger.error("Unexpected error in _get_server_verify_keys: %s", err) with PreserveLoggingContext(): for verify_request in remaining_requests: - verify_request.key_ready.errback( - SynapseError( - 401, - "No key for %s with ids in %s (min_validity %i)" - % ( - verify_request.server_name, - verify_request.key_ids, - verify_request.minimum_valid_until_ts, - ), - Codes.UNAUTHORIZED, - ) - ) - - def on_err(err): - # we don't really expect to get here, because any errors should already - # have been caught and logged. But if we do, let's log the error and make - # sure that all of the deferreds are resolved. - logger.error("Unexpected error in _get_server_verify_keys: %s", err) - with PreserveLoggingContext(): - for verify_request in remaining_requests: - if not verify_request.key_ready.called: - verify_request.key_ready.errback(err) + if not verify_request.key_ready.called: + verify_request.key_ready.errback(err) - run_in_background(do_iterations).addErrback(on_err) + run_in_background(do_iterations) async def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests): """Use a key fetcher to attempt to satisfy some key requests From 29735a796650b0abb6fa493456d1264df20a4fc5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:50:44 -0400 Subject: [PATCH 09/10] Add changelog. --- changelog.d/8003.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8003.misc diff --git a/changelog.d/8003.misc b/changelog.d/8003.misc new file mode 100644 index 000000000000..dfe4c03171d6 --- /dev/null +++ b/changelog.d/8003.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. From 8fac512078a7dfa8630ad14f52074bed3331092f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Jul 2020 15:54:24 -0400 Subject: [PATCH 10/10] Lint --- synapse/crypto/keyring.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1f5f84053b39..28ef7cfdb9d1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -330,7 +330,9 @@ async def do_iterations(): for f in self._key_fetchers: if not remaining_requests: return - await self._attempt_key_fetches_with_fetcher(f, remaining_requests) + await self._attempt_key_fetches_with_fetcher( + f, remaining_requests + ) # look for any requests which weren't satisfied with PreserveLoggingContext():