Skip to content

Commit

Permalink
wip: first idea for pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
pgierz committed Jul 16, 2024
1 parent 938e286 commit 99b283b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 117 deletions.
112 changes: 94 additions & 18 deletions src/pymorize/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,6 @@
- `linear_transform`: Applies a linear transformation to the data of a NetCDF file.
- `invert_z_axis`: Inverts the z-axis of a NetCDF file.
Classes:
- `Rule`: Defines a set of instructions to process a file and convert it to a CMOR standard.
- `CMORMetadata`: Inherits from `Rule` and adds CMOR metadata to a NetCDF file.
- `CMORizer`: Manages a set of `Rule` objects and applies them to NetCDF files.
Each `Rule` object represents a set of transformations that should be applied to a NetCDF file to
make it comply with the CMOR standard. The `CMORizer` class manages a collection of `Rule` objects and
applies the appropriate ones to each file.
The `CMORMetadata` class is a special type of `Rule` that adds CMOR metadata to a file.
The Full CMOR (yes, bad pun):
* Applied if no other rule sets are given for a file
* Adds CMOR metadata to the file
Expand All @@ -27,15 +16,102 @@
"""

import datetime
import json
import re
import sys
from functools import partial
from pathlib import Path

import questionary
import xarray as xr
import yaml
from loguru import logger

from .utils import generate_partial_function, get_callable_by_name

def convert_units(filepath: Path, source_units: str, target_units: str):
"""
Converts the units of a NetCDF file, using the Pint library.
Parameters
----------
filepath : Path
Path to the input file.
source_units : str
target_units : str
"""
ds = xr.open_dataset(filepath)
ds = ds.pint.quantify()
ds = ds.pint.to(target_units)
ds = ds.pint.dequantify()
logger.info(f"Converted units of {filepath} from {source_units} to {target_units}")
ds.to_netcdf(filepath)


def set_cmor_metadata(filepath: Path, cmor_metadata: dict, attrs_to_skip=[]):
"""
Adds CMOR metadata to a NetCDF file.
Parameters
----------
filepath : Path
Path to the input file.
cmor_metadata : dict
Dictionary with the CMOR metadata to be added to the file.
attrs_to_skip : list of str, optional
List of attributes to skip when adding CMOR metadata.
"""
attrs_to_skip = attrs_to_skip or ["units", "cell_methods", "cell_measures"]
ds = xr.open_dataset(filepath)
for key, value in cmor_metadata.items():
if key in attrs_to_skip:
continue
ds.attrs[key] = value
hist_str = ds.attrs.get("history", "")
hist_str += f"\n{datetime.now()}: CMOR metadata added by ``pymorize``\n"
ds.to_netcdf(filepath)


def linear_transform(
filepath: Path, execute: bool = False, slope: float = 1, offset: float = 0
):
"""
Applies a linear transformation to the data of a NetCDF file.
Parameters
----------
filepath : Path
Path to the input file.
execute : bool, optional
slope: float, optional
offset: float, optional
"""
if execute:
ds = xr.open_dataset(filepath)
ds = ds * slope + offset
logger.info(f"Applied linear transformation to {filepath}")
ds.to_netcdf(filepath)
else:
logger.info(f"Would apply linear transformation to {filepath}")
logger.info(f"slope: {slope}, offset: {offset}")
logger.info("Use `execute=True` to apply changes")


def invert_z_axis(filepath: Path, execute: bool = False, flip_sign: bool = False):
"""
Inverts the z-axis of a NetCDF file.
Parameters
----------
filepath : Path
Path to the input file.
execute : bool, optional
If True, the function will execute the inversion. If False, it will
only print the changes that would be made.
"""
if execute:
ds = xr.open_dataset(filepath)
ds = ds.reindex(z=ds.z[::-1])
logger.info(f"Inverted order of z-axis of {filepath}")
if flip_sign:
ds["z"] *= -1
logger.info(f"Flipped sign of z-axis of {filepath}")
ds.to_netcdf(filepath)
else:
logger.info(f"Would invert z-axis of {filepath}")
if flip_sign:
logger.info("Would flip sign of z-axis")
logger.info("Use `execute=True` to apply changes")
99 changes: 0 additions & 99 deletions src/pymorize/generic/__init__.py

This file was deleted.

33 changes: 33 additions & 0 deletions src/pymorize/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Pipeline of the data processing steps.
"""

from .utils import get_callable_by_name


class Pipeline:
def __init__(self, *args):
self.steps = args

def run(self, data):
for step in self.steps:
data = step(data)
return data

@classmethod
def from_list(cls, steps):
return cls(*steps)

@classmethod
def from_qualname_list(cls, qualnames: list):
return cls.from_list(get_callable_by_name(name) for name in qualnames)


class DefaultPipeline(Pipeline):
def __init__(self):
super().__init__(
get_callable_by_name("pymorize.generic.load_data"),
get_callable_by_name("extract_feacitures"),
get_callable_by_name("train"),
get_callable_by_name("evaluate"),
)
Empty file added tests/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pymorize.pipeline import Pipeline


def test_basic_creation():
pl = Pipeline()


def test_qualname_creation():
pl = Pipeline.from_qualname_list(
["pymorize.generic.convert_units", "pymorize.generic.set_cmor_metadata"]
)

0 comments on commit 99b283b

Please sign in to comment.