Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
media/{download,thumbnail}: support timeout_ms parameter
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <sumner@beeper.com>
  • Loading branch information
sumnerevans committed Jun 9, 2023
1 parent 28fefba commit a126708
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 43 deletions.
1 change: 1 addition & 0 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Codes(str, Enum):
WEAK_PASSWORD = "M_WEAK_PASSWORD"
INVALID_SIGNATURE = "M_INVALID_SIGNATURE"
USER_DEACTIVATED = "M_USER_DEACTIVATED"
NOT_YET_UPLOADED = "M_NOT_YET_UPLOADED"

# Part of MSC3848
# https://github.com/matrix-org/matrix-spec-proposals/pull/3848
Expand Down
6 changes: 6 additions & 0 deletions synapse/media/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
"text/xml",
]

# Default timeout_ms for download and thumbnail requests
DEFAULT_MAX_TIMEOUT_MS = 20_000

# Maximum allowed timeout_ms for download and thumbnail requests
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS = 60_000


def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]:
"""Parses the server name, media ID and optional file name from the request URI
Expand Down
98 changes: 83 additions & 15 deletions synapse/media/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import os
import shutil
from io import BytesIO
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

from matrix_common.types.mxc_uri import MXCUri

Expand All @@ -32,8 +32,10 @@
NotFoundError,
RequestSendFailed,
SynapseError,
cs_error,
)
from synapse.config.repository import ThumbnailRequirement
from synapse.http.server import respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.media._base import (
Expand Down Expand Up @@ -301,8 +303,62 @@ async def create_content(

return MXCUri(self.server_name, media_id)

def respond_not_yet_uploaded(self, request: SynapseRequest) -> None:
respond_with_json(
request,
404,
cs_error("Media has not been uploaded yet", code=Codes.NOT_YET_UPLOADED),
send_cors=True,
)

async def get_local_media_info(
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
) -> Optional[Dict[str, Any]]:
"""Gets the info dictionary for given local media ID. If the media has
not been uploaded yet, this function will wait up to ``max_timeout_ms``
milliseconds for the media to be uploaded.
Args:
request: The incoming request.
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Either the info dictionary for the given local media ID or
``None``. If ``None``, then no further processing is necessary as
this function will send the necessary JSON response.
"""
wait_until = self.clock.time_msec() + max_timeout_ms
while True:
# Get the info for the media
media_info = await self.store.get_local_media(media_id)
if not media_info:
respond_404(request)
return None

if media_info["quarantined_by"]:
logger.info("Media is quarantined")
respond_404(request)
return None

# The file has been uploaded, so stop looping
if media_info.get("media_length") is not None:
return media_info

if self.clock.time_msec() >= wait_until:
break

await self.clock.sleep(0.5)

self.respond_not_yet_uploaded(request)
return None

async def get_local_media(
self, request: SynapseRequest, media_id: str, name: Optional[str]
self,
request: SynapseRequest,
media_id: str,
name: Optional[str],
max_timeout_ms: int,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
Expand All @@ -312,13 +368,14 @@ async def get_local_media(
the file_id for local content.)
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
"""
media_info = await self.store.get_local_media(media_id)
if not media_info or media_info["quarantined_by"]:
respond_404(request)
media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
if not media_info:
return

self.mark_recently_accessed(None, media_id)
Expand All @@ -343,6 +400,7 @@ async def get_remote_media(
server_name: str,
media_id: str,
name: Optional[str],
max_timeout_ms: int,
) -> None:
"""Respond to requests for remote media.
Expand All @@ -352,6 +410,8 @@ async def get_remote_media(
media_id: The media ID of the content (as defined by the remote server).
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
Expand All @@ -377,27 +437,31 @@ async def get_remote_media(
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id
server_name, media_id, max_timeout_ms
)

# We deliberately stream the file outside the lock
if responder:
if responder and media_info:
media_type = media_info["media_type"]
media_length = media_info["media_length"]
upload_name = name if name else media_info["upload_name"]
await respond_with_responder(
request, responder, media_type, media_length, upload_name
)
else:
respond_404(request)
self.respond_not_yet_uploaded(request)

async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
async def get_remote_media_info(
self, server_name: str, media_id: str, max_timeout_ms: int
) -> dict:
"""Gets the media info associated with the remote file, downloading
if necessary.
Args:
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the remote server).
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
The media info of the file
Expand All @@ -413,7 +477,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id
server_name, media_id, max_timeout_ms
)

