From 99b283be56b86750088d2ff47dbf7d4f7689da12 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Tue, 16 Jul 2024 09:34:41 +0200 Subject: [PATCH] wip: first idea for pipelines --- src/pymorize/generic.py | 112 ++++++++++++++++++++++++++----- src/pymorize/generic/__init__.py | 99 --------------------------- src/pymorize/pipeline.py | 33 +++++++++ tests/__init__.py | 0 tests/test_pipeline.py | 11 +++ 5 files changed, 138 insertions(+), 117 deletions(-) delete mode 100644 src/pymorize/generic/__init__.py create mode 100644 src/pymorize/pipeline.py create mode 100644 tests/__init__.py create mode 100644 tests/test_pipeline.py diff --git a/src/pymorize/generic.py b/src/pymorize/generic.py index cee79689..43c91f3b 100644 --- a/src/pymorize/generic.py +++ b/src/pymorize/generic.py @@ -8,17 +8,6 @@ - `linear_transform`: Applies a linear transformation to the data of a NetCDF file. - `invert_z_axis`: Inverts the z-axis of a NetCDF file. -Classes: -- `Rule`: Defines a set of instructions to process a file and convert it to a CMOR standard. -- `CMORMetadata`: Inherits from `Rule` and adds CMOR metadata to a NetCDF file. -- `CMORizer`: Manages a set of `Rule` objects and applies them to NetCDF files. - -Each `Rule` object represents a set of transformations that should be applied to a NetCDF file to -make it comply with the CMOR standard. The `CMORizer` class manages a collection of `Rule` objects and -applies the appropriate ones to each file. - -The `CMORMetadata` class is a special type of `Rule` that adds CMOR metadata to a file. - The Full CMOR (yes, bad pun): * Applied if no other rule sets are given for a file * Adds CMOR metadata to the file @@ -27,15 +16,102 @@ """ import datetime -import json -import re -import sys -from functools import partial from pathlib import Path -import questionary import xarray as xr -import yaml from loguru import logger -from .utils import generate_partial_function, get_callable_by_name + +def convert_units(filepath: Path, source_units: str, target_units: str): + """ + Converts the units of a NetCDF file, using the Pint library. + + Parameters + ---------- + filepath : Path + Path to the input file. + source_units : str + target_units : str + """ + ds = xr.open_dataset(filepath) + ds = ds.pint.quantify() + ds = ds.pint.to(target_units) + ds = ds.pint.dequantify() + logger.info(f"Converted units of {filepath} from {source_units} to {target_units}") + ds.to_netcdf(filepath) + + +def set_cmor_metadata(filepath: Path, cmor_metadata: dict, attrs_to_skip=[]): + """ + Adds CMOR metadata to a NetCDF file. + + Parameters + ---------- + filepath : Path + Path to the input file. + cmor_metadata : dict + Dictionary with the CMOR metadata to be added to the file. + attrs_to_skip : list of str, optional + List of attributes to skip when adding CMOR metadata. + """ + attrs_to_skip = attrs_to_skip or ["units", "cell_methods", "cell_measures"] + ds = xr.open_dataset(filepath) + for key, value in cmor_metadata.items(): + if key in attrs_to_skip: + continue + ds.attrs[key] = value + hist_str = ds.attrs.get("history", "") + hist_str += f"\n{datetime.now()}: CMOR metadata added by ``pymorize``\n" + ds.to_netcdf(filepath) + + +def linear_transform( + filepath: Path, execute: bool = False, slope: float = 1, offset: float = 0 +): + """ + Applies a linear transformation to the data of a NetCDF file. + + Parameters + ---------- + filepath : Path + Path to the input file. + execute : bool, optional + slope: float, optional + offset: float, optional + """ + if execute: + ds = xr.open_dataset(filepath) + ds = ds * slope + offset + logger.info(f"Applied linear transformation to {filepath}") + ds.to_netcdf(filepath) + else: + logger.info(f"Would apply linear transformation to {filepath}") + logger.info(f"slope: {slope}, offset: {offset}") + logger.info("Use `execute=True` to apply changes") + + +def invert_z_axis(filepath: Path, execute: bool = False, flip_sign: bool = False): + """ + Inverts the z-axis of a NetCDF file. + + Parameters + ---------- + filepath : Path + Path to the input file. + execute : bool, optional + If True, the function will execute the inversion. If False, it will + only print the changes that would be made. + """ + if execute: + ds = xr.open_dataset(filepath) + ds = ds.reindex(z=ds.z[::-1]) + logger.info(f"Inverted order of z-axis of {filepath}") + if flip_sign: + ds["z"] *= -1 + logger.info(f"Flipped sign of z-axis of {filepath}") + ds.to_netcdf(filepath) + else: + logger.info(f"Would invert z-axis of {filepath}") + if flip_sign: + logger.info("Would flip sign of z-axis") + logger.info("Use `execute=True` to apply changes") diff --git a/src/pymorize/generic/__init__.py b/src/pymorize/generic/__init__.py deleted file mode 100644 index 7b54dcaf..00000000 --- a/src/pymorize/generic/__init__.py +++ /dev/null @@ -1,99 +0,0 @@ -from pathlib import Path - -import xarray as xr -from loguru import logger - - -def convert_units(filepath: Path, source_units: str, target_units: str): - """ - Converts the units of a NetCDF file, using the Pint library. - - Parameters - ---------- - filepath : Path - Path to the input file. - source_units : str - target_units : str - """ - ds = xr.open_dataset(filepath) - ds = ds.pint.quantify() - ds = ds.pint.to(target_units) - ds = ds.pint.dequantify() - logger.info(f"Converted units of {filepath} from {source_units} to {target_units}") - ds.to_netcdf(filepath) - - -def set_cmor_metadata(filepath: Path, cmor_metadata: dict, attrs_to_skip=[]): - """ - Adds CMOR metadata to a NetCDF file. - - Parameters - ---------- - filepath : Path - Path to the input file. - cmor_metadata : dict - Dictionary with the CMOR metadata to be added to the file. - attrs_to_skip : list of str, optional - List of attributes to skip when adding CMOR metadata. - """ - attrs_to_skip = attrs_to_skip or ["units", "cell_methods", "cell_measures"] - ds = xr.open_dataset(filepath) - for key, value in cmor_metadata.items(): - if key in attrs_to_skip: - continue - ds.attrs[key] = value - hist_str = ds.attrs.get("history", "") - hist_str += f"\n{datetime.now()}: CMOR metadata added by ``pymorize``\n" - ds.to_netcdf(filepath) - - -def linear_transform( - filepath: Path, execute: bool = False, slope: float = 1, offset: float = 0 -): - """ - Applies a linear transformation to the data of a NetCDF file. - - Parameters - ---------- - filepath : Path - Path to the input file. - execute : bool, optional - slope: float, optional - offset: float, optional - """ - if execute: - ds = xr.open_dataset(filepath) - ds = ds * slope + offset - logger.info(f"Applied linear transformation to {filepath}") - ds.to_netcdf(filepath) - else: - logger.info(f"Would apply linear transformation to {filepath}") - logger.info(f"slope: {slope}, offset: {offset}") - logger.info("Use `execute=True` to apply changes") - - -def invert_z_axis(filepath: Path, execute: bool = False, flip_sign: bool = False): - """ - Inverts the z-axis of a NetCDF file. - - Parameters - ---------- - filepath : Path - Path to the input file. - execute : bool, optional - If True, the function will execute the inversion. If False, it will - only print the changes that would be made. - """ - if execute: - ds = xr.open_dataset(filepath) - ds = ds.reindex(z=ds.z[::-1]) - logger.info(f"Inverted order of z-axis of {filepath}") - if flip_sign: - ds["z"] *= -1 - logger.info(f"Flipped sign of z-axis of {filepath}") - ds.to_netcdf(filepath) - else: - logger.info(f"Would invert z-axis of {filepath}") - if flip_sign: - logger.info("Would flip sign of z-axis") - logger.info("Use `execute=True` to apply changes") diff --git a/src/pymorize/pipeline.py b/src/pymorize/pipeline.py new file mode 100644 index 00000000..df4bfb58 --- /dev/null +++ b/src/pymorize/pipeline.py @@ -0,0 +1,33 @@ +""" +Pipeline of the data processing steps. +""" + +from .utils import get_callable_by_name + + +class Pipeline: + def __init__(self, *args): + self.steps = args + + def run(self, data): + for step in self.steps: + data = step(data) + return data + + @classmethod + def from_list(cls, steps): + return cls(*steps) + + @classmethod + def from_qualname_list(cls, qualnames: list): + return cls.from_list(get_callable_by_name(name) for name in qualnames) + + +class DefaultPipeline(Pipeline): + def __init__(self): + super().__init__( + get_callable_by_name("pymorize.generic.load_data"), + get_callable_by_name("extract_feacitures"), + get_callable_by_name("train"), + get_callable_by_name("evaluate"), + ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 00000000..2e59eb9e --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,11 @@ +from pymorize.pipeline import Pipeline + + +def test_basic_creation(): + pl = Pipeline() + + +def test_qualname_creation(): + pl = Pipeline.from_qualname_list( + ["pymorize.generic.convert_units", "pymorize.generic.set_cmor_metadata"] + )