Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugins): order and poll without downloading #1437

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eodag/plugins/apis/usgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def download(
logger.debug(f"Downloading {req_url}")
ssl_verify = getattr(self.config, "ssl_verify", True)

@self._download_retry(product, wait, timeout)
@self._order_download_retry(product, wait, timeout)
def download_request(
product: EOProduct,
fs_path: str,
Expand Down
22 changes: 11 additions & 11 deletions eodag/plugins/download/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,14 @@ def download_all(

return paths

def _download_retry(
def _order_download_retry(
self, product: EOProduct, wait: int, timeout: int
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""
Download retry decorator.
Order download retry decorator.

Retries the wrapped download method after `wait` minutes if a NotAvailableError
exception is thrown until `timeout` minutes.
Retries the wrapped order_download method after ``wait`` minutes if a
``NotAvailableError`` exception is thrown until ``timeout`` minutes.

:param product: The EO product to download
:param wait: If download fails, wait time in minutes between two download tries
Expand All @@ -605,7 +605,7 @@ def _download_retry(
:returns: decorator
"""

def decorator(download: Callable[..., T]) -> Callable[..., T]:
def decorator(order_download: Callable[..., T]) -> Callable[..., T]:
def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
# initiate retry loop
start_time = datetime.now()
Expand All @@ -622,7 +622,7 @@ def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
if datetime_now >= product.next_try:
product.next_try += timedelta(minutes=wait)
try:
return download(*args, **kwargs)
return order_download(*args, **kwargs)

except NotAvailableError as e:
if not getattr(self.config, "order_enabled", False):
Expand All @@ -638,7 +638,7 @@ def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
).seconds
retry_count += 1
retry_info = (
f"[Retry #{retry_count}] Waited {wait_seconds}s, trying again to download ordered product"
f"[Retry #{retry_count}] Waited {wait_seconds}s, checking order status again"
f" (retry every {wait}' for {timeout}')"
)
logger.info(not_available_info)
Expand All @@ -660,8 +660,8 @@ def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
).microseconds / 1e6
retry_count += 1
retry_info = (
f"[Retry #{retry_count}] Waiting {wait_seconds}s until next download try"
f" for ordered product (retry every {wait}' for {timeout}')"
f"[Retry #{retry_count}] Waiting {wait_seconds}s until next order status check"
f" (retry every {wait}' for {timeout}')"
)
logger.info(not_available_info)
# Retry-After info from Response header
Expand All @@ -682,12 +682,12 @@ def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
logger.info(not_available_info)
raise NotAvailableError(
f"{product.properties['title']} is not available ({product.properties['storageStatus']})"
f" and could not be downloaded, timeout reached"
f" and order was not successfull, timeout reached"
)
elif datetime_now >= stop_time:
raise NotAvailableError(not_available_info)

return download(*args, **kwargs)
return order_download(*args, **kwargs)

return download_and_retry

Expand Down
61 changes: 45 additions & 16 deletions eodag/plugins/download/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class HTTPDownload(Download):
def __init__(self, provider: str, config: PluginConfig) -> None:
super(HTTPDownload, self).__init__(provider, config)

def order_download(
def _order(
self,
product: EOProduct,
auth: Optional[AuthBase] = None,
Expand Down Expand Up @@ -273,7 +273,7 @@ def order_response_process(

return json_response

def order_download_status(
def _order_status(
self,
product: EOProduct,
auth: Optional[AuthBase] = None,
Expand Down Expand Up @@ -627,7 +627,7 @@ def download(

url = product.remote_location

@self._download_retry(product, wait, timeout)
@self._order_download_retry(product, wait, timeout)
def download_request(
product: EOProduct,
auth: AuthBase,
Expand Down Expand Up @@ -902,6 +902,44 @@ def _process_exception(
else:
logger.error("Error while getting resource :\n%s", tb.format_exc())

def _order_request(
self,
product: EOProduct,
auth: Optional[AuthBase],
) -> None:
if (
"orderLink" in product.properties
and product.properties.get("storageStatus") == OFFLINE_STATUS
and not product.properties.get("orderStatus")
):
self._order(product=product, auth=auth)

if (
product.properties.get("orderStatusLink", None)
and product.properties.get("storageStatus") != ONLINE_STATUS
):
self._order_status(product=product, auth=auth)

def order(
self,
product: EOProduct,
auth: Optional[Union[AuthBase, Dict[str, str]]] = None,
wait: int = DEFAULT_DOWNLOAD_WAIT,
timeout: int = DEFAULT_DOWNLOAD_TIMEOUT,
) -> None:
"""
Order product and poll to check its status

:param product: The EO product to download
:param auth: (optional) authenticated object
:param wait: (optional) Wait time in minutes between two order status check
:param timeout: (optional) Maximum time in minutes before stop checking
order status
"""
self._order_download_retry(product, wait, timeout)(self._order_request)(
product, auth
)

def _stream_download(
self,
product: EOProduct,
Expand All @@ -910,8 +948,9 @@ def _stream_download(
**kwargs: Unpack[DownloadConf],
) -> Iterator[Any]:
"""
fetches a zip file containing the assets of a given product as a stream
Fetches a zip file containing the assets of a given product as a stream
and returns a generator yielding the chunks of the file

:param product: product for which the assets should be downloaded
:param auth: The configuration of a plugin of type Authentication
:param progress_callback: A method or a callable object
Expand All @@ -928,18 +967,8 @@ def _stream_download(
ssl_verify = getattr(self.config, "ssl_verify", True)

ordered_message = ""
if (
"orderLink" in product.properties
and product.properties.get("storageStatus") == OFFLINE_STATUS
and not product.properties.get("orderStatus")
):
self.order_download(product=product, auth=auth)

if (
product.properties.get("orderStatusLink", None)
and product.properties.get("storageStatus") != ONLINE_STATUS
):
self.order_download_status(product=product, auth=auth)
# retry handled at download level
self._order_request(product, auth)

params = kwargs.pop("dl_url_params", None) or getattr(
self.config, "dl_url_params", {}
Expand Down
8 changes: 3 additions & 5 deletions eodag/plugins/download/s3rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ def download(
and "storageStatus" in product.properties
and product.properties["storageStatus"] != ONLINE_STATUS
):
self.http_download_plugin.order_download(product=product, auth=auth)
self.http_download_plugin._order(product=product, auth=auth)

@self._download_retry(product, wait, timeout)
@self._order_download_retry(product, wait, timeout)
def download_request(
product: EOProduct,
auth: AuthBase,
Expand All @@ -142,9 +142,7 @@ def download_request(
):
# check order status
if product.properties.get("orderStatusLink", None):
self.http_download_plugin.order_download_status(
product=product, auth=auth
)
self.http_download_plugin._order_status(product=product, auth=auth)

# get bucket urls
bucket_name, prefix = get_bucket_name_and_prefix(
Expand Down
10 changes: 4 additions & 6 deletions eodag/rest/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,11 @@ def _order_and_update(
if (
product.properties.get("storageStatus") != ONLINE_STATUS
and NOT_AVAILABLE in product.properties.get("orderStatusLink", "")
and hasattr(product.downloader, "order_download")
and hasattr(product.downloader, "_order")
):
# first order
logger.debug("Order product")
order_status_dict = product.downloader.order_download(
product=product, auth=auth
)
order_status_dict = product.downloader._order(product=product, auth=auth)
query_args.update(order_status_dict or {})

if (
Expand All @@ -344,11 +342,11 @@ def _order_and_update(
product.properties["storageStatus"] = STAGING_STATUS

if product.properties.get("storageStatus") == STAGING_STATUS and hasattr(
product.downloader, "order_download_status"
product.downloader, "_order_status"
):
# check order status if needed
logger.debug("Checking product order status")
product.downloader.order_download_status(product=product, auth=auth)
product.downloader._order_status(product=product, auth=auth)

if product.properties.get("storageStatus") != ONLINE_STATUS:
raise NotAvailableError("Product is not available yet")
Expand Down
Loading
Loading