From 95feed5a3493da5e14e91b938e643df2b1f466ba Mon Sep 17 00:00:00 2001 From: Peter McEvoy Date: Mon, 25 Mar 2024 17:00:54 +0100 Subject: [PATCH 1/5] pyproject.toml: Use py311 and add tomli dependency --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index eec1f6f..1ce6a5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,8 +6,8 @@ build-backend = "setuptools.build_meta" [project] name = "cdsapi_helper" version = "0.0.1" -requires-python = ">=3.10" -dependencies = ["cdsapi", "click", "pandas"] +requires-python = ">=3.11" +dependencies = ["cdsapi", "click", "pandas", "tomli"] [project.scripts] download_era5 = "cdsapi_helper.cli:download_era5" download_cds = "cdsapi_helper.cli:download_cds" From a28ea3632073f91e89f6dd89dcf682f5f7c4aff1 Mon Sep 17 00:00:00 2001 From: Peter McEvoy Date: Thu, 11 Apr 2024 11:22:01 +0200 Subject: [PATCH 2/5] Add cache based on request hash Alter the download_cds command to download files to a user-specified cache directory where files are named after `get_json_sem_hash(request)`. The `RequestEntry` class is introduced to keep the data needed to produce the hash in one place. Output files matching `filename_spec` are created as symlinks into the cache. The main motivation is to let multiple projects with overlapping data re-use previously downloaded requests. --- README.md | 2 +- cdsapi_helper/cli.py | 162 +++++++++++++++++++++++++++++++++++--- cdsapi_helper/download.py | 52 +++++++++--- cdsapi_helper/utils.py | 53 +++++++++++-- 4 files changed, 237 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 8bae845..173f956 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Filenames are based on the parameters of the request, see `filename_spec` in the ```toml dataset = "reanalysis-era5-pressure-levels" looping_variables = ["variable", "year"] -filename_spec = ["variable", "year", "time"] +filename_spec = "{dataset}/{variable}-{year}" [request] product_type = "reanalysis" diff --git a/cdsapi_helper/cli.py b/cdsapi_helper/cli.py index 669649f..831b50d 100755 --- a/cdsapi_helper/cli.py +++ b/cdsapi_helper/cli.py @@ -1,15 +1,32 @@ #!/usr/bin/env python +import os +import sys from copy import deepcopy from itertools import product +from pathlib import Path from time import sleep +from typing import List, Optional +import datetime import cdsapi import click import pandas as pd import tomli -from .download import download_request, send_request, update_request -from .utils import build_request +from .download import ( + download_request, + get_json_sem_hash, + send_request, + update_request, + RequestEntry, +) +from .utils import ( + build_filename, + build_request, + resolve_and_get_local_cache, + print_files_and_size, + format_bytes, +) @click.command() @@ -60,7 +77,23 @@ def download_era5(variable: str, year: str, month: str, dry_run: bool) -> None: is_flag=True, show_default=True, default=False, - help="Dry run, no download.", + help="Dry run: no download and no symlinks.", ) +@click.option( + "--list-dangling", + "list_dangling", + is_flag=True, + show_default=True, + default=False, + help="List files in stdout for files that are not accounted for by spec file and exit", +) +@click.option( + "--list-files", + "list_files", + is_flag=True, + show_default=True, + default=False, + help="List cache files expected by the specification and exit.", +) @click.option( "--n-jobs", "n_jobs", show_default=True, default=5, @@ -76,8 +109,22 @@ def download_era5(variable: str, year: str, month: str, 
dry_run: bool) -> None: type=click.BOOL, help="Keep running, waiting for requests to be processed.", ) +@click.option( + "--cache-dir", + "cache_dir", + show_default=True, + default=Path("./cache"), + type=Path, + help="Directory for local cache where downloads are stored and output files are linked to", +) def download_cds( - spec_path: str, n_jobs: int = 5, wait: bool = False, dry_run: bool = False + spec_path: str, + n_jobs: int, + wait: bool, + list_dangling: bool, + list_files: bool, + dry_run: bool, + cache_dir: Path, ) -> None: click.echo(f"Reading specification: {click.format_filename(spec_path)}") with open(spec_path, mode="rb") as fp: @@ -87,8 +134,9 @@ def download_cds( request = spec["request"] click.echo(f"Requesting variable: {request['variable']}.") + # Collect request entries generated by input spec file + request_entries = [] to_permutate = [request[var] for var in spec["looping_variables"]] - requests = [] for perm_spec in product(*to_permutate): perm_spec = { spec["looping_variables"][i]: perm_spec[i] @@ -97,15 +145,65 @@ def download_cds( sub_request = deepcopy(request) for key, value in perm_spec.items(): sub_request[key] = value - requests.append(sub_request) + request_entries.append( + RequestEntry( + dataset=dataset, + request=sub_request, + filename_spec=spec["filename_spec"], + ) + ) + + click.echo( + f"{len(request_entries)} request(s) generated by specification.", err=True + ) + + # Remove requests with invalid dates + def has_valid_date(req: RequestEntry): + year = int(req.request["year"]) + month = int(req.request["month"]) + day = int(req.request["day"]) - # Send the request - send_request(dataset, requests, dry_run) - # # # Update request + try: + _ = datetime.datetime(year=year, month=month, day=day) + except ValueError: + return False + + return True + + request_entries = list(filter(has_valid_date, request_entries)) + click.echo( + f"{len(request_entries)} request(s) remain after removing invalid dates.", + err=True, + ) + + local_cache = resolve_and_get_local_cache(cache_dir) + remaining_requests = list( + filter( + lambda r: r.get_sha256() not in local_cache, + request_entries, + ) + ) + + click.echo( + f"{len(request_entries)-len(remaining_requests)} local cache hits", err=True + ) + click.echo(f"{len(remaining_requests)} local cache misses", err=True) + + if list_dangling: + print_files_and_size([cache_dir / file for file in dangling_cache_files]) + sys.exit(0) + + if list_files: + print_files_and_size([cache_dir / r.get_sha256() for r in request_entries]) + sys.exit(0) + + send_request(dataset, remaining_requests, dry_run) + + # Check or wait for remaining_requests check_request_again = True while check_request_again: # First we try to download, likely in queue. - download_request(spec["filename_spec"], n_jobs=n_jobs, dry_run=dry_run) + download_request(cache_dir, n_jobs=n_jobs, dry_run=dry_run) # Then we update the request. update_request(dry_run) # How should we wait? @@ -127,3 +225,47 @@ def download_cds( check_request_again = False else: check_request_again = False + + # Check that all requests are downloaded. + for req_entry in request_entries: + cache_file = cache_dir / req_entry.get_sha256() + if not cache_file.exists(): + print(f"All requests are not downloaded. 
Exiting.") + print( + f"Missing expected cache file {cache_file} for request {req_entry.request}" + ) + sys.exit(1) + + # Create links to cached files according to filename_spec + count_missing = 0 + for req_entry in request_entries: + output_file = Path( + build_filename( + req_entry.dataset, req_entry.request, req_entry.filename_spec + ) + ) + cache_file = cache_dir / req_entry.get_sha256() + + if not cache_file.exists(): + print(f"Warning: Missing entry {cache_file} for {req_entry.request}") + count_missing = count_missing + 1 + + output_file.parent.mkdir(parents=True, exist_ok=True) + if output_file.exists(): + os.remove(output_file) + + os.symlink(cache_file.absolute(), output_file) + + assert count_missing == 0, "There were missing files!" + + # List summary of files not declared by input specs + local_cache = resolve_and_get_local_cache(cache_dir) + dangling_cache_files = set(local_cache) - {r.get_sha256() for r in request_entries} + if len(dangling_cache_files) > 0: + dangling_bytes = 0 + for file in dangling_cache_files: + dangling_bytes += (cache_dir / file).stat().st_size + print( + f"There are {len(dangling_cache_files)} ({format_bytes(dangling_bytes)}) dangling cache files not accounted for by input spec files." + ) + print(f"Use `--list-dangling` to display these files.") diff --git a/cdsapi_helper/download.py b/cdsapi_helper/download.py index 5e33efc..70287a6 100644 --- a/cdsapi_helper/download.py +++ b/cdsapi_helper/download.py @@ -1,15 +1,42 @@ from functools import partial from multiprocessing.pool import ThreadPool +from pathlib import Path from typing import Union import cdsapi import pandas as pd from requests.exceptions import HTTPError -from .utils import build_filename, get_json_sem_hash, request_to_df +from .utils import get_json_sem_hash, request_to_df -def send_request(dataset: str, request: Union[dict, list[dict]], dry_run: bool) -> None: +class RequestEntry: + def __init__(self, dataset, request, filename_spec): + self.dataset = dataset + self.request = request + self.filename_spec = filename_spec + + def get_sha256(self): + return get_json_sem_hash({"dataset": self.dataset, "request": self.request}) + + +# Check to ensure hash stability: +# fmt: off +expected_hash = "23cf15695d9f9396a8d39ee97f86e894bae0fa09e9c6ca86db619384428acda9" +assert ( + RequestEntry(dataset='reanalysis-era5-pressure-levels', request={'product_type': 'reanalysis', 'format': 'netcdf', 'variable': 'temperature', 'year': '2015', 'month': '01', 'day': '01', 'pressure_level': ['1', '2', '3', '5', '7', '10', '20', '30', '50', '70', '100', '125', '150', '175', '200', '225', '250', '300', '350', '400', '450', '500', '550', '600', '650', '700', '750', '775', '800', '825', '850', '875', '900', '925', '950', '975', '1000'], 'time': ['00:00', '01:00', '02:00', '03:00', '04:00', '05:00', '06:00', '07:00', '08:00', '09:00', '10:00', '11:00', '12:00', '13:00', '14:00', '15:00', '16:00', '17:00', '18:00', '19:00', '20:00', '21:00', '22:00', '23:00']}, filename_spec='not_relevant').get_sha256() + == expected_hash +), "RequestEntry.get_sha256() did not produce the expected hash!" 
+assert ( + RequestEntry(dataset='reanalysis-era5-pressure-levels', request={'format': 'netcdf', 'product_type': 'reanalysis', 'variable': 'temperature', 'year': '2015', 'month': '01', 'day': '01', 'pressure_level': ['1', '2', '3', '5', '7', '10', '20', '30', '50', '70', '100', '125', '150', '175', '200', '225', '250', '300', '350', '400', '450', '500', '550', '600', '650', '700', '750', '775', '800', '825', '850', '875', '900', '925', '950', '975', '1000'], 'time': ['00:00', '01:00', '02:00', '03:00', '04:00', '05:00', '06:00', '07:00', '08:00', '09:00', '10:00', '11:00', '12:00', '13:00', '14:00', '15:00', '16:00', '17:00', '18:00', '19:00', '20:00', '21:00', '22:00', '23:00']}, filename_spec='not_relevant').get_sha256() + == expected_hash +), "RequestEntry.get_sha256() did not produce the expected hash!" +# fmt: on + + +def send_request( + dataset: str, request_entries: list[RequestEntry], dry_run: bool +) -> None: client = cdsapi.Client(wait_until_complete=False, delete=False) try: @@ -17,22 +44,21 @@ def send_request(dataset: str, request: Union[dict, list[dict]], dry_run: bool) except FileNotFoundError: df = pd.DataFrame() - if isinstance(request, dict): - request = [request] - - for req in request: - req_hash = get_json_sem_hash(req) + for req_entry in request_entries: + req_hash = req_entry.get_sha256() try: duplicate = df["request_hash"].isin([req_hash]).any() except KeyError: duplicate = False if not duplicate: if not dry_run: - result = client.retrieve(dataset, req) + result = client.retrieve(dataset, req_entry.request) reply = result.reply else: + print(f"Would have sent request for {dataset}, {req_entry.request}") + # TODO: This causes issues when doing dry-run... reply = {"state": "test_state", "request_id": "test_id"} - r_df = request_to_df(req, reply, req_hash) + r_df = request_to_df(req_entry.request, reply, req_hash) df = pd.concat([df, r_df]) else: print("Request already sent.") @@ -72,7 +98,7 @@ def update_request(dry_run: bool) -> None: def download_request( - filename_spec: list, n_jobs: int = 5, dry_run: bool = False + output_folder: Path, n_jobs: int = 5, dry_run: bool = False ) -> None: try: df = pd.read_csv("./cds_requests.csv", index_col=0, dtype=str) @@ -82,7 +108,7 @@ def download_request( print("Downloading completed requests...") # Some parallel downloads. 
download_helper_p = partial( - download_helper, filename_spec=filename_spec, client=client, dry_run=dry_run + download_helper, output_folder=output_folder, client=client, dry_run=dry_run ) with ThreadPool(processes=n_jobs) as p: results = p.map(download_helper_p, df.itertuples()) @@ -95,7 +121,7 @@ def download_request( def download_helper( request: pd.core.frame.pandas, - filename_spec, + output_folder: Path, client: cdsapi.Client, dry_run: bool = False, ) -> str: @@ -103,7 +129,7 @@ def download_helper( try: result = cdsapi.api.Result(client, {"request_id": request.request_id}) result.update() - filename = build_filename(request, filename_spec) + filename = output_folder / request.request_hash if not dry_run: result.download(filename) return "downloaded" diff --git a/cdsapi_helper/utils.py b/cdsapi_helper/utils.py index fb48fa3..9eb0f02 100644 --- a/cdsapi_helper/utils.py +++ b/cdsapi_helper/utils.py @@ -1,10 +1,31 @@ import hashlib import os +import sys +import re +from pathlib import Path from typing import Dict, List, Union import pandas as pd +def format_bytes(size): + power = 2**10 + n = 0 + power_labels = {0: "B", 1: "KB", 2: "MB", 3: "GB", 4: "TB"} + while size > power: + size /= power + n += 1 + return f"{size:.2f} {power_labels[n]}" + + +def print_files_and_size(filepaths: List[Path]): + num_bytes = 0 + for filepath in filepaths: + print(f"{filepath}", file=sys.stdout) + num_bytes += filepath.stat().st_size + print(f"Files amount to {format_bytes(num_bytes)}.", file=sys.stderr) + + def build_request( variable: str, year: str, @@ -87,15 +108,21 @@ def request_to_df(request: dict, reply: dict, req_hash: str) -> pd.DataFrame: return df -def build_filename(request: dict, filename_spec: list) -> str: - filetype = ".nc" if request.format == "netcdf" else ".grib" - filename_parts = [] - for var in filename_spec: - part = str_to_list(getattr(request, var)) - part = "_".join(part) - filename_parts.append(part) +RE_FILENAMESPEC = re.compile(r"\{(\w+)\}") + + +def build_filename(dataset: str, request: dict, filename_spec: str) -> str: + flattened_request = dict(request) + flattened_request["dataset"] = dataset + + def replace_filespec(match): + tag = match.group(1) + return flattened_request[tag] - filename = "-".join(filename_parts) + filetype + filetype = ".nc" if request["format"] == "netcdf" else ".grib" + + filename = RE_FILENAMESPEC.sub(replace_filespec, filename_spec) + filename += filetype filename = os.path.join(os.path.curdir, filename) return filename @@ -122,3 +149,13 @@ def get_json_sem_hash(data: JsonTree, hasher=hashlib.sha256) -> str: def str_to_list(string: str) -> list: return string.strip("[]").replace("'", "").replace(" ", "").split(",") + + +def resolve_and_get_local_cache(cache_dir: Path): + cache_dir.mkdir(exist_ok=True) + + local_cache_entries = {f: cache_dir / f for f in os.listdir(cache_dir)} + + # TODO: Consider having a list of ready-only cache_dirs that we can symlink or copy from. 
+ + return local_cache_entries From 6287d885af3ce4b8078311efe425c0c0fe7092d5 Mon Sep 17 00:00:00 2001 From: Peter McEvoy Date: Thu, 11 Apr 2024 12:45:20 +0200 Subject: [PATCH 3/5] Allow multiple input spec files --- cdsapi_helper/cli.py | 69 +++++++++++++++++++++++---------------- cdsapi_helper/download.py | 10 +++--- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/cdsapi_helper/cli.py b/cdsapi_helper/cli.py index 831b50d..759d966 100755 --- a/cdsapi_helper/cli.py +++ b/cdsapi_helper/cli.py @@ -70,7 +70,7 @@ def download_era5(variable: str, year: str, month: str, dry_run: bool) -> None: @click.command() -@click.argument("spec_path", type=click.Path(exists=True)) +@click.argument("spec_paths", type=click.Path(exists=True), nargs=-1) @click.option( "--dry-run", "dry_run", @@ -118,7 +118,7 @@ def download_era5(variable: str, year: str, month: str, dry_run: bool) -> None: help="Directory for local cache where downloads are stored and output files are linked to", ) def download_cds( - spec_path: str, + spec_paths: List[str], n_jobs: int, wait: bool, list_dangling: bool, @@ -126,36 +126,42 @@ def download_cds( dry_run: bool, cache_dir: Path, ) -> None: - click.echo(f"Reading specification: {click.format_filename(spec_path)}") - with open(spec_path, mode="rb") as fp: - spec = tomli.load(fp) - - dataset = spec["dataset"] - request = spec["request"] - click.echo(f"Requesting variable: {request['variable']}.") - - # Collect request entries generated by input spec file request_entries = [] - to_permutate = [request[var] for var in spec["looping_variables"]] - for perm_spec in product(*to_permutate): - perm_spec = { - spec["looping_variables"][i]: perm_spec[i] - for i in range(len(spec["looping_variables"])) - } - sub_request = deepcopy(request) - for key, value in perm_spec.items(): - sub_request[key] = value - request_entries.append( - RequestEntry( - dataset=dataset, - request=sub_request, - filename_spec=spec["filename_spec"], + filename_specs = [] + for spec_path in spec_paths: + num_request_for_spec = 0 + click.echo( + f"Reading specification: {click.format_filename(spec_path)}", err=True + ) + with open(spec_path, mode="rb") as fp: + spec = tomli.load(fp) + + dataset = spec["dataset"] + request = spec["request"] + filename_specs.append(spec["filename_spec"]) + to_permutate = [request[var] for var in spec["looping_variables"]] + for perm_spec in product(*to_permutate): + perm_spec = { + spec["looping_variables"][i]: perm_spec[i] + for i in range(len(spec["looping_variables"])) + } + sub_request = deepcopy(request) + for key, value in perm_spec.items(): + sub_request[key] = value + request_entries.append( + RequestEntry( + dataset=dataset, + request=sub_request, + filename_spec=spec["filename_spec"], + ) ) + num_request_for_spec += 1 + + click.echo( + f"{num_request_for_spec} request(s) generated by {spec_path}.", err=True ) - click.echo( - f"{len(request_entries)} request(s) generated by specification.", err=True - ) + click.echo(f"{len(request_entries)} request(s) generated in total.", err=True) # Remove requests with invalid dates def has_valid_date(req: RequestEntry): @@ -197,7 +203,7 @@ def has_valid_date(req: RequestEntry): print_files_and_size([cache_dir / r.get_sha256() for r in request_entries]) sys.exit(0) - send_request(dataset, remaining_requests, dry_run) + send_request(remaining_requests, dry_run) # Check or wait for remaining_requests check_request_again = True @@ -237,6 +243,7 @@ def has_valid_date(req: RequestEntry): sys.exit(1) # Create links to cached files 
according to filename_spec + num_links_per_spec = {s: 0 for s in filename_specs} count_missing = 0 for req_entry in request_entries: output_file = Path( @@ -255,6 +262,10 @@ def has_valid_date(req: RequestEntry): os.remove(output_file) os.symlink(cache_file.absolute(), output_file) + num_links_per_spec[req_entry.filename_spec] += 1 + + for spec, num in num_links_per_spec.items(): + print(f'Created {num} symlinks for filename_spec "{spec}"') assert count_missing == 0, "There were missing files!" diff --git a/cdsapi_helper/download.py b/cdsapi_helper/download.py index 70287a6..a4b8854 100644 --- a/cdsapi_helper/download.py +++ b/cdsapi_helper/download.py @@ -34,9 +34,7 @@ def get_sha256(self): # fmt: on -def send_request( - dataset: str, request_entries: list[RequestEntry], dry_run: bool -) -> None: +def send_request(request_entries: list[RequestEntry], dry_run: bool) -> None: client = cdsapi.Client(wait_until_complete=False, delete=False) try: @@ -52,10 +50,12 @@ def send_request( duplicate = False if not duplicate: if not dry_run: - result = client.retrieve(dataset, req_entry.request) + result = client.retrieve(req_entry.dataset, req_entry.request) reply = result.reply else: - print(f"Would have sent request for {dataset}, {req_entry.request}") + print( + f"Would have sent request for {req_entry.dataset}, {req_entry.request}" + ) # TODO: This causes issues when doing dry-run... reply = {"state": "test_state", "request_id": "test_id"} r_df = request_to_df(req_entry.request, reply, req_hash) From f03beb74edc21740d65e78049c0532011f252087 Mon Sep 17 00:00:00 2001 From: Peter McEvoy Date: Thu, 11 Apr 2024 13:55:50 +0200 Subject: [PATCH 4/5] Split download_cds into subcommands The mutually exclusive options to download_cds are split into click subcommands: - download - list-files - list-dangling --- README.md | 2 +- cdsapi_helper/cli.py | 237 ++++++++++++++++++++++++++----------------- 2 files changed, 146 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 173f956..c045dfa 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ pressure_level = [ time = ["00:00", "06:00", "12:00", "18:00"] ``` The request should be a standard cdspi request, and it will be expanded according to `looping_variables` when run. -This means that when run with the above in `example_spec.toml` e.g. `download_cds ./example_spec.toml`, one request will be sent for each combination of the entries is `variable` and `year` (9 requests/files in total). +This means that when run with the above in `example_spec.toml` e.g. `download_cds download ./example_spec.toml`, one request will be sent for each combination of the entries is `variable` and `year` (9 requests/files in total). 
## Installation The easiest way to install is to clone this repository, `cd cdsapi_helper` and diff --git a/cdsapi_helper/cli.py b/cdsapi_helper/cli.py index 759d966..0c81b81 100755 --- a/cdsapi_helper/cli.py +++ b/cdsapi_helper/cli.py @@ -69,76 +69,15 @@ def download_era5(variable: str, year: str, month: str, dry_run: bool) -> None: print(request) -@click.command() -@click.argument("spec_paths", type=click.Path(exists=True), nargs=-1) -@click.option( - "--dry-run", - "dry_run", - is_flag=True, - show_default=True, - default=False, - help="Dry run: no download and no symlinks.", -) -@click.option( - "--list-dangling", - "list_dangling", - is_flag=True, - show_default=True, - default=False, - help="List files in stdout for files that are not accounted for by spec file and exit", -) -@click.option( - "--list-files", - "list_files", - is_flag=True, - show_default=True, - default=False, - help="List cache files expected by the specification and exit.", -) -@click.option( - "--n-jobs", - "n_jobs", - show_default=True, - default=5, - type=click.INT, -) -@click.option( - "--wait", - "wait", - is_flag=True, - type=click.BOOL, - help="Keep running, waiting for requests to be processed.", -) -@click.option( - "--cache-dir", - "cache_dir", - show_default=True, - default=Path("./cache"), - type=Path, - help="Directory for local cache where downloads are stored and output files are linked to", -) -def download_cds( - spec_paths: List[str], - n_jobs: int, - wait: bool, - list_dangling: bool, - list_files: bool, - dry_run: bool, - cache_dir: Path, -) -> None: +def generate_request_entries_from_specs(spec_paths): request_entries = [] - filename_specs = [] for spec_path in spec_paths: num_request_for_spec = 0 - click.echo( - f"Reading specification: {click.format_filename(spec_path)}", err=True - ) with open(spec_path, mode="rb") as fp: spec = tomli.load(fp) dataset = spec["dataset"] request = spec["request"] - filename_specs.append(spec["filename_spec"]) to_permutate = [request[var] for var in spec["looping_variables"]] for perm_spec in product(*to_permutate): perm_spec = { @@ -157,12 +96,6 @@ def download_cds( ) num_request_for_spec += 1 - click.echo( - f"{num_request_for_spec} request(s) generated by {spec_path}.", err=True - ) - - click.echo(f"{len(request_entries)} request(s) generated in total.", err=True) - # Remove requests with invalid dates def has_valid_date(req: RequestEntry): year = int(req.request["year"]) @@ -177,10 +110,134 @@ def has_valid_date(req: RequestEntry): return True request_entries = list(filter(has_valid_date, request_entries)) + + return request_entries + + +@click.group() +@click.option( + "--cache-dir", + "cache_dir", + show_default=True, + default=Path("./cache"), + type=Path, + help="Directory for local cache where downloads are stored and output files are linked to", +) +@click.pass_context +def download_cds( + ctx, + cache_dir: Path, +) -> None: + ctx.ensure_object(dict) + ctx.obj["cache_dir"] = cache_dir + + +@download_cds.command( + help="List files in stdout for files that are not accounted for by spec files and exit", +) +@click.argument("spec_paths", type=click.Path(exists=True), nargs=-1) +@click.pass_context +def list_dangling( + ctx, + spec_paths: List[str], +) -> None: + cache_dir = ctx.obj["cache_dir"] + request_entries = generate_request_entries_from_specs(spec_paths) + local_cache = resolve_and_get_local_cache(cache_dir) + dangling_cache_files = set(local_cache) - {r.get_sha256() for r in request_entries} + print_files_and_size([cache_dir / file for file in 
dangling_cache_files]) + sys.exit(0) + + +@download_cds.command( + help="List cache files in stdout expected by the specifications and exit. Exit success if all files exist.", +) +@click.argument("spec_paths", type=click.Path(exists=True), nargs=-1) +@click.pass_context +def list_files( + ctx, + spec_paths: List[str], +) -> None: + cache_dir = ctx.obj["cache_dir"] + + request_entries = generate_request_entries_from_specs(spec_paths) + expected_files = {cache_dir / file.get_sha256() for file in request_entries} + + for file in expected_files: + click.echo(file) + + # Summary about size and potentially missing files + local_cache = {cache_dir / file for file in resolve_and_get_local_cache(cache_dir)} + expected_existing = local_cache.intersection(expected_files) + + num_bytes_existing = sum((f.stat().st_size for f in expected_existing)) + click.echo(f"Existing files amount to {format_bytes(num_bytes_existing)}", err=True) + + expected_missing = expected_files - expected_existing click.echo( - f"{len(request_entries)} request(s) remain after removing invalid dates.", + f"There are {len(expected_existing)} expected files that exist.", err=True, ) + click.echo( + click.style( + f"There are {len(expected_missing)} expected files that are missing.", + fg="green" if len(expected_missing) == 0 else "red", + ), + err=True, + ) + if len(expected_missing) > 0: + sys.exit(1) + + sys.exit(0) + + +@download_cds.command( + help="Download files and create output directories according to spec files." +) +@click.pass_context +@click.argument("spec_paths", type=click.Path(exists=True), nargs=-1) +@click.option( + "--dry-run", + "dry_run", + is_flag=True, + show_default=True, + default=False, + help="Dry run: no download and no symlinks.", +) +@click.option( + "--n-jobs", + "n_jobs", + show_default=True, + default=5, + type=click.INT, +) +@click.option( + "--wait", + "wait", + is_flag=True, + type=click.BOOL, + help="Keep running, waiting for requests to be processed.", +) +@click.option( + "--output-dir", + "output_dir", + show_default=True, + default=Path("./output"), + type=Path, + help="Directory from which to create files according to filename_spec in spec files.", +) +def download( + ctx, + spec_paths: List[str], + n_jobs: int, + wait: bool, + dry_run: bool, + output_dir: Path, +) -> None: + cache_dir = ctx.obj["cache_dir"] + + request_entries = generate_request_entries_from_specs(spec_paths) + click.echo(f"{len(request_entries)} request(s) generated in total.", err=True) local_cache = resolve_and_get_local_cache(cache_dir) remaining_requests = list( @@ -195,14 +252,6 @@ def has_valid_date(req: RequestEntry): ) click.echo(f"{len(remaining_requests)} local cache misses", err=True) - if list_dangling: - print_files_and_size([cache_dir / file for file in dangling_cache_files]) - sys.exit(0) - - if list_files: - print_files_and_size([cache_dir / r.get_sha256() for r in request_entries]) - sys.exit(0) - send_request(remaining_requests, dry_run) # Check or wait for remaining_requests @@ -236,25 +285,29 @@ def has_valid_date(req: RequestEntry): for req_entry in request_entries: cache_file = cache_dir / req_entry.get_sha256() if not cache_file.exists(): - print(f"All requests are not downloaded. Exiting.") - print( - f"Missing expected cache file {cache_file} for request {req_entry.request}" + click.echo(f"All requests are not downloaded. 
Exiting.", err=True) + click.echo( + click.style( + f"Missing expected cache file {cache_file} for request {req_entry.request}", + fg="red", + ), + err=True, ) sys.exit(1) # Create links to cached files according to filename_spec - num_links_per_spec = {s: 0 for s in filename_specs} + num_links = 0 count_missing = 0 for req_entry in request_entries: - output_file = Path( - build_filename( - req_entry.dataset, req_entry.request, req_entry.filename_spec - ) + output_file = output_dir / build_filename( + req_entry.dataset, req_entry.request, req_entry.filename_spec ) cache_file = cache_dir / req_entry.get_sha256() if not cache_file.exists(): - print(f"Warning: Missing entry {cache_file} for {req_entry.request}") + click.echo( + f"Warning: Missing entry {cache_file} for {req_entry.request}", err=True + ) count_missing = count_missing + 1 output_file.parent.mkdir(parents=True, exist_ok=True) @@ -262,10 +315,9 @@ def has_valid_date(req: RequestEntry): os.remove(output_file) os.symlink(cache_file.absolute(), output_file) - num_links_per_spec[req_entry.filename_spec] += 1 + num_links += 1 - for spec, num in num_links_per_spec.items(): - print(f'Created {num} symlinks for filename_spec "{spec}"') + click.echo(f"Created {num_links} symlinks.", err=True) assert count_missing == 0, "There were missing files!" @@ -276,7 +328,8 @@ def has_valid_date(req: RequestEntry): dangling_bytes = 0 for file in dangling_cache_files: dangling_bytes += (cache_dir / file).stat().st_size - print( - f"There are {len(dangling_cache_files)} ({format_bytes(dangling_bytes)}) dangling cache files not accounted for by input spec files." + click.echo( + f"There are {len(dangling_cache_files)} ({format_bytes(dangling_bytes)}) dangling cache files not accounted for by input spec files.", + err=True, ) - print(f"Use `--list-dangling` to display these files.") + click.echo(f"Use `list-dangling` subcommand to display these files.", err=True) From 9dbfaa917ada2c1c3f67c75b21b42d7f3ccf406e Mon Sep 17 00:00:00 2001 From: Peter McEvoy Date: Fri, 12 Apr 2024 11:00:28 +0200 Subject: [PATCH 5/5] Fix 'Pandas doesn't allow columns to be created via a new attribute name' warning --- cdsapi_helper/download.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdsapi_helper/download.py b/cdsapi_helper/download.py index a4b8854..a4a8d1f 100644 --- a/cdsapi_helper/download.py +++ b/cdsapi_helper/download.py @@ -114,7 +114,7 @@ def download_request( results = p.map(download_helper_p, df.itertuples()) # Write new states. - df.state = results + df['state'] = results # Save them. df.to_csv("./cds_requests.csv")