From 4ce9b9dfeab1408d334ed969d76609f2523fe830 Mon Sep 17 00:00:00 2001
From: Scott Staniewicz
Date: Tue, 7 Nov 2023 17:01:09 -0500
Subject: [PATCH] add catch for 429 error from CDSE

Also downloading in parallel may help with
https://github.com/dbekaert/RAiDER/issues/610
---
 eof/cli.py              |  8 ++++++++
 eof/dataspace_client.py | 44 +++++++++++++++++++++++++++++++++--------
 eof/download.py         | 15 +++++++++++---
 3 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/eof/cli.py b/eof/cli.py
index 2479250..d7d771c 100644
--- a/eof/cli.py
+++ b/eof/cli.py
@@ -92,6 +92,12 @@
     is_flag=True,
     help="save credentials provided interactively in the ~/.netrc file if necessary",
 )
+@click.option(
+    "--max-workers",
+    type=int,
+    default=3,
+    help="Number of parallel downloads to run. Note that CDSE has a limit of 4",
+)
 def cli(
     search_path: str,
     save_dir: str,
@@ -107,6 +113,7 @@ def cli(
     cdse_password: str = "",
     ask_password: bool = False,
     update_netrc: bool = False,
+    max_workers: int = 3,
 ):
     """Download Sentinel precise orbit files.
 
@@ -136,4 +143,5 @@
         asf_password=asf_password,
         cdse_user=cdse_user,
         cdse_password=cdse_password,
+        max_workers=max_workers,
     )
diff --git a/eof/dataspace_client.py b/eof/dataspace_client.py
index a963d62..d9b16fe 100644
--- a/eof/dataspace_client.py
+++ b/eof/dataspace_client.py
@@ -1,6 +1,7 @@
 """Client to get orbit files from dataspace.copernicus.eu ."""
 from __future__ import annotations
 
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
@@ -164,13 +165,19 @@
         logger.warning("The following dates were not found: %s", remaining_dates)
         return all_results
 
-    def download_all(self, query_results: list[dict], output_directory: Filename):
+    def download_all(
+        self,
+        query_results: list[dict],
+        output_directory: Filename,
+        max_workers: int = 3,
+    ):
         """Download all the specified orbit products."""
         return download_all(
             query_results,
             output_directory=output_directory,
             username=self._username,
             password=self._password,
+            max_workers=max_workers,
         )
 
 
@@ -362,6 +369,7 @@
                 if chunk:
                     outfile.write(chunk)
 
+    logger.info(f"Orbit file downloaded to {output_orbit_file_path}")
     return output_orbit_file_path
 
 
@@ -370,6 +378,7 @@
     output_directory: Filename,
     username: str = "",
     password: str = "",
+    max_workers: int = 3,
 ) -> list[Path]:
     """Download all the specified orbit products.
 
@@ -383,6 +392,9 @@
         CDSE username
     password : str
         CDSE password
+    max_workers : int, default = 3
+        Maximum parallel downloads from CDSE.
+        Note that >4 connections will result in an HTTP 429 error.
     """
 
     downloaded_paths: list[Path] = []
@@ -391,22 +403,38 @@
     #     query_results, start_time, stop_time
     # )
     # Obtain an access token the download request from the provided credentials
+    access_token = get_access_token(username, password)
+    output_names = []
+    download_urls = []
     for query_result in query_results:
-        orbit_file_name = query_result["Name"]
         orbit_file_request_id = query_result["Id"]
 
         # Construct the URL used to download the Orbit file
         download_url = f"{DOWNLOAD_URL}({orbit_file_request_id})/$value"
+        download_urls.append(download_url)
 
-        logger.info(
+        orbit_file_name = query_result["Name"]
+        output_names.append(orbit_file_name)
+
+        logger.debug(
             f"Downloading Orbit file {orbit_file_name} from service endpoint "
             f"{download_url}"
        )
+
-        output_orbit_file_path = download_orbit_file(
-            download_url, output_directory, orbit_file_name, access_token
-        )
-        logger.info(f"Orbit file downloaded to {output_orbit_file_path}")
-        downloaded_paths.append(output_orbit_file_path)
+    downloaded_paths = []
+    with ThreadPoolExecutor(max_workers=max_workers) as exc:
+        futures = [
+            exc.submit(
+                download_orbit_file,
+                request_url=u,
+                output_directory=output_directory,
+                orbit_file_name=n,
+                access_token=access_token,
+            )
+            for (u, n) in zip(download_urls, output_names)
+        ]
+        for f in futures:
+            downloaded_paths.append(f.result())
 
     return downloaded_paths
 
diff --git a/eof/download.py b/eof/download.py
index 60f6814..85440f7 100644
--- a/eof/download.py
+++ b/eof/download.py
@@ -30,6 +30,7 @@
 from pathlib import Path
 
 from dateutil.parser import parse
+from requests.exceptions import HTTPError
 
 from .asf_client import ASFClient
 from .dataspace_client import DataspaceClient
@@ -102,9 +103,17 @@ def download_eofs(
 
         if query:
             logger.info("Attempting download from SciHub")
-            results = client.download_all(query, output_directory=save_dir)
-            filenames.extend(results)
-            dataspace_successful = True
+            try:
+                results = client.download_all(
+                    query, output_directory=save_dir, max_workers=max_workers
+                )
+                filenames.extend(results)
+                dataspace_successful = True
+            except HTTPError as e:
+                assert e.response is not None
+                if e.response.status_code == 429:
+                    logger.warning(f"Failed due to too many requests: {e.args}")
 
+    # Dataspace failed -> try asf
     # For failures from scihub, try ASF
     if not dataspace_successful:
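
For context, a minimal standalone sketch of the two techniques this commit combines: fanning downloads out over a bounded ThreadPoolExecutor, and catching the HTTP 429 that CDSE returns when more than 4 concurrent connections are opened. The fetch helper and the URLs below are hypothetical stand-ins for download_orbit_file and the CDSE product endpoints; they are not part of sentineleof.

from concurrent.futures import ThreadPoolExecutor

import requests
from requests.exceptions import HTTPError


def fetch(url: str) -> bytes:
    """Download one URL; raise_for_status turns a 429 into an HTTPError."""
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    return resp.content


# Hypothetical URLs for illustration, not real CDSE endpoints
urls = ["https://example.com/orbit1.EOF", "https://example.com/orbit2.EOF"]
try:
    # Keep max_workers <= 4: CDSE rejects additional connections with a 429
    with ThreadPoolExecutor(max_workers=3) as exc:
        contents = list(exc.map(fetch, urls))
except HTTPError as e:
    if e.response is not None and e.response.status_code == 429:
        # Mirrors the new branch in eof/download.py: warn, then fall back
        print("Rate limited; retry with fewer workers or use another source")

Executor.map re-raises the first worker exception when its results are consumed, which matches how the patched download_all surfaces errors by collecting f.result() for each future.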