diff --git a/gaps/cli/config.py b/gaps/cli/config.py index 85b35c2f..27ed12ba 100644 --- a/gaps/cli/config.py +++ b/gaps/cli/config.py @@ -77,6 +77,10 @@ def __init__(self, ctx, config_file, command_config): self.exec_kwargs = None self.logging_options = None self.exclude_from_status = None + self._include_tag_in_out_fpath = ( + self.command_config.is_split_spatially + and self.config.get("execution_control", {}).get("nodes", 1) > 1 + ) @property def project_dir(self): @@ -228,16 +232,9 @@ def kickoff_jobs(self): jobs = sorted(product(*lists_to_run)) num_jobs_submit = len(jobs) self._warn_about_excessive_au_usage(num_jobs_submit) - n_zfill = len(str(max(0, num_jobs_submit - 1))) - extra_exec_args = {} - for param in EXTRA_EXEC_PARAMS: - if param in self.exec_kwargs: - extra_exec_args[param] = self.exec_kwargs.pop(param) + extra_exec_args = self._extract_extra_exec_args_for_command() for node_index, values in enumerate(jobs): - if num_jobs_submit > 1: - tag = f"{TAG}{str(node_index).zfill(n_zfill)}" - else: - tag = "" + tag = _tag(node_index, num_jobs_submit) self.ctx.obj["NAME"] = job_name = f"{self.job_name}{tag}" node_specific_config = deepcopy(self.config) node_specific_config.pop("execution_control", None) @@ -249,7 +246,7 @@ def kickoff_jobs(self): "project_dir": self.project_dir.as_posix(), "job_name": job_name, "out_dir": self.project_dir.as_posix(), - "out_fpath": (self.project_dir / job_name).as_posix(), + "out_fpath": self._suggested_stem(job_name).as_posix(), "run_method": getattr( self.command_config, "run_method", None ), @@ -278,6 +275,21 @@ def kickoff_jobs(self): return self + def _suggested_stem(self, job_name_with_tag): + """Determine suggested filepath with filename stem.""" + if self._include_tag_in_out_fpath: + return self.project_dir / job_name_with_tag + return self.project_dir / self.job_name + + def _extract_extra_exec_args_for_command(self): + """Dictionary of function args from the exec block.""" + extra_exec_args = {} + for param in EXTRA_EXEC_PARAMS: + if param not in self.exec_kwargs: + continue + extra_exec_args[param] = self.exec_kwargs.pop(param) + return extra_exec_args + def _keys_and_lists_to_run(self): """Compile run lists based on `command_config.split_keys` input.""" keys_to_run = [] @@ -422,6 +434,14 @@ def _project_points_last(key): return key +def _tag(node_index, num_jobs): + """Determine node tag based on total number of jobs.""" + n_zfill = len(str(max(0, num_jobs - 1))) + if num_jobs > 1: + return f"{TAG}{str(node_index).zfill(n_zfill)}" + return "" + + def as_script_str(input_): """Convert input to how it would appear in a python script. diff --git a/gaps/version.py b/gaps/version.py index db1521ef..261f8d19 100644 --- a/gaps/version.py +++ b/gaps/version.py @@ -1,3 +1,3 @@ """GAPs Version Number. """ -__version__ = "0.4.4" +__version__ = "0.4.5" diff --git a/tests/cli/test_cli_config.py b/tests/cli/test_cli_config.py index 82d29072..722d9308 100644 --- a/tests/cli/test_cli_config.py +++ b/tests/cli/test_cli_config.py @@ -18,6 +18,7 @@ ) from gaps.cli.documentation import CommandDocumentation from gaps.cli.config import ( + TAG, as_script_str, from_config, run_with_status_updates, @@ -104,6 +105,74 @@ def _testing_function( return out_fp.as_posix() +def _testing_function_no_pp( + input1, + input3, + tag, + command_name, + config_file, + project_dir, + job_name, + out_dir, + out_fpath, + max_workers, + pool_size=16, + _input2=None, + _z_0=None, +): + """Test function to make CLI around. + + Parameters + ---------- + input1 : int + Input 1. + input3 : str + Input 3. + tag : str + Internal GAPs tag. + command_name : str + Internal GAPs command name. + config_file : str + Internal GAPs path to config file. + project_dir : str + Internal GAPs Path to project dir. + job_name : str + Internal GAPs job name. + out_dir : str + Internal GAPs path to out dir. + out_fpath : str + Internal GAPs out filepath. + max_workers : int + Max workers. + pool_size : int, optional + Pool size. By default, ``16``. + _input2 : float, optional + Secret input 2. By default, ``None``. + _z_0 : str, optional + Secret str. By default, ``None``. + """ + out_fp = Path(out_dir) / f"out{tag}.json" + out_vals = { + "input1": input1, + "_input2": _input2, + "input3": input3, + "max_workers": max_workers, + "pool_size": pool_size, + "_z_0": _z_0, + "out_fpath": out_fpath, + "out_dir": out_dir, + "tag": tag, + "command_name": command_name, + "config_file": config_file, + "project_dir": project_dir, + "job_name": job_name, + } + with open(out_fp, "w") as out_file: + json.dump(out_vals, out_file) + + return out_fp.as_posix() + + class TestCommand: """Test command class.""" @@ -188,6 +257,65 @@ def run( return out_fp.as_posix() + def run_no_pp( + self, + tag, + command_name, + config_file, + project_dir, + job_name, + out_dir, + out_fpath, + max_workers, + pool_size=16, + _z_0=None, + ): + """Test run function for CLI around. + + Parameters + ---------- + tag : str + Internal GAPs tag. + command_name : str + Internal GAPs command name. + config_file : str + Internal GAPs path to config file. + project_dir : str + Internal GAPs Path to project dir. + job_name : str + Internal GAPs job name. + out_dir : str + Internal GAPs path to out dir. + out_fpath : str + Internal GAPs out filepath. + max_workers : int + Max workers. + pool_size : int, optional + Pool size. By default, ``16``. + _z_0 : str, optional + Secret str. By default, ``None``. + """ + out_fp = Path(out_dir) / f"out{tag}.json" + out_vals = { + "input1": self._input1, + "_input2": self._input2, + "input3": self._input3, + "max_workers": max_workers, + "pool_size": pool_size, + "_z_0": _z_0, + "out_fpath": out_fpath, + "out_dir": out_dir, + "tag": tag, + "command_name": command_name, + "config_file": config_file, + "project_dir": project_dir, + "job_name": job_name, + } + with open(out_fp, "w") as out_file: + json.dump(out_vals, out_file) + + return out_fp.as_posix() + @pytest.fixture def runnable_script(): @@ -369,9 +497,9 @@ def test_run_multiple_nodes( assert len(set(job_names_cache)) == 4 for job_name, script in job_names_cache.items(): - if "_j0" in job_name or "_j1" in job_name: + if f"{TAG}0" in job_name or f"{TAG}1" in job_name: assert '"_z_0": "strings"' in script - elif "_j2" in job_name or "_j3" in job_name: + elif f"{TAG}2" in job_name or f"{TAG}3" in job_name: assert '"_z_0": "unsorted"' in script else: raise ValueError( @@ -475,11 +603,120 @@ def test_run_no_split_keys( assert len(set(job_names_cache)) == 1 for job_name, script in job_names_cache.items(): - assert "_j0" not in job_name + assert f"{TAG}0" not in job_name assert "[0, 1, 2, 4]" in script assert '["unsorted", "strings"]' in script +@pytest.mark.parametrize("test_class", [False, True]) +def test_run_single_node_out_fpath( + test_ctx, runnable_script, monkeypatch, test_class, job_names_cache +): + """Test the `run` function with no split keys specified.""" + + tmp_path = test_ctx.obj["TMP_PATH"] + + if test_class: + command_config = CLICommandFromClass( + TestCommand, + "run", + name="run", + split_keys={"project_points", "_z_0"}, + ) + else: + command_config = CLICommandFromFunction( + _testing_function, + name="run", + split_keys={"project_points", "_z_0"}, + ) + + config = { + "execution_control": { + "option": "eagle", + "allocation": "test", + "walltime": 1, + "nodes": 1, + "max_workers": 1, + }, + "input1": 1, + "input2": 7, + "input3": 8, + "_z_0": ["unsorted", "strings"], + "project_points": [0, 1, 2, 4], + } + + config_fp = tmp_path / "config.json" + with open(config_fp, "w") as config_file: + json.dump(config, config_file) + + assert len(job_names_cache) == 0 + from_config(config_fp, command_config) + assert len(job_names_cache) == 2 + assert len(set(job_names_cache)) == 2 + + for job_name, script in job_names_cache.items(): + for substr in script.split(","): + if '"out_fpath"' not in substr: + continue + fn = Path(substr.split(":")[-1].strip()).name + assert f"{TAG}" not in fn + assert f"{TAG}" in job_name + + +@pytest.mark.parametrize("test_class", [False, True]) +def test_run_split_key_only( + test_ctx, runnable_script, monkeypatch, test_class, job_names_cache +): + """Test the `run` function with no split keys specified.""" + + tmp_path = test_ctx.obj["TMP_PATH"] + + if test_class: + command_config = CLICommandFromClass( + TestCommand, + "run_no_pp", + name="run", + split_keys={"_z_0"}, + ) + else: + command_config = CLICommandFromFunction( + _testing_function_no_pp, + name="run", + split_keys={"_z_0"}, + ) + + config = { + "execution_control": { + "option": "eagle", + "allocation": "test", + "walltime": 1, + "nodes": 2, + "max_workers": 1, + }, + "input1": 1, + "input2": 7, + "input3": 8, + "_z_0": ["unsorted", "strings"], + } + + config_fp = tmp_path / "config.json" + with open(config_fp, "w") as config_file: + json.dump(config, config_file) + + assert len(job_names_cache) == 0 + from_config(config_fp, command_config) + assert len(job_names_cache) == 2 + assert len(set(job_names_cache)) == 2 + + for job_name, script in job_names_cache.items(): + for substr in script.split(","): + if '"out_fpath"' not in substr: + continue + fn = Path(substr.split(":")[-1].strip()).name + assert f"{TAG}" not in fn + assert f"{TAG}" in job_name + + @pytest.mark.parametrize("test_class", [False, True]) def test_run_empty_split_keys( test_ctx, runnable_script, monkeypatch, test_class, job_names_cache @@ -526,7 +763,7 @@ def test_run_empty_split_keys( assert len(set(job_names_cache)) == 1 for job_name, script in job_names_cache.items(): - assert "_j0" not in job_name + assert f"{TAG}0" not in job_name assert "[0, 1, 2, 4]" in script assert '"_z_0": None' in script @@ -648,27 +885,27 @@ def test_run_parallel_split_keys( assert len(set(job_names_cache)) == 6 for job_name, script in job_names_cache.items(): - if "_j0" in job_name: + if f"{TAG}0" in job_name: assert '"_z_0": "strings"' in script assert '"input1": 1' in script assert '"input3": 4' in script - elif "_j1" in job_name: + elif f"{TAG}1" in job_name: assert '"_z_0": "strings"' in script assert '"input1": 2' in script assert '"input3": 5' in script - elif "_j2" in job_name: + elif f"{TAG}2" in job_name: assert '"_z_0": "strings"' in script assert '"input1": 3' in script assert '"input3": 6' in script - elif "_j3" in job_name: + elif f"{TAG}3" in job_name: assert '"_z_0": "unsorted"' in script assert '"input1": 1' in script assert '"input3": 4' in script - elif "_j4" in job_name: + elif f"{TAG}4" in job_name: assert '"_z_0": "unsorted"' in script assert '"input1": 2' in script assert '"input3": 5' in script - elif "_j5" in job_name: + elif f"{TAG}5" in job_name: assert '"_z_0": "unsorted"' in script assert '"input1": 3' in script assert '"input3": 6' in script @@ -756,8 +993,9 @@ def test_run_local_multiple_out_files(test_ctx, runnable_script, test_class): json.dump(config, config_file) from_config(config_fp, command_config) + out_fns = [f"out{TAG}0.json", f"out{TAG}1.json"] - for out_fn, in3 in zip(["out_j0.json", "out_j1.json"], config["input3"]): + for out_fn, in3 in zip(out_fns, config["input3"]): expected_file = tmp_path / out_fn assert expected_file.exists() with open(expected_file, "r") as output_file: @@ -774,7 +1012,7 @@ def test_run_local_multiple_out_files(test_ctx, runnable_script, test_class): assert len(status["run"]) == 2 for job_name in status["run"]: assert f"{tmp_path.name}_run" in job_name - assert "_j" in job_name + assert f"{TAG}" in job_name @pytest.mark.parametrize("test_class", [False, True])