Skip to content

Commit

Permalink
Merge pull request #72 from esm-tools/test/save_dataset
Browse files Browse the repository at this point in the history
Test/save_dataset
  • Loading branch information
pgierz authored Nov 25, 2024
2 parents 59f8681 + 26ff426 commit 4c61f1e
Show file tree
Hide file tree
Showing 14 changed files with 874 additions and 52 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/CI-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ jobs:
export HDF5_DEBUG=1
export NETCDF_DEBUG=1
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/meta/*.py
- name: Test with pytest (Unit)
run: |
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/unit/*.py
- name: Test with pytest (Integration)
run: |
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/integration/*.py
- name: Test with doctest
run: |
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
PYTHONPATH=src pytest -v --doctest-modules src/
13 changes: 9 additions & 4 deletions src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from pathlib import Path

import dask # noqa: F401
Expand Down Expand Up @@ -171,7 +172,7 @@ def _post_init_populate_rules_with_tables(self):
for rule in self.rules:
for tbl in tables.values():
if rule.cmor_variable in tbl.variable_ids:
rule.add_table(tbl)
rule.add_table(tbl.table_id)

def _post_init_data_request_variables(self):
for drv in self.data_request.variables:
Expand Down Expand Up @@ -259,7 +260,8 @@ def _post_init_create_rules(self):

def _post_init_attach_pymorize_config_rules(self):
for rule in self.rules:
rule._pymorize_cfg = self._pymorize_cfg
# NOTE(PG): **COPY** (don't assign) the configuration to the rule
rule._pymorize_cfg = copy.deepcopy(self._pymorize_cfg)

def _post_init_inherit_rules(self):
for rule_attr, rule_value in self._inherit_cfg.items():
Expand Down Expand Up @@ -449,20 +451,23 @@ def _parallel_process_dask(self, external_client=None):
def serial_process(self):
data = {}
for rule in track(self.rules, description="Processing rules"):
data[rule] = self._process_rule(rule)
data[rule.name] = self._process_rule(rule)
logger.success("Processing completed.")
return data

def _process_rule(self, rule):
logger.info(f"Starting to process rule {rule}")
# Match up the pipelines:
# FIXME(PG): This might also be a place we need to consider copies...
rule.match_pipelines(self.pipelines)
data = None
# NOTE(PG): Send in a COPY of the rule, not the original rule
local_rule_copy = copy.deepcopy(rule)
if not len(rule.pipelines) > 0:
logger.error("No pipeline defined, something is wrong!")
for pipeline in rule.pipelines:
logger.info(f"Running {str(pipeline)}")
data = pipeline.run(data, rule)
data = pipeline.run(data, local_rule_copy)
return data

@task
Expand Down
13 changes: 7 additions & 6 deletions src/pymorize/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,25 @@
>>> engine = config("xarray_backend")
>>> print(f"Using xarray backend: {engine}")
'netcdf4'
Using xarray backend: netcdf4
>>> parallel = config("parallel")
>>> print(f"Running in parallel: {parallel}")
True
Running in parallel: True
You can define a user file at ``${XDG_CONFIG_DIR}/pymorize/pymorize.yaml``::
>>> import pathlib
>>> import yaml
>>> cfg_file = pathlib.Path("~/.config/pymorize/pymorize.yaml").expanduser()
>>> cfg_file.mkdir(parents=True, exist_ok=True)
>>> cfg_file.parent.mkdir(parents=True, exist_ok=True)
>>> cfg_to_dump = {"xarray_backend": "zarr"}
>>> with open(cfg_file, "w") as f:
... f.write("xarray_backend: zarr\nparallel: False\n")
... yaml.dump(cfg_to_dump, f)
>>> config = PymorizeConfigManager.from_pymorize_cfg()
>>> engine = config("xarray_backend")
>>> print(f"Using xarray backend: {engine}")
'zarr'
Using xarray backend: zarr
See Also
--------
Expand Down
1 change: 0 additions & 1 deletion src/pymorize/data_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ def __repr__(self):
class DataRequest:
"""Represents a data request with associated metadata."""

# NOTE(PG): Inherited from Ruby Seamore, not needed for now.
@staticmethod
def approx_interval_for_table(table_id):
return CMIP_FREQUENCIES[table_id]
Expand Down
112 changes: 112 additions & 0 deletions src/pymorize/dataset_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from collections import deque

import cftime
import numpy as np
import pandas as pd
from xarray.core.utils import is_scalar


def is_datetime_type(arr: np.ndarray) -> bool:
"Checks if array elements are datetime objects or cftime objects"
return isinstance(
arr.item(0), tuple(cftime._cftime.DATE_TYPES.values())
) or np.issubdtype(arr, np.datetime64)


def get_time_label(ds):
"""
Determines the name of the coordinate in the dataset that can serve as a time label.
Parameters
----------
ds : xarray.Dataset
The dataset containing coordinates to check for a time label.
Returns
-------
str or None
The name of the coordinate that is a datetime type and can serve as a time label,
or None if no such coordinate is found.
Example
-------
>>> import xarray as xr
>>> import pandas as pd
>>> import numpy as np
>>> ds = xr.Dataset({'time': ('time', pd.date_range('2000-01-01', periods=10))})
>>> get_time_label(ds)
'time'
>>> ds = xr.DataArray(np.ones(10), coords={'T': ('T', pd.date_range('2000-01-01', periods=10))})
>>> get_time_label(ds)
'T'
>>> # The following does have a valid time coordinate, expected to return None
>>> da = xr.Dataset({'time': ('time', [1,2,3,4,5])})
>>> get_time_label(da) is None
True
"""
label = deque()
for name, coord in ds.coords.items():
if not is_datetime_type(coord):
continue
if not coord.dims:
continue
if name in coord.dims:
label.appendleft(name)
else:
label.append(name)
label.append(None)
return label.popleft()


def has_time_axis(ds) -> bool:
"""
Checks if the given dataset has a time axis.
Parameters
----------
ds : xarray.Dataset or xarray.DataArray
The dataset to check.
Returns
-------
bool
True if the dataset has a time axis, False otherwise.
"""
return bool(get_time_label(ds))


def needs_resampling(ds, timespan):
"""
Checks if a given dataset needs resampling based on its time axis.
Parameters
----------
ds : xr.Dataset or xr.DataArray
The dataset to check.
timespan : str
The time span for which the dataset is to be resampled.
10YS, 1YS, 6MS, etc.
Returns
-------
bool
True if the dataset needs resampling, False otherwise.
Notes:
------
After time-averaging step, this function aids in determining if
splitting into multiple files is required based on provided
timespan.
"""
if (timespan is None) or (not timespan):
return False
time_label = get_time_label(ds)
if time_label is None:
return False
if is_scalar(ds[time_label]):
return False
# string representation is need to deal with cftime
start = pd.Timestamp(str(ds[time_label].data[0]))
end = pd.Timestamp(str(ds[time_label].data[-1]))
offset = pd.tseries.frequencies.to_offset(timespan)
return (start + offset) < end
Loading

0 comments on commit 4c61f1e

Please sign in to comment.