Skip to content

Commit

Permalink
Merge pull request #60 from esm-tools/test/pi-mesh
Browse files Browse the repository at this point in the history
PI Mesh Test Fixtures
  • Loading branch information
pgierz authored Nov 13, 2024
2 parents f891f4d + 5a25fb1 commit 2949e1e
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 7 deletions.
2 changes: 2 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"tests.fixtures.config_files",
"tests.fixtures.CV_Dir",
"tests.fixtures.CMIP_Tables_Dir",
"tests.fixtures.example_data.fesom_2p6_pimesh",
"tests.fixtures.example_data.pi_uxarray",
]


Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def read(filename):
"flexparser < 0.4", # NOTE(PG): See https://tinyurl.com/ypf99xnh
"flox",
"imohash",
"netcdf4",
"numbagg",
"pendulum",
"pint-xarray",
Expand Down
2 changes: 1 addition & 1 deletion src/pymorize/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def generate_cache_key(task, inputs):
def manual_checkpoint(data, rule):
"""Manually insert a checkpoint in the flow"""
logger.info("Manually inserting checkpoint")
return Completed(message="Checkpoint reached", return_value=data)
return Completed(message="Checkpoint reached", data=data)


def inspect_cache(cache_dir="~/.prefect/storage"):
Expand Down
3 changes: 2 additions & 1 deletion src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def validate(self):
# :PS: @PG the following functions are not defined yet
# self._check_rules_for_table()
# self._check_rules_for_output_dir()
self._check_is_subperiod()
# FIXME(PS): Turn off this check, see GH #59 (https://tinyurl.com/3z7d8uuy)
# self._check_is_subperiod()

def _check_is_subperiod(self):
logger.info("checking frequency in netcdf file and in table...")
Expand Down
5 changes: 4 additions & 1 deletion src/pymorize/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,7 @@ def multiyear_monthly_mean(data, rule_spec, *args, **kwargs):


def trigger_compute(data, rule_spec, *args, **kwargs):
return data.compute()
if hasattr(data, "compute"):
return data.compute()
# Data doesn't have a compute method, do nothing
return data
13 changes: 9 additions & 4 deletions src/pymorize/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import randomname
from prefect import flow
from prefect.cache_policies import INPUTS, TASK_SOURCE
from prefect.tasks import Task, task_input_hash
from prefect.tasks import Task
from prefect_dask import DaskTaskRunner

from .caching import generate_cache_key
Expand All @@ -21,13 +21,18 @@ def __init__(
*args,
name=None,
workflow_backend="prefect",
cache_policy=None,
dask_cluster=None,
cache_expiration=None,
):
self._steps = args
self.name = name or randomname.get_name()
self._workflow_backend = workflow_backend
self._cluster = dask_cluster
self._prefect_cache_kwargs = {}
if cache_policy is None:
self._cache_policy = TASK_SOURCE + INPUTS
self._prefect_cache_kwargs["cache_policy"] = self._cache_policy

