Add module for computing river flood footprints from GloFAS river discharge data (#64)

* Add util functions for downloading GloFAS data

Use cdsapi to download GloFAS data from Copernicus Data Store. So far,
the actual download is not tested.

* Add util functions for downloading data.
* Add unit tests for request handling.

* Add 'cdsapi' to requirements

* Allow `date_to=None` to download only a single file
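
  A minimal usage sketch of these download utilities, as they appear in the
  final module layout of this PR (the dates are illustrative, and a configured
  CDS API key is assumed; see the docstring of `glofas_request` below):

      from climada_petals.hazard.rf_glofas.cds_glofas_downloader import (
          CDS_DOWNLOAD_DIR,
          glofas_request,
      )

      # Download daily ensemble forecasts for a range of dates (one file per day)
      paths = glofas_request(
          product="forecast",
          date_from="2022-08-01",
          date_to="2022-08-05",
          output_dir=CDS_DOWNLOAD_DIR,
      )

      # With date_to=None, only data for date_from is downloaded
      paths = glofas_request("forecast", "2022-08-01", None, CDS_DOWNLOAD_DIR)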

* [draft] Add class for processing GloFAS river flood

* Working on glofas flood stuff [revise!]

* Add tests for dantro operations

* Update operations to fix issues found when writing the tests.
* Add unit test case for dantro operations.
* Tweak CDS GloFAS downloader.

* Add dantro to requirements

* Update GloFAS river flood pipeline

* Add option to set countries instead of lat/lon limits when downloading
  GloFAS data.
* Return pandas Series of Hazards with multi index.
* Use discharge dataset for lat/lon slicing of all other datasets.
* Add unit tests.
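
  To illustrate the returned structure (a pandas Series of Hazard objects with
  a MultiIndex) with a toy stand-in, where the index level names "date" and
  "member" are hypothetical and not taken from this PR:

      import pandas as pd

      # Toy Series mimicking a Series of Hazard objects with a MultiIndex
      index = pd.MultiIndex.from_product(
          [["2022-08-01", "2022-08-02"], [1, 2]], names=["date", "member"]
      )
      hazards = pd.Series(list("abcd"), index=index)

      # Select all ensemble members for one forecast date
      print(hazards.loc["2022-08-01"])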

* Make CDS Downloader support skipping downloads

Downloads will be skipped if the target file exists with the same
request dict.

* Place the request as YAML file next to the target file for request
  comparison.
* Add option to control using the "cached" results or always downloading
  the data.
* Update unit tests.
* Explicitly list ruamel.yaml as requirement (already required by
  dantro).
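
  A sketch of the resulting behavior (a configured CDS API key is assumed, and
  `output_dir` stands for any target directory of your choice):

      # First call downloads the data and stores the request as a YAML file
      paths = glofas_request("historical", "1979", None, output_dir)
      # An identical second call finds the stored request and skips the download
      paths = glofas_request("historical", "1979", None, output_dir)
      # Setting use_cache=False always downloads afresh
      paths = glofas_request("historical", "1979", None, output_dir, use_cache=False)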

* Fix an issue where ruamel.yaml could not dump numbers

* Add 'max_from_isel' operation

NOTE: The commented code would be an alternative that defines the selection
dimension based on values instead of indices.

* Add operation
* Add test case for operation

* Handle NaNs in flood depth interpolation

* Update unit tests accordingly.
* Add core dimension checks to flood depth unit tests.

* Add routines for computing default files for GloFAS flood module

* Add operations and config for computing the GEV fits and merging flood
  maps, which are both used for computing a flood footprint.
* Update affected operations and configs.
* Remove GloFASRiverFlood class in favor of two functions.
* Update tests

* Overhaul GloFAS flood pipeline

* Move respective files into their own subdirectory.
* Adapt configuration files to latest dantro version.
* Add 'transform_ops.py' containing only dantro transformations.
* Expose user functions via dedicated __init__.py
* Add option to run tasks in parallel

* Rename 'test_glofas_rf.py' to 'test_rf_glofas.py'

* Rename 'rf_glofas_util.yml' to 'setup.yml' and expose dantro_transform function

* Add rioxarray as dependency

Used for reading GeoTIFF with xarray.

* Update test_rf_glofas.py imports to new module structure

* Rework user experience of rf_glofas module and write full documentation

* Fix formatting in glofas_rf docs

* Fix type hints and data manager container type

* Add operation for including FLOPROS database

* Add tutorial for GloFAS river flood module

* WIP: Add bootstrap resampling to return period computation

* Fix glofas_rf unit tests, formatting, and docstrings

* Fix return period sampling and update tests

* Add tests for 'save_file' and 'finalize'

* Fix return period resampling

Actually do the resampling instead of resampling once and copying the
new value. Update tests accordingly.
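
  A generic illustration of the fixed pattern (not the module's actual code):

      import numpy as np

      rng = np.random.default_rng()
      data = rng.gumbel(size=100)

      # Correct: draw a fresh bootstrap sample for every replicate
      samples = [rng.choice(data, size=data.size, replace=True) for _ in range(1000)]

      # Wrong (the bug fixed here): resample once and copy that one sample
      # sample = rng.choice(data, size=data.size, replace=True)
      # samples = [sample] * 1000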

* Add module docstrings and fix linter warnings in rf_glofas

* Improve docstrings and try to please linter

* Add links to ETH research collection datasets

* Update env_docs.yml

* Fix env_docs.yml

* Fix order in doc toctree

* Upgrade matplotlib in doc environment

* Revert "Upgrade matplotlib in doc environment"

This reverts commit 94c210f.

* Mock dantro when building docs

* Comply with latest climada_python/develop

* readthedocs: add glofas to tutorials section of the navigation bar

* jenkinsfile: install new dependencies on the fly

* fix error in jenkins file

* Add xesmf regrid operation

* Add 'xesmf' to requirements

* Fix bug in test file

* Add new requirements to Jenkinsfile

* Mock modules when building docs to make the automated build succeed

* Fix typos in linter instructions

* Only use pip to install missing dependencies

* Fix pip install in Jenkinsfile

* conda-env: install xesmf

* Try 'pip install --upgrade' in Jenkinsfile

* Do not upgrade to matplotlib 3.6 in Jenkinsfile

* Fix an issue where the GeoDataFrame loader is not available

* Fix an issue where infinite return periods would lead to NaN flood depths

* rf_glofas: try to circumvent "ImportError: The ESMFMKFILE environment variable is not available."

* rf_glofas: try to circumvent "ImportError: The ESMFMKFILE environment variable is not available.", bis

* Apply suggestions from code review

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Improve performance of inundation computation

* Vectorize computations of return period and inundation.
* Define maximum return period to avoid inf.
* Return Hazard instead of RiverFlood instances.

* Working on new version without dantro

* Make parallelization of 'setup_gumbel_fit' external

* Fix bug in calling 'download_glofas_discharge'

* Make sure netcdf4 engine is used to store data

* Various changes

* Improve file opening and closing with custom context manager.
* Set new default data directory.
* Add option to download reanalysis.

* Improve compute algorithm and add docstrings

* Do not store everything, avoid zlib

* Update transform operations tests

* Remove unused 'finalize' function

* Remove dantro pipeline config files

* Remove dantro-related functions and classes

* Rework tutorial, docstrings, improve usability

* Rework tutorial, docstrings, improve usability

* Remove pipeline config files for tutorial

* Add 'xesmf' to conda environment specs

* Update requirement handling in Jenkinsfile

* RiverFlood.__init__: re-merge from dev

* RiverFlood.__init__: re-merge from dev

* Add ruamel.yaml to requirements

* Move CDS downloader into rf_glofas folder

Adapt imports, tests, docs

* Update docs and docstrings

* Fix logger for river_flood_computation.py

* Fix name for _RiverFloodCachePaths

* Fix linter issues

* Add tests for RiverFloodInundation

* Add cdsapi as dependency

* env_docs: include additional dependencies

* Add rioxarray to requirements

* Apply suggestions for test_preprocess

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Update climada_petals/hazard/rf_glofas/test/test_river_flood_computation.py

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Apply suggestions for cds_glofas_downloader.py

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Revert changes to river_flood.py

* Remove import mocking in doc/conf.py

* Update climada_petals/hazard/rf_glofas/test/test_rf_glofas.py

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Make sure tests operate on floats

* Avoid division by zero in return period computation

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Improve testing of return_period_resample

* Rename test_rf_glofas.py to test_transform_ops.py

* Add tests for rf_glofas.py

* Fix a bug where unit tests were not executed if called directly

* Clean up temporary directories explicitly

* Update climada_petals/hazard/rf_glofas/setup.py

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Update climada_petals/hazard/rf_glofas/transform_ops.py

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* Only use dimensions with time information for events

* Render docs with 'myst_nb'

* Add myst-nb parser to doc requirements

* Mock xesmf in sphinx build

Installing xesmf would require reloading the environment,
which does not happen in the online docs build.

* Avoid squeezing of 'time' dimension when opening discharge data

* Use hashed download paths for CDS data

This avoids overwriting data downloaded for the same day (forecast)
or year (reanalysis/historical).
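
  A sketch of the naming scheme (see `glofas_request_single` below; the
  timestamp prefix shown is illustrative):

      from climada_petals.hazard.rf_glofas.cds_glofas_downloader import request_to_md5

      # Requests differing only in the day yield different hashes, so downloads
      # for different days no longer overwrite each other
      h1 = request_to_md5({"year": "2022", "month": "08", "day": "01"})
      h2 = request_to_md5({"year": "2022", "month": "08", "day": "02"})
      assert h1 != h2
      # Target files are named '<timestamp>-<hash>.grib',
      # e.g. "240305-120000-" + h1 + ".grib"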

* Update tutorial

* Apply suggestions from code review

Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>

* update glofas tutorial after code review

* update glofas tutorial to make selection in multiindex more stable

* fix pylint

* Update docstrings

---------

Co-authored-by: Lukas Riedel <lukas.riedel@meteoswiss.ch>
Co-authored-by: emanuel-schmid <schmide@ethz.ch>
Co-authored-by: Thomas Vogt <57705593+tovogt@users.noreply.github.com>
Co-authored-by: Thomas Roosli <thomas.roeoesli@meteoswiss.ch>
5 people authored Mar 5, 2024
1 parent 594f7ef commit c118b6c
Showing 22 changed files with 7,285 additions and 17 deletions.
25 changes: 25 additions & 0 deletions climada_petals/hazard/rf_glofas/__init__.py
@@ -0,0 +1,25 @@
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Export functions of the GloFAS River Flood Module
"""

from .setup import setup_all
from .river_flood_computation import RiverFloodInundation
from .rf_glofas import hazard_series_from_dataset
from .transform_ops import save_file
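
Given these exports, user code can import directly from the subpackage, e.g.
(a minimal sketch):

    from climada_petals.hazard.rf_glofas import (
        RiverFloodInundation,
        hazard_series_from_dataset,
        save_file,
        setup_all,
    )
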
291 changes: 291 additions & 0 deletions climada_petals/hazard/rf_glofas/cds_glofas_downloader.py
@@ -0,0 +1,291 @@
"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Functions for downloading GloFAS river discharge data from the Copernicus Climate Data
Store (CDS).
"""

from pathlib import Path
import multiprocessing as mp
from copy import deepcopy
from typing import Iterable, Mapping, Any, Optional, List, Union
from itertools import repeat
from datetime import date, datetime
import logging
import hashlib

from cdsapi import Client
from ruamel.yaml import YAML
from ruamel.yaml.compat import StringIO
import pandas as pd
import numpy as np

from climada.util.constants import SYSTEM_DIR

LOGGER = logging.getLogger(__name__)

CDS_DOWNLOAD_DIR = Path(SYSTEM_DIR, "cds-download")

DEFAULT_REQUESTS = {
"historical": {
"variable": "river_discharge_in_the_last_24_hours",
"product_type": "consolidated",
"system_version": "version_3_1",
"hydrological_model": "lisflood",
"format": "grib",
"hyear": "1979",
"hmonth": [
"january",
"february",
"march",
"april",
"may",
"june",
"july",
"august",
"september",
"october",
"november",
"december",
],
"hday": [f"{day:02}" for day in range(1, 32)],
},
"forecast": {
"variable": "river_discharge_in_the_last_24_hours",
"product_type": "ensemble_perturbed_forecasts",
"system_version": "version_3_1",
"hydrological_model": "lisflood",
"format": "grib",
"year": "2022",
"month": "08",
"day": "01",
"leadtime_hour": (np.arange(1, 31) * 24).astype(str).tolist(),
},
}
"""Default request keyword arguments to be updated by the user requests"""


def request_to_md5(request: Mapping[Any, Any]) -> str:
"""Hash a string with the MD5 algorithm"""
yaml = YAML()
stream = StringIO()
yaml.dump(request, stream)
return hashlib.md5(stream.getvalue().encode("utf-8")).hexdigest()
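# Illustration: the digest is a stable 32-character hex string, so equal request
# mappings always map to the same file name suffix, e.g.
# request_to_md5({"hyear": "1979"}) == request_to_md5({"hyear": "1979"})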


def cleanup_download_dir(
download_dir: Union[Path, str] = CDS_DOWNLOAD_DIR, dry_run: bool = False
):
"""Delete the contents of the download directory"""
for filename in Path(download_dir).glob("*"):
LOGGER.debug("Removing file: %s", filename)
if not dry_run:
filename.unlink()
if dry_run:
LOGGER.debug("Dry run. No files removed")


def glofas_request_single(
product: str,
request: Mapping[str, Any],
outpath: Union[Path, str],
use_cache: bool = True,
client_kw: Optional[Mapping[str, Any]] = None,
) -> Path:
"""Perform a single request for data from the Copernicus data store
This will skip the download if a file was found at the target location with the same
request. The request will be stored as YAML file alongside the target file and used
for comparison. This behavior can be adjusted with the ``use_cache`` parameter.
Parameters
----------
product : str
The string identifier of the product in the Copernicus data store
request : dict
The download request as dictionary
outpath : str or Path
The file path to store the download into (including extension)
use_cache : bool (optional)
Skip downloading if the target file exists and the accompanying request file
contains the same request
client_kw : dict (optional)
Dictionary with keyword arguments for the ``cdsapi.Client`` used for downloading
"""
# Define output file
outpath = Path(outpath)
request_hash = request_to_md5(request)
outfile = outpath / (
datetime.today().strftime("%y%m%d-%H%M%S") + f"-{request_hash}"
)
extension = ".grib" if request["format"] == "grib" else ".nc"
outfile = outfile.with_suffix(extension)

# Check if request was issued before
if use_cache:
for filename in outpath.glob(f"*{extension}"):
if request_hash == filename.stem.split("-")[-1]:
LOGGER.info(
"Skipping request for file '%s' because it already exists", outfile
)
return filename.resolve()

# Set up client and retrieve data
LOGGER.info("Downloading file: %s", outfile)
client_kw_default = dict(quiet=False, debug=False)
if client_kw is not None:
client_kw_default.update(client_kw)
client = Client(**client_kw_default)
client.retrieve(product, request, outfile)

# Dump request
yaml = YAML()
yaml.dump(request, outfile.with_suffix(".yml"))

# Return file path
return outfile.resolve()


def glofas_request_multiple(
product: str,
requests: Iterable[Mapping[str, str]],
outdir: Union[Path, str],
num_proc: int,
use_cache: bool,
client_kw: Optional[Mapping[str, Any]] = None,
) -> List[Path]:
"""Execute multiple requests to the Copernicus data store in parallel"""
with mp.Pool(num_proc) as pool:
return pool.starmap(
glofas_request_single,
zip(
repeat(product),
requests,
repeat(outdir),
repeat(use_cache),
repeat(client_kw),
),
)


def glofas_request(

product: str,
date_from: str,
date_to: Optional[str],
output_dir: Union[Path, str],
num_proc: int = 1,
use_cache: bool = True,
request_kw: Optional[Mapping[str, str]] = None,
client_kw: Optional[Mapping[str, Any]] = None,
) -> List[Path]:
"""Request download of GloFAS data products from the Copernicus Data Store (CDS)
Uses the Copernicus Data Store API (cdsapi) Python module. The interpretation of the
``date`` parameters and the grouping of the downloaded data depends on the type of
``product`` requested.
Available ``products``:
- ``historical``: Historical reanalysis discharge data. ``date_from`` and ``date_to``
are interpreted as integer years. Data for each year is placed into a single file.
- ``forecast``: Forecast discharge data. ``date_from`` and ``date_to`` are
interpreted as ISO date format strings. Data for each day is placed into a single
file.
Notes
-----
Downloading data from the CDS requires authentication via a user key which is granted
to each user upon registration. Do the following **before calling this function**:
- Create an account at the Copernicus Data Store website:
https://cds.climate.copernicus.eu/
- Follow the instructions to install the CDS API key:
https://cds.climate.copernicus.eu/api-how-to#install-the-cds-api-key
Parameters
----------
product : str
The identifier for the CDS product to download. See above for available options.
date_from : str
First date to download data for. Interpretation varies based on ``product``.
date_to : str or None
Last date to download data for. Interpretation varies based on ``product``. If
``None`` or the same date as ``date_from``, only data for ``date_from`` is
downloaded.
output_dir : Path
Output directory for the downloaded data
num_proc : int
Number of processes used for parallel requests
use_cache : bool (optional)
Skip downloading if the target file exists and the accompanying request file
contains the same request
request_kw : dict(str: str)
Dictionary to update the default request for the given product
client_kw : dict (optional)
Dictionary with keyword arguments for the ``cdsapi.Client`` used for downloading

Returns
-------
list of Path
Paths of the downloaded files
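
Examples
--------
Dates and directory are illustrative; a configured CDS API key is required:

>>> glofas_request(  # doctest: +SKIP
...     product="forecast",
...     date_from="2022-08-01",
...     date_to=None,
...     output_dir=CDS_DOWNLOAD_DIR,
... )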
"""
# Check if product exists
try:
default_request = deepcopy(DEFAULT_REQUESTS[product])
except KeyError as err:
raise NotImplementedError(
f"product = {product}. Choose from {list(DEFAULT_REQUESTS.keys())}"
) from err

# Update with request_kw
if request_kw is not None:
default_request.update(**request_kw)

if product == "historical":
# Interpret dates as years only
year_from = int(date_from)
year_to = int(date_to) if date_to is not None else year_from

# List up all requests
requests = [
{"hyear": str(year)} for year in list(range(year_from, year_to + 1))
]

elif product == "forecast":
# Download single date if 'date_to' is 'None'
date_from: date = date.fromisoformat(date_from)
date_to: date = (
date.fromisoformat(date_to) if date_to is not None else date_from
)

# List up all requests
dates = pd.date_range(date_from, date_to, freq="D", inclusive="both").date

requests = [
{"year": str(d.year), "month": f"{d.month:02d}", "day": f"{d.day:02d}"}
for d in dates
]

else:
raise NotImplementedError(f"Unknown product: {product}")

requests = [{**default_request, **req} for req in requests]
glofas_product = f"cems-glofas-{product}"

# Execute request
return glofas_request_multiple(
glofas_product, requests, output_dir, num_proc, use_cache, client_kw
)
