Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test/save_dataset #72

Merged
merged 32 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e67e9c3
added tests for saving data
siligam Nov 19, 2024
b648a67
black formatted
siligam Nov 19, 2024
2d29cc9
added function documentation
siligam Nov 19, 2024
f75e20b
switched from switch statements to if statements for compatibility
siligam Nov 19, 2024
38cfc1e
added tests for get_time_label
siligam Nov 19, 2024
e160d65
Merge branch 'fix/ci-segfault' into test/save_dataset
siligam Nov 19, 2024
3d2cf73
a stab at ci-bug
siligam Nov 20, 2024
f8d50ee
stab 2 at ci-bug
siligam Nov 20, 2024
9ebc5ad
stab 3 ci
siligam Nov 20, 2024
e917f26
stab 4 ci
siligam Nov 20, 2024
6910d2f
reverting ci stabs
siligam Nov 20, 2024
3deac14
saving dataset requires parent folders to be created.
siligam Nov 20, 2024
f6b9de9
test: trying to isolate rule serialization
pgierz Nov 22, 2024
9969218
test: now with cache test
pgierz Nov 22, 2024
013a600
wip
pgierz Nov 22, 2024
1fa02eb
wip: wrong language keyword (oops)
pgierz Nov 22, 2024
33f2b75
wip
pgierz Nov 22, 2024
f4251c7
wip
pgierz Nov 22, 2024
4a0bb0d
wip
pgierz Nov 22, 2024
902ad39
wip
pgierz Nov 22, 2024
4c74678
wip
pgierz Nov 22, 2024
1f3470d
wip
pgierz Nov 22, 2024
6f9325a
wip 12345
pgierz Nov 22, 2024
6593c42
wip for pavan
pgierz Nov 22, 2024
54efe3b
refactored and cleaned up save-dataset
siligam Nov 24, 2024
d5842c8
lots of stuff, but importantly: skips broken serialized progressive s…
pgierz Nov 25, 2024
a247c47
re-includes approx_interval_for_table
pgierz Nov 25, 2024
c4fa80f
doctest 1
pgierz Nov 25, 2024
f6c32df
doctest 2
pgierz Nov 25, 2024
9bc13df
ci: longer timeout for prefect ephemeral server
pgierz Nov 25, 2024
257f3ed
Apply suggestions from code review
pgierz Nov 25, 2024
26ff426
test parameter file_timespan to save data to single file and multiple f…
siligam Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/CI-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ jobs:
export HDF5_DEBUG=1
export NETCDF_DEBUG=1
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/meta/*.py
- name: Test with pytest (Unit)
run: |
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/unit/*.py
- name: Test with pytest (Integration)
run: |
export XARRAY_BACKEND=h5netcdf
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
pytest -vvv -s --cov tests/integration/*.py
- name: Test with doctest
run: |
export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
PYTHONPATH=src pytest -v --doctest-modules src/
13 changes: 9 additions & 4 deletions src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from pathlib import Path

import dask # noqa: F401
Expand Down Expand Up @@ -173,7 +174,7 @@ def _post_init_populate_rules_with_tables(self):
for rule in self.rules:
for tbl in tables.values():
if rule.cmor_variable in tbl.variable_ids:
rule.add_table(tbl)
rule.add_table(tbl.table_id)

def _post_init_data_request_variables(self):
for drv in self.data_request.variables:
Expand Down Expand Up @@ -261,7 +262,8 @@ def _post_init_create_rules(self):

def _post_init_attach_pymorize_config_rules(self):
for rule in self.rules:
rule._pymorize_cfg = self._pymorize_cfg
# NOTE(PG): **COPY** (don't assign) the configuration to the rule
rule._pymorize_cfg = copy.deepcopy(self._pymorize_cfg)

def _post_init_inherit_rules(self):
for rule_attr, rule_value in self._inherit_cfg.items():
Expand Down Expand Up @@ -451,20 +453,23 @@ def _parallel_process_dask(self, external_client=None):
def serial_process(self):
data = {}
for rule in track(self.rules, description="Processing rules"):
data[rule] = self._process_rule(rule)
data[rule.name] = self._process_rule(rule)
logger.success("Processing completed.")
return data

def _process_rule(self, rule):
logger.info(f"Starting to process rule {rule}")
# Match up the pipelines:
# FIXME(PG): This might also be a place we need to consider copies...
rule.match_pipelines(self.pipelines)
data = None
# NOTE(PG): Send in a COPY of the rule, not the original rule
local_rule_copy = copy.deepcopy(rule)
if not len(rule.pipelines) > 0:
logger.error("No pipeline defined, something is wrong!")
for pipeline in rule.pipelines:
logger.info(f"Running {str(pipeline)}")
data = pipeline.run(data, rule)
data = pipeline.run(data, local_rule_copy)
return data

@task
Expand Down
13 changes: 7 additions & 6 deletions src/pymorize/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,25 @@

>>> engine = config("xarray_backend")
>>> print(f"Using xarray backend: {engine}")
'netcdf4'
Using xarray backend: netcdf4

>>> parallel = config("parallel")
>>> print(f"Running in parallel: {parallel}")
True
Running in parallel: True

You can define a user file at ``${XDG_CONFIG_DIR}/pymorize/pymorize.yaml``::

>>> import pathlib
>>> import yaml
>>> cfg_file = pathlib.Path("~/.config/pymorize/pymorize.yaml").expanduser()
>>> cfg_file.mkdir(parents=True, exist_ok=True)
>>> cfg_file.parent.mkdir(parents=True, exist_ok=True)
>>> cfg_to_dump = {"xarray_backend": "zarr"}
>>> with open(cfg_file, "w") as f:
... f.write("xarray_backend: zarr\nparallel: False\n")
... yaml.dump(cfg_to_dump, f)
>>> config = PymorizeConfigManager.from_pymorize_cfg()
>>> engine = config("xarray_backend")
>>> print(f"Using xarray backend: {engine}")
'zarr'

Using xarray backend: zarr

See Also
--------
Expand Down
1 change: 0 additions & 1 deletion src/pymorize/data_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ def __repr__(self):
class DataRequest:
"""Represents a data request with associated metadata."""

# NOTE(PG): Inherited from Ruby Seamore, not needed for now.
@staticmethod
def approx_interval_for_table(table_id):
return CMIP_FREQUENCIES[table_id]
Expand Down
112 changes: 112 additions & 0 deletions src/pymorize/dataset_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from collections import deque

import cftime
import numpy as np
import pandas as pd
from xarray.core.utils import is_scalar


def is_datetime_type(arr: np.ndarray) -> bool:
    """
    Check whether the elements of ``arr`` are datetime-like.

    Recognizes both numpy ``datetime64`` arrays and arrays of cftime
    date objects (which carry ``object`` dtype).

    Parameters
    ----------
    arr : np.ndarray or array-like with a ``dtype`` attribute
        The array whose element type is inspected.

    Returns
    -------
    bool
        True if the array holds datetime64 or cftime date values,
        False otherwise (including for empty non-datetime64 arrays).
    """
    # Cheap dtype check first: covers numpy datetime64 arrays without
    # touching any element, and is safe for empty arrays.
    if np.issubdtype(np.asarray(arr).dtype, np.datetime64):
        return True
    # cftime arrays have object dtype, so we must inspect an element;
    # guard against empty arrays, where item(0) would raise IndexError.
    if arr.size == 0:
        return False
    # NOTE(review): cftime._cftime.DATE_TYPES is a private mapping of
    # calendar names to cftime date classes — confirm it remains stable
    # across cftime releases.
    return isinstance(arr.item(0), tuple(cftime._cftime.DATE_TYPES.values()))


def get_time_label(ds):
    """
    Return the name of the coordinate in ``ds`` that serves as a time label.

    Parameters
    ----------
    ds : xarray.Dataset or xarray.DataArray
        Object whose coordinates are searched for a datetime-typed coordinate.

    Returns
    -------
    str or None
        The name of a datetime-typed coordinate — dimension coordinates are
        preferred over auxiliary ones — or None if no such coordinate exists.

    Example
    -------
    >>> import xarray as xr
    >>> import pandas as pd
    >>> import numpy as np
    >>> ds = xr.Dataset({'time': ('time', pd.date_range('2000-01-01', periods=10))})
    >>> get_time_label(ds)
    'time'
    >>> ds = xr.DataArray(np.ones(10), coords={'T': ('T', pd.date_range('2000-01-01', periods=10))})
    >>> get_time_label(ds)
    'T'
    >>> # The following does not have a valid time coordinate, expected to return None
    >>> da = xr.Dataset({'time': ('time', [1,2,3,4,5])})
    >>> get_time_label(da) is None
    True
    """
    dim_labels = []  # datetime coordinates that index their own dimension
    aux_labels = []  # datetime coordinates attached to other dimensions
    for coord_name, coord in ds.coords.items():
        if not is_datetime_type(coord):
            continue
        if not coord.dims:
            # Scalar coordinate: cannot serve as an axis label.
            continue
        if coord_name in coord.dims:
            # Most recently seen dimension coordinate takes precedence,
            # matching the original deque.appendleft ordering.
            dim_labels.insert(0, coord_name)
        else:
            aux_labels.append(coord_name)
    candidates = dim_labels + aux_labels
    return candidates[0] if candidates else None


def has_time_axis(ds) -> bool:
    """
    Determine whether the given dataset carries a usable time axis.

    Parameters
    ----------
    ds : xarray.Dataset or xarray.DataArray
        The dataset to inspect.

    Returns
    -------
    bool
        True if a datetime-typed coordinate was found, False otherwise.
    """
    time_label = get_time_label(ds)
    return bool(time_label)


def needs_resampling(ds, timespan):
    """
    Checks if a given dataset needs resampling based on its time axis.

    Parameters
    ----------
    ds : xr.Dataset or xr.DataArray
        The dataset to check.
    timespan : str or None
        The time span for which the dataset is to be resampled.
        10YS, 1YS, 6MS, etc. A falsy value (None or empty string)
        disables resampling.

    Returns
    -------
    bool
        True if the dataset needs resampling, False otherwise.

    Notes:
    ------
    After time-averaging step, this function aids in determining if
    splitting into multiple files is required based on provided
    timespan.
    """
    # `not timespan` already covers both None and the empty string.
    if not timespan:
        return False
    time_label = get_time_label(ds)
    if time_label is None:
        return False
    # A scalar time coordinate spans no interval, so there is nothing to split.
    if is_scalar(ds[time_label]):
        return False
    # string representation is needed to deal with cftime objects,
    # which pd.Timestamp cannot ingest directly
    start = pd.Timestamp(str(ds[time_label].data[0]))
    end = pd.Timestamp(str(ds[time_label].data[-1]))
    offset = pd.tseries.frequencies.to_offset(timespan)
    return (start + offset) < end
Loading