if cache_expiration is None:
self._cache_expiration = timedelta(days=1)
Expand All @@ -36,6 +41,7 @@ def __init__(
self._cache_expiration = cache_expiration
else:
raise TypeError("Cache expiration must be a timedelta!")
self._prefect_cache_kwargs["cache_expiration"] = self._cache_expiration

if self._workflow_backend == "prefect":
self._prefectize_steps()
Expand All @@ -51,7 +57,7 @@ def __str__(self):
return "\n".join(r_val)

def assign_cluster(self, cluster):
logger.debug("Assinging cluster to this pipeline")
logger.debug("Assigning cluster to this pipeline")
self._cluster = cluster

def _prefectize_steps(self):
Expand All @@ -64,9 +70,8 @@ def _prefectize_steps(self):
prefect_tasks.append(
Task(
fn=step,
**self._prefect_cache_kwargs,
# cache_key_fn=generate_cache_key,
cache_expiration=self._cache_expiration,
cache_policy=TASK_SOURCE + INPUTS,
)
)

Expand Down
23 changes: 23 additions & 0 deletions tests/configs/test_config_fesom_2p6_pimesh.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pymorize:
warn_on_no_rule: False
parallel: False
general:
name: "fesom_2p6_pimesh"
description: "This is a test configuration using esm-tools generated test data on PI Mesh"
maintainer: "pgierz"
email: "pgierz@awi.de"
cmor_version: "CMIP6"
mip: "CMIP"
frequency: "mon"
CMIP_Tables_Dir: "./cmip6-cmor-tables/Tables"
rules:
- name: "temp"
experiment_id: "piControl"
output_directory: "./output"
source_id: "FESOM"
variant_label: "r1i1p1f1"
inputs:
- path: "REPLACE_ME/outdata/fesom"
pattern: "temp.fesom..*.nc"
cmor_variable: "thetao"
model_variable: "temp"
23 changes: 23 additions & 0 deletions tests/configs/test_config_pi_uxarray.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pymorize:
warn_on_no_rule: False
parallel: False
general:
name: "pi_uxarray"
description: "This is a test configuration using the UXArray test data on PI Mesh"
maintainer: "pgierz"
email: "pgierz@awi.de"
cmor_version: "CMIP6"
mip: "CMIP"
frequency: "mon"
CMIP_Tables_Dir: "./cmip6-cmor-tables/Tables"
rules:
- name: "temp"
experiment_id: "piControl"
output_directory: "./output"
source_id: "FESOM"
variant_label: "r1i1p1f1"
inputs:
- path: "REPLACE_ME"
pattern: "temp.fesom..*.nc"
cmor_variable: "thetao"
model_variable: "temp"
10 changes: 10 additions & 0 deletions tests/fixtures/config_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
@pytest.fixture
def test_config():
return TEST_ROOT / "configs" / "test_config.yaml"


@pytest.fixture
def pi_uxarray_config():
return TEST_ROOT / "configs" / "test_config_pi_uxarray.yaml"


@pytest.fixture
def fesom_2p6_pimesh_esm_tools_config():
return TEST_ROOT / "configs" / "test_config_fesom_2p6_pimesh.yaml"
Empty file.
43 changes: 43 additions & 0 deletions tests/fixtures/example_data/fesom_2p6_pimesh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Example data for the FESOM model."""

import tarfile
from pathlib import Path

import pytest
import requests

URL = "https://nextcloud.awi.de/s/7gtFn38ZGifMAfw/download/fesom_2p6_pimesh.tar.gz"
"""str : URL to download the example data from."""


@pytest.fixture(scope="session")
def fesom_2p6_esm_tools_download_data(tmp_path_factory):
cache_dir = tmp_path_factory.getbasetemp() / "cached_data"
cache_dir.mkdir(exist_ok=True)
data_path = cache_dir / "fesom_2p6_pimesh.tar.gz"

if not data_path.exists():
response = requests.get(URL)
response.raise_for_status()
with open(data_path, "wb") as f:
f.write(response.content)
print(f"Data downloaded: {data_path}.")
else:
print(f"Using cached data: {data_path}.")

return data_path


@pytest.fixture(scope="session")
def fesom_2p6_pimesh_esm_tools_data(fesom_2p6_esm_tools_download_data):
data_dir = Path(fesom_2p6_esm_tools_download_data).parent / "fesom_2p6_pimesh"

# Extract only if the directory doesn't already exist
if not data_dir.exists():
with tarfile.open(fesom_2p6_esm_tools_download_data, "r:gz") as tar:
tar.extractall(data_dir.parent)
print(f"Data extracted to: {data_dir}.")
else:
print(f"Using cached extraction: {data_dir}.")

return data_dir
38 changes: 38 additions & 0 deletions tests/fixtures/example_data/pi_uxarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Example data for the FESOM model."""

import tarfile
from pathlib import Path

import pytest
import requests

URL = "https://nextcloud.awi.de/s/swqyFgbL2jjgjRo/download/pi_uxarray.tar"
"""str : URL to download the example data from."""


@pytest.fixture(scope="session")
def pi_uxarray_download_data(tmp_path_factory):
cache_dir = tmp_path_factory.getbasetemp() / "cached_data"
cache_dir.mkdir(exist_ok=True)
data_path = cache_dir / "pi_uxarray.tar"

if not data_path.exists():
response = requests.get(URL)
response.raise_for_status()
with open(data_path, "wb") as f:
f.write(response.content)
print(f"Data downloaded: {data_path}.")
else:
print(f"Using cached data: {data_path}.")

return data_path


@pytest.fixture(scope="session")
def pi_uxarray_data(pi_uxarray_download_data):

data_dir = Path(pi_uxarray_download_data).parent
with tarfile.open(pi_uxarray_download_data, "r") as tar:
tar.extractall(data_dir)

return data_dir / "pi_uxarray"
18 changes: 18 additions & 0 deletions tests/integration/test_fesom_2p6_pimesh_esm_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# import pytest
import yaml

from pymorize.cmorizer import CMORizer
from pymorize.logging import logger


def test_process(fesom_2p6_pimesh_esm_tools_config, fesom_2p6_pimesh_esm_tools_data):
logger.info(f"Processing {fesom_2p6_pimesh_esm_tools_config}")
with open(fesom_2p6_pimesh_esm_tools_config, "r") as f:
cfg = yaml.safe_load(f)
for rule in cfg["rules"]:
for input in rule["inputs"]:
input["path"] = input["path"].replace(
"REPLACE_ME", str(fesom_2p6_pimesh_esm_tools_data)
)
cmorizer = CMORizer.from_dict(cfg)
cmorizer.process()
16 changes: 16 additions & 0 deletions tests/integration/test_uxarray_pi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# import pytest
import yaml

from pymorize.cmorizer import CMORizer
from pymorize.logging import logger


def test_process(pi_uxarray_config, pi_uxarray_data):
logger.info(f"Processing {pi_uxarray_config}")
with open(pi_uxarray_config, "r") as f:
cfg = yaml.safe_load(f)
for rule in cfg["rules"]:
for input in rule["inputs"]:
input["path"] = input["path"].replace("REPLACE_ME", str(pi_uxarray_data))
cmorizer = CMORizer.from_dict(cfg)
cmorizer.process()

0 comments on commit 2949e1e

Please sign in to comment.