Skip to content

Commit

Permalink
New take on retrying order asset downloads
Browse files Browse the repository at this point in the history
Wraps the entire orders.download_asset() in a retry, eliminating
the double download and concentrating the retry complexity instead
of spreading it over several modules.

Resolves #1010
  • Loading branch information
sgillies committed Aug 1, 2023
1 parent 39ff354 commit 1950393
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 33 deletions.
79 changes: 50 additions & 29 deletions planet/clients/orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@
# the License.
"""Functionality for interacting with the orders api"""
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import re
import time
from typing import AsyncIterator, Callable, List, Optional
import uuid
import json
import hashlib
from pathlib import Path
from typing import AsyncIterator, Callable, List, Optional

import stamina
from tqdm.asyncio import tqdm

from .. import exceptions
from ..constants import PLANET_BASE_URL
from ..http import Session
from ..http import RETRY_EXCEPTIONS, Session
from ..models import Paged

BASE_URL = f'{PLANET_BASE_URL}/compute/ops'
Expand Down Expand Up @@ -232,6 +234,7 @@ async def aggregated_order_stats(self) -> dict:
response = await self._session.request(method='GET', url=url)
return response.json()

@stamina.retry(on=tuple(RETRY_EXCEPTIONS))
async def download_asset(self,
location: str,
filename: Optional[str] = None,
Expand All @@ -257,31 +260,49 @@ async def download_asset(self,
limit is exceeded.
"""
response = await self._session.request(method='GET', url=location)
filename = filename or response.filename
length = response.length
if not filename:
raise exceptions.ClientError(
f'Could not determine filename at {location}')
async with self._session._limiter, self._session._client.stream('GET', location) as resp:
content_length = int(resp.headers.get('content-length'))

# Fall back to content-disposition for a filename.
if not filename:
try:
content_disposition = resp.headers['content-disposition']
match = re.search('filename="?([^"]+)"?',
content_disposition)
filename = match.group(1) # type: ignore
except (AttributeError, KeyError) as err:
raise exceptions.ClientError(
f'Could not determine filename at {location}') from err

dl_path = Path(directory, filename)
dl_path.parent.mkdir(exist_ok=True, parents=True)
LOGGER.info(f'Downloading {dl_path}')

dl_path = Path(directory, filename)
dl_path.parent.mkdir(exist_ok=True, parents=True)
LOGGER.info(f'Downloading {dl_path}')

try:
mode = 'wb' if overwrite else 'xb'
with open(dl_path, mode) as fp:
with tqdm(total=length,
unit_scale=True,
unit_divisor=1024 * 1024,
unit='B',
desc=str(filename),
disable=not progress_bar) as progress:
await self._session.write(location, fp, progress.update)
except FileExistsError:
LOGGER.info(f'File {dl_path} exists, not overwriting')

return dl_path
try:
mode = 'wb' if overwrite else 'xb'
with dl_path.open(mode) as fp:
with tqdm(total=content_length,
unit_scale=True,
unit_divisor=1024 * 1024,
unit='B',
desc=str(filename),
disable=not progress_bar) as progress:

previous = resp.num_bytes_downloaded

# Size from boto3.s3.transfer.TransferConfig
# io_chunksize. Planet assets are generally
# several MB or more.
async for chunk in resp.aiter_bytes(chunk_size=262144):
fp.write(chunk)
current = resp.num_bytes_downloaded
progress.update(current - previous)
previous = current

except FileExistsError:
LOGGER.info(f'File {dl_path} exists, not overwriting')

return dl_path

async def download_order(self,
order_id: str,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
'httpx>=0.23.0',
'jsonschema',
'pyjwt>=2.1',
'stamina',
'tqdm>=4.56',
'typing-extensions',
]
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/test_orders_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ async def test_download_asset_md(tmpdir, session):

@respx.mock
@pytest.mark.anyio
async def test_download_asset_img(tmpdir, open_test_img, session):
async def test_download_asset_img_with_retry(tmpdir, open_test_img, session):
dl_url = TEST_DOWNLOAD_URL + '/1?token=IAmAToken'

img_headers = {
Expand All @@ -587,9 +587,7 @@ async def _stream_img():
# an error caused by respx and not this code
# https://github.com/lundberg/respx/issues/130
respx.get(dl_url).side_effect = [
httpx.Response(HTTPStatus.OK,
headers=img_headers,
request='donotcloneme'),
httpx.ReadError("no can do!"),
httpx.Response(HTTPStatus.OK,
stream=_stream_img(),
headers=img_headers,
Expand Down

0 comments on commit 1950393

Please sign in to comment.