From d6d736c352f31ae02e21e344b52d68aed216a667 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Jul 2020 15:07:36 -0400 Subject: [PATCH 1/8] Convert base media methods to async/await. --- synapse/rest/media/v1/_base.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 595849f9d55e..7b52a05e6efa 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -77,8 +77,7 @@ def respond_404(request): ) -@defer.inlineCallbacks -def respond_with_file(request, media_type, file_path, file_size=None, upload_name=None): +async def respond_with_file(request, media_type, file_path, file_size=None, upload_name=None): logger.debug("Responding with %r", file_path) if os.path.isfile(file_path): @@ -89,7 +88,7 @@ def respond_with_file(request, media_type, file_path, file_size=None, upload_nam add_file_headers(request, media_type, file_size, upload_name) with open(file_path, "rb") as f: - yield make_deferred_yieldable(FileSender().beginFileTransfer(f, request)) + await make_deferred_yieldable(FileSender().beginFileTransfer(f, request)) finish_request(request) else: @@ -198,8 +197,7 @@ def _can_encode_filename_as_token(x): return True -@defer.inlineCallbacks -def respond_with_responder(request, responder, media_type, file_size, upload_name=None): +async def respond_with_responder(request, responder, media_type, file_size, upload_name=None): """Responds to the request with given responder. If responder is None then returns 404. @@ -218,7 +216,7 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam add_file_headers(request, media_type, file_size, upload_name) try: with responder: - yield responder.write_to_consumer(request) + await responder.write_to_consumer(request) except Exception as e: # The majority of the time this will be due to the client having gone # away. Unfortunately, Twisted simply throws a generic exception at us From b835cafb5379e6161d40b7f3b034d9cbf632d6c5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 2 Jul 2020 15:14:10 -0400 Subject: [PATCH 2/8] Convert the media storage layer. --- synapse/rest/media/v1/media_storage.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 79cb0dddbe1e..5682b35cff73 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -46,8 +46,7 @@ def __init__(self, hs, local_media_directory, filepaths, storage_providers): self.filepaths = filepaths self.storage_providers = storage_providers - @defer.inlineCallbacks - def store_file(self, source, file_info): + async def store_file(self, source, file_info): """Write `source` to the on disk media store, and also any other configured storage providers @@ -61,10 +60,10 @@ def store_file(self, source, file_info): with self.store_into_file(file_info) as (f, fname, finish_cb): # Write to the main repository - yield defer_to_thread( + await defer_to_thread( self.hs.get_reactor(), _write_file_synchronously, source, f ) - yield finish_cb() + await finish_cb() return fname @@ -75,7 +74,7 @@ def store_into_file(self, file_info): Actually yields a 3-tuple (file, fname, finish_cb), where file is a file like object that can be written to, fname is the absolute path of file - on disk, and finish_cb is a function that returns a Deferred. + on disk, and finish_cb is a function that returns an awaitable. fname can be used to read the contents from after upload, e.g. to generate thumbnails. @@ -91,7 +90,7 @@ def store_into_file(self, file_info): with media_storage.store_into_file(info) as (f, fname, finish_cb): # .. write into f ... - yield finish_cb() + await finish_cb() """ path = self._file_info_to_path(file_info) @@ -103,10 +102,9 @@ def store_into_file(self, file_info): finished_called = [False] - @defer.inlineCallbacks - def finish(): + async def finish(): for provider in self.storage_providers: - yield provider.store_file(path, file_info) + await provider.store_file(path, file_info) finished_called[0] = True From ab26227a05971db96db980b51890b9a06f967340 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 16 Jul 2020 12:06:08 -0400 Subject: [PATCH 3/8] Add changelog. --- changelog.d/7873.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7873.misc diff --git a/changelog.d/7873.misc b/changelog.d/7873.misc new file mode 100644 index 000000000000..58260764e7fa --- /dev/null +++ b/changelog.d/7873.misc @@ -0,0 +1 @@ +Convert more media code to async/await. From 80f478b5f6b0f35d16324ae7cff38f7952ba7ab2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 16 Jul 2020 12:07:37 -0400 Subject: [PATCH 4/8] Lint --- synapse/rest/media/v1/_base.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 7b52a05e6efa..9a847130c0c9 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -18,7 +18,6 @@ import os import urllib -from twisted.internet import defer from twisted.protocols.basic import FileSender from synapse.api.errors import Codes, SynapseError, cs_error @@ -77,7 +76,9 @@ def respond_404(request): ) -async def respond_with_file(request, media_type, file_path, file_size=None, upload_name=None): +async def respond_with_file( + request, media_type, file_path, file_size=None, upload_name=None +): logger.debug("Responding with %r", file_path) if os.path.isfile(file_path): @@ -197,7 +198,9 @@ def _can_encode_filename_as_token(x): return True -async def respond_with_responder(request, responder, media_type, file_size, upload_name=None): +async def respond_with_responder( + request, responder, media_type, file_size, upload_name=None +): """Responds to the request with given responder. If responder is None then returns 404. From 6b437bbbfe1ed98beae3e5719920906a02578259 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 17 Jul 2020 12:28:44 -0400 Subject: [PATCH 5/8] Update a comment. --- synapse/rest/media/v1/media_storage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 5682b35cff73..c943baacfb37 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -24,7 +24,7 @@ from synapse.logging.context import defer_to_thread, make_deferred_yieldable from synapse.util.file_consumer import BackgroundFileConsumer -from ._base import Responder +from ._base import FileInfo, Responder logger = logging.getLogger(__name__) @@ -46,16 +46,16 @@ def __init__(self, hs, local_media_directory, filepaths, storage_providers): self.filepaths = filepaths self.storage_providers = storage_providers - async def store_file(self, source, file_info): + async def store_file(self, source, file_info: FileInfo) -> str: """Write `source` to the on disk media store, and also any other configured storage providers Args: source: A file like object that should be written - file_info (FileInfo): Info about the file to store + file_info: Info about the file to store Returns: - Deferred[str]: the file path written to in the primary media store + the file path written to in the primary media store """ with self.store_into_file(file_info) as (f, fname, finish_cb): From af4f079bf5ae5bec16868da61c4858b1c68bf672 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 17 Jul 2020 12:33:19 -0400 Subject: [PATCH 6/8] Guard against improper implementations. --- synapse/rest/media/v1/media_storage.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index c943baacfb37..87fe06cf56f6 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -14,6 +14,7 @@ # limitations under the License. import contextlib +import inspect import logging import os import shutil @@ -104,7 +105,11 @@ def store_into_file(self, file_info): async def finish(): for provider in self.storage_providers: - await provider.store_file(path, file_info) + # store_file is supposed to return an Awaitable or Deferred, but + # guard against improper implementations. + result = provider.store_file(path, file_info) + if inspect.isawaitable(result) or isinstance(result, defer.Deferred): + await result finished_called[0] = True From c9adbb705fc02f582c420ac533048e1005a692e1 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 17 Jul 2020 12:54:17 -0400 Subject: [PATCH 7/8] Deferreds are Awaitable. --- synapse/rest/media/v1/media_storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 87fe06cf56f6..a87147f3aa10 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -105,10 +105,10 @@ def store_into_file(self, file_info): async def finish(): for provider in self.storage_providers: - # store_file is supposed to return an Awaitable or Deferred, but - # guard against improper implementations. + # store_file is supposed to return an Awaitable, but guard + # against improper implementations. result = provider.store_file(path, file_info) - if inspect.isawaitable(result) or isinstance(result, defer.Deferred): + if inspect.isawaitable(result): await result finished_called[0] = True From a031797f7188a86cac4ff6abaa2dfff0e1a863b6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 17 Jul 2020 13:26:47 -0400 Subject: [PATCH 8/8] Convert the rest of the media_storage to async/await. --- synapse/rest/media/v1/media_storage.py | 33 +++++++++++++---------- tests/rest/media/v1/test_media_storage.py | 5 +++- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index a87147f3aa10..66bc1c336088 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -18,8 +18,8 @@ import logging import os import shutil +from typing import Optional -from twisted.internet import defer from twisted.protocols.basic import FileSender from synapse.logging.context import defer_to_thread, make_deferred_yieldable @@ -126,17 +126,15 @@ async def finish(): if not finished_called: raise Exception("Finished callback not called") - @defer.inlineCallbacks - def fetch_media(self, file_info): + async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: """Attempts to fetch media described by file_info from the local cache and configured storage providers. Args: - file_info (FileInfo) + file_info Returns: - Deferred[Responder|None]: Returns a Responder if the file was found, - otherwise None. + Returns a Responder if the file was found, otherwise None. """ path = self._file_info_to_path(file_info) @@ -145,23 +143,26 @@ def fetch_media(self, file_info): return FileResponder(open(local_path, "rb")) for provider in self.storage_providers: - res = yield provider.fetch(path, file_info) + res = provider.fetch(path, file_info) + # Fetch is supposed to return an Awaitable, but guard against + # improper implementations. + if inspect.isawaitable(res): + res = await res if res: logger.debug("Streaming %s from %s", path, provider) return res return None - @defer.inlineCallbacks - def ensure_media_is_in_local_cache(self, file_info): + async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str: """Ensures that the given file is in the local cache. Attempts to download it from storage providers if it isn't. Args: - file_info (FileInfo) + file_info Returns: - Deferred[str]: Full path to local file + Full path to local file """ path = self._file_info_to_path(file_info) local_path = os.path.join(self.local_media_directory, path) @@ -173,14 +174,18 @@ def ensure_media_is_in_local_cache(self, file_info): os.makedirs(dirname) for provider in self.storage_providers: - res = yield provider.fetch(path, file_info) + res = provider.fetch(path, file_info) + # Fetch is supposed to return an Awaitable, but guard against + # improper implementations. + if inspect.isawaitable(res): + res = await res if res: with res: consumer = BackgroundFileConsumer( open(local_path, "wb"), self.hs.get_reactor() ) - yield res.write_to_consumer(consumer) - yield consumer.wait() + await res.write_to_consumer(consumer) + await consumer.wait() return local_path raise Exception("file could not be found") diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index 66fa5978b2fd..f4f3e5677791 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -26,6 +26,7 @@ from parameterized import parameterized_class from PIL import Image as Image +from twisted.internet import defer from twisted.internet.defer import Deferred from synapse.logging.context import make_deferred_yieldable @@ -77,7 +78,9 @@ def test_ensure_media_is_in_local_cache(self): # This uses a real blocking threadpool so we have to wait for it to be # actually done :/ - x = self.media_storage.ensure_media_is_in_local_cache(file_info) + x = defer.ensureDeferred( + self.media_storage.ensure_media_is_in_local_cache(file_info) + ) # Hotloop until the threadpool does its job... self.wait_on_thread(x)