# Ensure we actually use the responder so that it releases resources
Expand All @@ -424,7 +488,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
return media_info

async def _get_remote_media_impl(
self, server_name: str, media_id: str
self, server_name: str, media_id: str, max_timeout_ms: int
) -> Tuple[Optional[Responder], dict]:
"""Looks for media in local cache, if not there then attempt to
download from remote server.
Expand All @@ -433,6 +497,8 @@ async def _get_remote_media_impl(
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the
remote server).
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
A tuple of responder and the media info of the file.
Expand Down Expand Up @@ -463,8 +529,7 @@ async def _get_remote_media_impl(

try:
media_info = await self._download_remote_file(
server_name,
media_id,
server_name, media_id, max_timeout_ms
)
except SynapseError:
raise
Expand Down Expand Up @@ -497,6 +562,7 @@ async def _download_remote_file(
self,
server_name: str,
media_id: str,
max_timeout_ms: int,
) -> dict:
"""Attempt to download the remote file from the given server name,
using the given file_id as the local id.
Expand All @@ -506,7 +572,8 @@ async def _download_remote_file(
media_id: The media ID of the content (as defined by the
remote server). This is different than the file_id, which is
locally generated.
file_id: Local file ID
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
The media info of the file.
Expand All @@ -530,7 +597,8 @@ async def _download_remote_file(
# tell the remote server to 404 if it doesn't
# recognise the server_name, to make sure we don't
# end up with a routing loop.
"allow_remote": "false"
"allow_remote": "false",
"timeout_ms": str(max_timeout_ms),
},
)
except RequestSendFailed as e:
Expand Down
27 changes: 19 additions & 8 deletions synapse/rest/media/download_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
set_corp_headers,
set_cors_headers,
)
from synapse.http.servlet import parse_boolean
from synapse.http.servlet import parse_boolean, parse_integer
from synapse.http.site import SynapseRequest
from synapse.media._base import parse_media_id, respond_404
from synapse.media._base import (
DEFAULT_MAX_TIMEOUT_MS,
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS,
parse_media_id,
respond_404,
)

if TYPE_CHECKING:
from synapse.media.media_repository import MediaRepository
Expand Down Expand Up @@ -54,13 +59,17 @@ async def _async_render_GET(self, request: SynapseRequest) -> None:
)
# Limited non-standard form of CSP for IE11
request.setHeader(b"X-Content-Security-Policy", b"sandbox;")
request.setHeader(
b"Referrer-Policy",
b"no-referrer",
)
request.setHeader(b"Referrer-Policy", b"no-referrer")
server_name, media_id, name = parse_media_id(request)
max_timeout_ms = parse_integer(
request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
)
max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)

if self._is_mine_server_name(server_name):
await self.media_repo.get_local_media(request, media_id, name)
await self.media_repo.get_local_media(
request, media_id, name, max_timeout_ms
)
else:
allow_remote = parse_boolean(request, "allow_remote", default=True)
if not allow_remote:
Expand All @@ -72,4 +81,6 @@ async def _async_render_GET(self, request: SynapseRequest) -> None:
respond_404(request)
return

await self.media_repo.get_remote_media(request, server_name, media_id, name)
await self.media_repo.get_remote_media(
request, server_name, media_id, name, max_timeout_ms
)
Loading

0 comments on commit a126708

Please sign in to comment.