diff --git a/.github/workflows/CI-test.yaml b/.github/workflows/CI-test.yaml
index 7e7cc15f..36169f2e 100644
--- a/.github/workflows/CI-test.yaml
+++ b/.github/workflows/CI-test.yaml
@@ -43,17 +43,17 @@ jobs:
         run: |
           export HDF5_DEBUG=1
           export NETCDF_DEBUG=1
-          export XARRAY_BACKEND=h5netcdf
+          export XARRAY_ENGINE=h5netcdf
           export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
           pytest -vvv -s --cov tests/meta/*.py
       - name: Test with pytest (Unit)
         run: |
-          export XARRAY_BACKEND=h5netcdf
+          export XARRAY_ENGINE=h5netcdf
           export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
           pytest -vvv -s --cov tests/unit/*.py
       - name: Test with pytest (Integration)
         run: |
-          export XARRAY_BACKEND=h5netcdf
+          export XARRAY_ENGINE=h5netcdf
           export PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=300
           pytest -vvv -s --cov tests/integration/*.py
       - name: Test with doctest
diff --git a/examples/.gitignore b/examples/.gitignore
new file mode 100644
index 00000000..217981b0
--- /dev/null
+++ b/examples/.gitignore
@@ -0,0 +1,3 @@
+*.nc
+slurm*.out
+pymorize_report.log
diff --git a/examples/cleanup.py b/examples/cleanup.py
index 515c5e42..dac0bf99 100755
--- a/examples/cleanup.py
+++ b/examples/cleanup.py
@@ -6,6 +6,22 @@
 from pathlib import Path


+def rm_file(fname):
+    try:
+        fname.unlink()
+        print(f"Removed file: {fname}")
+    except Exception as e:
+        print(f"Error removing file {fname}: {e}")
+
+
+def rm_dir(dirname):
+    try:
+        shutil.rmtree(dirname)
+        print(f"Removed directory: {dirname}")
+    except Exception as e:
+        print(f"Error removing directory {dirname}: {e}")
+
+
 def cleanup():
     current_dir = Path.cwd()

@@ -15,34 +31,19 @@ def cleanup():
     for item in current_dir.iterdir():
         if (
             item.is_file()
             and item.name.startswith("slurm")
             and item.name.endswith("out")
         ):
-            try:
-                item.unlink()
-                print(f"Removed file: {item}")
-            except Exception as e:
-                print(f"Error removing file {item}: {e}")
+            rm_file(item)
         if (
             item.is_file()
             and item.name.startswith("pymorize")
             and item.name.endswith("json")
         ):
-            try:
-                item.unlink()
-                print(f"Removed file: {item}")
-            except Exception as e:
-                print(f"Error removing file {item}: {e}")
+            rm_file(item)
         if item.is_file() and item.name.endswith("nc"):
-            try:
-                item.unlink()
-                print(f"Removed file: {item}")
-            except Exception as e:
-                print(f"Error removing file {item}: {e}")
-
+            rm_file(item)
+        if item.name == "pymorize_report.log":
+            rm_file(item)
         elif item.is_dir() and item.name == "logs":
-            try:
-                shutil.rmtree(item)
-                print(f"Removed directory: {item}")
-            except Exception as e:
-                print(f"Error removing directory {item}: {e}")
+            rm_dir(item)

     print("Cleanup completed.")
diff --git a/examples/pymorize.slurm b/examples/pymorize.slurm
index 83141e1a..06b1a4d2 100644
--- a/examples/pymorize.slurm
+++ b/examples/pymorize.slurm
@@ -1,9 +1,10 @@
 #!/bin/bash -l
-#SBATCH --account=ab0246
+#SBATCH --job-name=pymorize-controller # <<< This is the main job, it will launch subjobs if you have Dask enabled.
+#SBATCH --account=ab0246 # <<< Adapt this to your computing account!
 #SBATCH --partition=compute
 #SBATCH --nodes=1
-#SBATCH --time=00:30:00
-# export PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=False
+#SBATCH --time=00:30:00 # <<< You may need more time, adapt as needed!
+export PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=True
 export PREFECT_SERVER_API_HOST=0.0.0.0
 conda activate pymorize
 prefect server start &
diff --git a/examples/sample.yaml b/examples/sample.yaml
index 01192e03..83794382 100644
--- a/examples/sample.yaml
+++ b/examples/sample.yaml
@@ -10,11 +10,14 @@ pymorize:
   # parallel: True
   warn_on_no_rule: False
   use_flox: True
-  cluster_mode: fixed
+  dask_cluster: "slurm"
+  dask_cluster_scaling_mode: fixed
   fixed_jobs: 12
   # minimum_jobs: 8
   # maximum_jobs: 30
-  dimensionless_mapping_table: ../data/dimensionless_mappings.yaml
+  # You can add your own path to the dimensionless mapping table
+  # If nothing is specified here, it will use the built-in one.
+  # dimensionless_mapping_table: ../data/dimensionless_mappings.yaml
 rules:
   - name: paul_example_rule
     description: "You can put some text here"
diff --git a/src/pymorize/cluster.py b/src/pymorize/cluster.py
index b34d9759..86f0449b 100644
--- a/src/pymorize/cluster.py
+++ b/src/pymorize/cluster.py
@@ -3,9 +3,18 @@
 """

 import dask
+from dask.distributed import LocalCluster
+from dask_jobqueue import SLURMCluster

 from .logging import logger

+CLUSTER_MAPPINGS = {
+    "local": LocalCluster,
+    "slurm": SLURMCluster,
+}
+CLUSTER_SCALE_SUPPORT = {"local": False, "slurm": True}
+CLUSTER_ADAPT_SUPPORT = {"local": False, "slurm": True}
+

 def set_dashboard_link(cluster):
     """
diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py
index 7c8567a5..52090286 100644
--- a/src/pymorize/cmorizer.py
+++ b/src/pymorize/cmorizer.py
@@ -10,13 +10,13 @@
 import xarray as xr  # noqa: F401
 import yaml
 from dask.distributed import Client
-from dask_jobqueue import SLURMCluster
 from everett.manager import generate_uppercase_key, get_runtime_config
 from prefect import flow, task
 from prefect.futures import wait
 from rich.progress import track

-from .cluster import set_dashboard_link
+from .cluster import (CLUSTER_ADAPT_SUPPORT, CLUSTER_MAPPINGS,
+                      CLUSTER_SCALE_SUPPORT, set_dashboard_link)
 from .config import PymorizeConfig, PymorizeConfigManager
 from .data_request import (DataRequest, DataRequestTable, DataRequestVariable,
                            IgnoreTableFiles)
@@ -88,10 +88,13 @@ def __init__(

         ################################################################################
         # Post_Init:
-        if self._pymorize_cfg("parallel"):
-            if self._pymorize_cfg("parallel_backend") == "dask":
-                self._post_init_configure_dask()
-                self._post_init_create_dask_cluster()
+        if self._pymorize_cfg("enable_dask"):
+            logger.debug("Setting up dask configuration...")
+            self._post_init_configure_dask()
+            logger.debug("...done!")
+            logger.debug("Creating dask cluster...")
+            self._post_init_create_dask_cluster()
+            logger.debug("...done!")
         self._post_init_create_pipelines()
         self._post_init_create_rules()
         self._post_init_read_bare_tables()
@@ -99,6 +102,7 @@ def __init__(
         self._post_init_populate_rules_with_tables()
         self._post_init_read_dimensionless_unit_mappings()
         self._post_init_data_request_variables()
+        logger.debug("...post-init done!")

     ################################################################################
     def _post_init_configure_dask(self):
@@ -120,29 +124,42 @@ def _post_init_create_dask_cluster(self):
         # FIXME: In the future, we can support PBS, too.
- logger.info("Setting up SLURMCluster...") - self._cluster = SLURMCluster() + logger.info("Setting up dask cluster...") + cluster_name = self._pymorize_cfg("dask_cluster") + ClusterClass = CLUSTER_MAPPINGS[cluster_name] + self._cluster = ClusterClass() set_dashboard_link(self._cluster) - cluster_mode = self._pymorize_cfg.get("cluster_mode", "adapt") - if cluster_mode == "adapt": - min_jobs = self._pymorize_cfg.get("minimum_jobs", 1) - max_jobs = self._pymorize_cfg.get("maximum_jobs", 10) - self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) - elif cluster_mode == "fixed": - jobs = self._pymorize_cfg.get("fixed_jobs", 5) - self._cluster.scale(jobs=jobs) + cluster_scaling_mode = self._pymorize_cfg.get( + "dask_cluster_scaling_mode", "adapt" + ) + if cluster_scaling_mode == "adapt": + if CLUSTER_ADAPT_SUPPORT[cluster_name]: + min_jobs = self._pymorize_cfg.get( + "dask_cluster_scaling_minimum_jobs", 1 + ) + max_jobs = self._pymorize_cfg.get( + "dask_cluster_scaling_maximum_jobs", 10 + ) + self._cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs) + else: + logger.warning(f"{self._cluster} does not support adaptive scaling!") + elif cluster_scaling_mode == "fixed": + if CLUSTER_SCALE_SUPPORT[cluster_name]: + jobs = self._pymorize_cfg.get("dask_cluster_scaling_fixed_jobs", 5) + self._cluster.scale(jobs=jobs) + else: + logger.warning(f"{self._cluster} does not support fixed scaing") else: raise ValueError( - "You need to specify adapt or fixed for pymorize.cluster_mode" + "You need to specify adapt or fixed for pymorize.dask_cluster_scaling_mode" ) - # Wait for at least min_jobs to be available... - # FIXME: Client needs to be available here? - logger.info(f"SLURMCluster can be found at: {self._cluster=}") + # FIXME: Include the gateway option if possible + # FIXME: Does ``Client`` needs to be available here? + logger.info(f"Cluster can be found at: {self._cluster=}") logger.info(f"Dashboard {self._cluster.dashboard_link}") - # NOTE(PG): In CI context, os.getlogin and nodename may not be available (???) 
+        username = getpass.getuser()
         nodename = getattr(os.uname(), "nodename", "UNKNOWN")
-        # FIXME: Include the gateway option if possible
         logger.info(
             "To see the dashboards run the following command in your computer's "
             "terminal:\n"
@@ -152,7 +169,7 @@ def _post_init_create_dask_cluster(self):
         dask_extras = 0
         logger.info("Importing Dask Extras...")

-        if self._pymorize_cfg.get("use_flox", True):
+        if self._pymorize_cfg.get("enable_flox", True):
             dask_extras += 1
             logger.info("...flox...")
             import flox  # noqa: F401
@@ -337,7 +354,9 @@ def validate(self):
         # self._check_rules_for_output_dir()
         # FIXME(PS): Turn off this check, see GH #59 (https://tinyurl.com/3z7d8uuy)
         # self._check_is_subperiod()
+        logger.debug("Starting validate....")
         self._check_units()
+        logger.debug("...done!")

     def _check_is_subperiod(self):
         logger.info("checking frequency in netcdf file and in table...")
@@ -443,6 +462,7 @@ def from_dict(cls, data):
         instance._post_init_create_data_request()
         instance._post_init_data_request_variables()
         instance._post_init_read_dimensionless_unit_mappings()
+        logger.debug("Object creation done!")
         return instance

     def add_rule(self, rule):
@@ -509,16 +529,23 @@ def check_rules_for_output_dir(self, output_dir):
                 logger.warning(filepath)

     def process(self, parallel=None):
+        logger.debug("Process start!")
         if parallel is None:
             parallel = self._pymorize_cfg.get("parallel", True)
         if parallel:
-            parallel_backend = self._pymorize_cfg.get("parallel_backend", "prefect")
-            return self.parallel_process(backend=parallel_backend)
+            logger.debug("Parallel processing...")
+            # FIXME(PG): This is mixed up, hard-coding to prefect for now...
+            workflow_backend = self._pymorize_cfg.get(
+                "pipeline_orchestrator", "prefect"
+            )
+            logger.debug(f"...with {workflow_backend}...")
+            return self.parallel_process(backend=workflow_backend)
         else:
             return self.serial_process()

     def parallel_process(self, backend="prefect"):
         if backend == "prefect":
+            logger.debug("About to submit _parallel_process_prefect()")
             return self._parallel_process_prefect()
         elif backend == "dask":
             return self._parallel_process_dask()
@@ -529,6 +556,8 @@ def _parallel_process_prefect(self):
         # prefect_logger = get_run_logger()
         # logger = prefect_logger
         # @flow(task_runner=DaskTaskRunner(address=self._cluster.scheduler_address))
+        logger.debug("Defining dynamically generated prefect workflow...")
+
         @flow
         def dynamic_flow():
             rule_results = []
@@ -537,6 +566,9 @@ def dynamic_flow():
             wait(rule_results)
             return rule_results

+        logger.debug("...done!")
+
+        logger.debug("About to return dynamic_flow()...")
         return dynamic_flow()

     def _parallel_process_dask(self, external_client=None):
@@ -567,13 +599,11 @@ def _process_rule(self, rule):
         # FIXME(PG): This might also be a place we need to consider copies...
         rule.match_pipelines(self.pipelines)
         data = None
-        # NOTE(PG): Send in a COPY of the rule, not the original rule
-        local_rule_copy = copy.deepcopy(rule)
         if not len(rule.pipelines) > 0:
             logger.error("No pipeline defined, something is wrong!")
         for pipeline in rule.pipelines:
             logger.info(f"Running {str(pipeline)}")
-            data = pipeline.run(data, local_rule_copy)
+            data = pipeline.run(data, rule)
         return data

 @task
diff --git a/src/pymorize/config.py b/src/pymorize/config.py
index 617fe444..f182259e 100644
--- a/src/pymorize/config.py
+++ b/src/pymorize/config.py
@@ -42,7 +42,7 @@
     >>> pymorize_cfg = {}
     >>> config = PymorizeConfigManager.from_pymorize_cfg(pymorize_cfg)
-    >>> engine = config("xarray_backend")
+    >>> engine = config("xarray_engine")
     >>> print(f"Using xarray backend: {engine}")
     Using xarray backend: netcdf4

@@ -56,11 +56,11 @@
     >>> import yaml
     >>> cfg_file = pathlib.Path("~/.config/pymorize/pymorize.yaml").expanduser()
     >>> cfg_file.parent.mkdir(parents=True, exist_ok=True)
-    >>> cfg_to_dump = {"xarray_backend": "zarr"}
+    >>> cfg_to_dump = {"xarray_engine": "zarr"}
     >>> with open(cfg_file, "w") as f:
     ...     yaml.dump(cfg_to_dump, f)
     >>> config = PymorizeConfigManager.from_pymorize_cfg()
-    >>> engine = config("xarray_backend")
+    >>> engine = config("xarray_engine")
     >>> print(f"Using xarray backend: {engine}")
     Using xarray backend: zarr

@@ -71,6 +71,7 @@

 import os
 import pathlib
+from importlib.resources import files

 from everett import InvalidKeyError
 from everett.ext.yamlfile import ConfigYamlEnv
@@ -78,6 +79,10 @@
                              ConfigOSEnv, Option, _get_component_name,
                              parse_bool)

+DIMENSIONLESS_MAPPING_TABLE = files("pymorize.data").joinpath(
+    "dimensionless_mappings.yaml"
+)
+

 def _parse_bool(value):
     if isinstance(value, bool):
@@ -87,33 +92,85 @@ class PymorizeConfig:
     class Config:
-        quiet = Option(
-            default=False, doc="Whether to suppress output.", parser=_parse_bool
+        dask_cluster = Option(
+            default="local",
+            doc="Dask cluster to use. See: https://docs.dask.org/en/stable/deploying.html",
+            parser=ChoiceOf(
+                str,
+                choices=[
+                    "local",
+                    "slurm",
+                ],
+            ),
         )
-        xarray_backend = Option(
-            default="netcdf4",
-            doc="Which backend to use for xarray.",
-            parser=ChoiceOf(str, choices=["netcdf4", "h5netcdf", "zarr"]),
+        dask_cluster_scaling_mode = Option(
+            default="adapt",
+            doc="Flexible dask cluster scaling",
+            parser=ChoiceOf(
+                str,
+                choices=[
+                    "adapt",
+                    "fixed",
+                ],
+            ),
+        )
+        dask_cluster_scaling_minimum_jobs = Option(
+            parser=int,
+            default=1,
+            doc="Minimum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)",
+        )
+        dask_cluster_scaling_maximum_jobs = Option(
+            parser=int,
+            default=10,
+            doc="Maximum number of jobs to create for Jobqueue-backed Dask Clusters (adaptive)",
+        )
+        dask_cluster_scaling_fixed_jobs = Option(
+            parser=int,
+            default=5,
+            doc="Number of jobs to create for Jobqueue-backed Dask Cluster",
+        )
+        dimensionless_mapping_table = Option(
+            parser=str,
+            default=DIMENSIONLESS_MAPPING_TABLE,
+            doc="Where the dimensionless unit mapping table is defined.",
+        )
+        enable_dask = Option(
+            parser=_parse_bool,
+            default="yes",
+            doc="Whether to enable Dask-based processing",
+        )
+        enable_flox = Option(
+            parser=_parse_bool,
+            default="yes",
+            doc="Whether to enable flox for group-by operation. See: https://flox.readthedocs.io/en/latest/",
         )
         parallel = Option(
             parser=_parse_bool, default="yes", doc="Whether to run in parallel."
         )
         parallel_backend = Option(default="dask", doc="Which parallel backend to use.")
-        cluster_mode = Option(default="adapt", doc="Flexible dask cluster scaling")
-        dask_scheduler = Option(
-            default="local_process",
-            doc="Dask scheduler to use.",
-        )
-        prefect_backend = Option(
-            default="dask", doc="Which backend to use for Prefect."
-        )
-        pipeline_orchestrator = Option(
+        pipeline_workflow_orchestrator = Option(
             default="prefect",
-            doc="Which orchestrator to use.",
+            doc="Which workflow orchestrator to use for running pipelines",
+            parser=ChoiceOf(
+                str,
+                choices=[
+                    "prefect",
+                ],
+            ),
         )
-        prefect_flow_runner = Option(
-            default="local",
+        prefect_task_runner = Option(
+            default="thread_pool",
             doc="Which runner to use for Prefect flows.",
+            parser=ChoiceOf(
+                str,
+                choices=[
+                    "thread_pool",
+                    "dask",
+                ],
+            ),
+        )
+        quiet = Option(
+            default=False, doc="Whether to suppress output.", parser=_parse_bool
         )
         raise_on_no_rule = Option(
             parser=_parse_bool,
@@ -125,6 +182,18 @@ class Config:
             default="yes",
             doc="Whether or not to issue a warning if no rule is found for every single DataRequestVariable",
         )
+        xarray_engine = Option(
+            default="netcdf4",
+            doc="Which engine to use for xarray.",
+            parser=ChoiceOf(
+                str,
+                choices=[
+                    "netcdf4",
+                    "h5netcdf",
+                    "zarr",
+                ],
+            ),
+        )


 class PymorizeConfigManager(ConfigManager):
diff --git a/src/pymorize/gather_inputs.py b/src/pymorize/gather_inputs.py
index ce14f139..8f9d6393 100644
--- a/src/pymorize/gather_inputs.py
+++ b/src/pymorize/gather_inputs.py
@@ -31,11 +31,10 @@ def __init__(self, path, pattern, frequency=None, time_dim_name=None):
         self.frequency = frequency
         self.time_dim_name = time_dim_name

-    # def __iter__(self):
     @property
     def files(self):
         files = []
-        for file in self.path.iterdir():
+        for file in list(self.path.iterdir()):
             if self.pattern.match(
                 file.name
             ):  # Check if the filename matches the pattern
@@ -268,7 +267,7 @@ def load_mfdataset(data, rule_spec):
     rule_spec : Rule
         Rule being handled
     """
-    engine = rule_spec._pymorize_cfg("xarray_backend")
+    engine = rule_spec._pymorize_cfg("xarray_engine")
     all_files = []
     for file_collection in rule_spec.inputs:
         for f in file_collection.files:
diff --git a/src/pymorize/rule.py b/src/pymorize/rule.py
index 829d8f44..4c98a247 100644
--- a/src/pymorize/rule.py
+++ b/src/pymorize/rule.py
@@ -3,7 +3,8 @@
 import typing
 import warnings

-import deprecation
+# import deprecation
+
 # import questionary
 import yaml

@@ -17,11 +18,11 @@ def __init__(
         self,
         *,
         name: str = None,
-        inputs: typing.List[dict] = [],
+        inputs: typing.List[dict] = None,
         cmor_variable: str,
-        pipelines: typing.List[pipeline.Pipeline] = [],
-        tables: typing.List[data_request.DataRequestTable] = [],
-        data_request_variables: typing.List[data_request.DataRequestVariable] = [],
+        pipelines: typing.List[pipeline.Pipeline] = None,
+        tables: typing.List[data_request.DataRequestTable] = None,
+        data_request_variables: typing.List[data_request.DataRequestVariable] = None,
         **kwargs,
     ):
         """
@@ -43,11 +44,13 @@ def __init__(
             The DataRequestVariables this rule should create
         """
         self.name = name
-        self.inputs = [InputFileCollection.from_dict(inp_dict) for inp_dict in inputs]
+        self.inputs = [
+            InputFileCollection.from_dict(inp_dict) for inp_dict in (inputs or [])
+        ]
         self.cmor_variable = cmor_variable
         self._pipelines = pipelines or [pipeline.DefaultPipeline()]
-        self.tables = tables
-        self.data_request_variables = data_request_variables
+        self.tables = tables or []
+        self.data_request_variables = data_request_variables or []
         # NOTE(PG): I'm not sure I really like this part. It is too magical and makes the object's public API unclear.
         # Attach all keyword arguments to the object
         for key, value in kwargs.items():
@@ -56,6 +59,11 @@ def __init__(
         # Internal flags:
         self._pipelines_are_mapped = False

+    def __getstate__(self):
+        """Custom pickling of a Rule"""
+        state = self.__dict__.copy()
+        return state
+
     @property
     def pipelines(self):
         """
@@ -199,15 +207,15 @@ def from_yaml(cls, yaml_str):
         """Wrapper around ``from_dict`` for initializing from YAML"""
         return cls.from_dict(yaml.safe_load(yaml_str))

-    @deprecation.deprecated(details="This shouldn't be used, avoid it")
-    def to_yaml(self):
-        return yaml.dump(
-            {
-                "inputs": [p.to_dict for p in self.input_patterns],
-                "cmor_variable": self.cmor_variable,
-                "pipelines": [p.to_dict() for p in self.pipelines],
-            }
-        )
+    # @deprecation.deprecated(details="This shouldn't be used, avoid it")
+    # def to_yaml(self):
+    #     return yaml.dump(
+    #         {
+    #             "inputs": [p.to_dict() for p in self.input_patterns],
+    #             "cmor_variable": self.cmor_variable,
+    #             "pipelines": [p.to_dict() for p in self.pipelines],
+    #         }
+    #     )

     def add_table(self, tbl):
         """Add a table to the rule"""
diff --git a/tests/unit/test_units.py b/tests/unit/test_units.py
index 3bd8a8d3..a4fa50c8 100644
--- a/tests/unit/test_units.py
+++ b/tests/unit/test_units.py
@@ -194,6 +194,7 @@ def test_units_with_g_kg_to_0001_g_kg(rule_sos, CMIP_Tables_Dir):
     cmorizer = CMORizer(
         pymorize_cfg={
             "parallel": False,
+            "enable_dask": False,
         },
         general_cfg={"CMIP_Tables_Dir": CMIP_Tables_Dir},
         rules_cfg=[rule_sos],
@@ -211,6 +212,7 @@ def test_units_with_g_g_to_0001_g_kg(rule_sos, CMIP_Tables_Dir):
     cmorizer = CMORizer(
         pymorize_cfg={
             "parallel": False,
+            "enable_dask": False,
         },
         general_cfg={"CMIP_Tables_Dir": CMIP_Tables_Dir},
         rules_cfg=[rule_sos],
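Reviewer note (not part of the patch above): a minimal, self-contained sketch of how the renamed configuration keys introduced here are intended to drive cluster creation. The dictionaries below only mirror the CLUSTER_MAPPINGS / CLUSTER_ADAPT_SUPPORT / CLUSTER_SCALE_SUPPORT tables added in src/pymorize/cluster.py, restricted to the "local" entry so the example runs without dask_jobqueue or a SLURM system; the cfg values are illustrative assumptions, not defaults taken from this diff.

# Hypothetical sketch: selecting and scaling a Dask cluster from pymorize-style config keys.
from dask.distributed import LocalCluster

cluster_mappings = {"local": LocalCluster}   # mirrors CLUSTER_MAPPINGS (SLURMCluster omitted here)
adapt_support = {"local": False}             # mirrors CLUSTER_ADAPT_SUPPORT
scale_support = {"local": False}             # mirrors CLUSTER_SCALE_SUPPORT

cfg = {                                      # illustrative values only
    "enable_dask": True,                     # replaces the parallel/parallel_backend gate
    "dask_cluster": "local",                 # new key: "local" or "slurm"
    "dask_cluster_scaling_mode": "fixed",    # replaces cluster_mode
    "dask_cluster_scaling_fixed_jobs": 2,    # replaces fixed_jobs
}

if cfg["enable_dask"]:
    cluster = cluster_mappings[cfg["dask_cluster"]]()
    if cfg["dask_cluster_scaling_mode"] == "fixed" and scale_support[cfg["dask_cluster"]]:
        # Only Jobqueue-backed clusters (e.g. SLURM) accept scale(jobs=...);
        # LocalCluster manages its own workers, hence the False entry above.
        cluster.scale(jobs=cfg["dask_cluster_scaling_fixed_jobs"])
    print(cluster.dashboard_link)
    cluster.close()

With dask_cluster set to "slurm", the same flow would instantiate SLURMCluster and honor scale(jobs=...) or adapt(...), which is what _post_init_create_dask_cluster does in the patch.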