diff --git a/README.rst b/README.rst index 936eae4..e11c802 100644 --- a/README.rst +++ b/README.rst @@ -359,6 +359,7 @@ The project is reasonably easy: Changes ------- +- 1.0.1: Dispatchers now return job references instead of job ids. This allows for fancier features in the future, when job information only becomes available a short time after the job has been submitted. - 0.10.1: FIX: Listing functions will no longer execute setup functions. - 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows to executed commands just like `srun` and `sbatch`, but uniform syntax with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT` you can check if your slurm configuration is running correctly. - 0.9.0: Lots of improvements. diff --git a/pyproject.toml b/pyproject.toml index ac8d859..3a6d25d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ where = ["src"] [project] name = "slurminade" -version = "0.10.1" +version = "1.0.1" authors = [ { name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "krupke@ibr.cs.tu-bs.de" }, ] diff --git a/src/slurminade/bundling.py b/src/slurminade/bundling.py index e3b7333..8f4f528 100644 --- a/src/slurminade/bundling.py +++ b/src/slurminade/bundling.py @@ -14,7 +14,21 @@ from .function import SlurmFunction from .guard import BatchGuard from .options import SlurmOptions +from .job_reference import JobReference +class BundlingJobReference(JobReference): + def __init__(self) -> None: + super().__init__() + pass + + def get_job_id(self) -> typing.Optional[int]: + return None + + def get_exit_code(self) -> typing.Optional[int]: + return None + + def get_info(self) -> typing.Dict[str, typing.Any]: + return {} class TaskBuffer: """ @@ -116,20 +130,20 @@ def _dispatch( funcs: typing.Iterable[FunctionCall], options: SlurmOptions, 
block: bool = False, - ) -> int: + ) -> JobReference: if block: # if blocking, we don't buffer, but dispatch immediately return self.subdispatcher._dispatch(funcs, options, block=True) for func in funcs: self._tasks.add(func, options) - return -1 + return BundlingJobReference() def srun( self, command: str, conf: typing.Optional[typing.Dict] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): + ) -> JobReference: conf = SlurmOptions(conf if conf else {}) return self.subdispatcher.srun(command, conf, simple_slurm_kwargs) @@ -138,7 +152,7 @@ def sbatch( command: str, conf: typing.Optional[typing.Dict] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): + ) -> JobReference: conf = SlurmOptions(conf if conf else {}) return self.subdispatcher.sbatch(command, conf, simple_slurm_kwargs) diff --git a/src/slurminade/dispatcher.py b/src/slurminade/dispatcher.py index a835f4d..059e354 100644 --- a/src/slurminade/dispatcher.py +++ b/src/slurminade/dispatcher.py @@ -11,7 +11,7 @@ import shutil import subprocess import typing - +from typing import Any, Dict, Optional import simple_slurm from .conf import _get_conf @@ -24,6 +24,7 @@ # MAX_ARG_STRLEN on a Linux system with PAGE_SIZE 4096 is 131072 DEFAULT_MAX_ARG_LENGTH = 100000 +from .job_reference import JobReference class Dispatcher(abc.ABC): """ @@ -38,7 +39,7 @@ def _dispatch( funcs: typing.Iterable[FunctionCall], options: SlurmOptions, block: bool = False, - ) -> int: + ) -> JobReference: """ Define how to dispatch a number of function calls. :param funcs: The function calls to be dispatched. @@ -52,7 +53,7 @@ def srun( command: str, conf: typing.Optional[SlurmOptions] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ) -> int: + ) -> JobReference: """ Define how you want to execute an `srun` command. This command is directly executed and only terminates after completion. 
@@ -68,7 +69,7 @@ def sbatch( command: str, conf: typing.Optional[SlurmOptions] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ) -> int: + ) -> JobReference: """ Define how you want to execute an `sbatch` command. The command is scheduled and the function return immediately. @@ -93,7 +94,7 @@ def __call__( funcs: typing.Union[FunctionCall, typing.Iterable[FunctionCall]], options: SlurmOptions, block: bool = False, - ) -> int: + ) -> JobReference: """ Dispatches a function call or a number of function calls. :param funcs: The function calls to be distributed. @@ -124,6 +125,15 @@ def join(self): msg = "Joining is not implemented for this dispatcher." raise NotImplementedError(msg) +class TestJobReference(JobReference): + def get_job_id(self) -> None: + return None + + def get_exit_code(self) -> None: + return None + + def get_info(self) -> Dict[str, Any]: + return {"info": "test"} class TestDispatcher(Dispatcher): """ @@ -142,7 +152,7 @@ def _dispatch( funcs: typing.Iterable[FunctionCall], options: SlurmOptions, block: bool = False, - ) -> int: + ) -> JobReference: dispatch_guard() funcs = list(funcs) command = create_slurminade_command( @@ -151,7 +161,7 @@ def _dispatch( logging.getLogger("slurminade").info(command) self.calls.append(funcs) self._cleanup(command) - return -1 + return TestJobReference() def _cleanup(self, command): args = shlex.split(command) @@ -170,6 +180,7 @@ def srun( dispatch_guard() self.sruns.append(command) logging.getLogger("slurminade").info("[test output] SRUN %s", command) + return TestJobReference() def sbatch( self, @@ -180,10 +191,28 @@ def sbatch( dispatch_guard() self.sbatches.append(command) logging.getLogger("slurminade").info("[test output] SBATCH %s", command) + return TestJobReference() def is_sequential(self): return True +class SlurmJobReference(JobReference): + def __init__(self, job_id, exit_code, mode: str): + self.job_id = job_id + self.exit_code = exit_code + self.mode = mode + + def get_job_id(self) 
-> int: + return self.job_id + + def get_exit_code(self) -> Optional[int]: + return self.exit_code + + def get_info(self) -> Dict[str, Any]: + return {"job_id": self.job_id, + "exit_code": self.exit_code, + "on_slurm": True, + "mode": self.mode} class SlurmDispatcher(Dispatcher): """ @@ -214,7 +243,7 @@ def _dispatch( funcs: typing.Iterable[FunctionCall], options: SlurmOptions, block: bool = False, - ) -> typing.Optional[int]: + ) -> SlurmJobReference: dispatch_guard() if "job_name" not in options: funcs = list(funcs) @@ -234,17 +263,17 @@ def _dispatch( logging.getLogger("slurminade").info( "Returned from srun with exit code %s", ret ) - return None + return SlurmJobReference(None, ret, "srun") jid = slurm.sbatch(command) self._all_job_ids.append(jid) - return jid + return SlurmJobReference(jid, None, "sbatch") def sbatch( self, command: str, conf: typing.Optional[typing.Dict] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): + ) -> SlurmJobReference: dispatch_guard() conf = _get_conf(conf) slurm = simple_slurm.Slurm(**conf) @@ -254,7 +283,7 @@ def sbatch( else: jid = slurm.sbatch(command) self._all_job_ids.append(jid) - return jid + return SlurmJobReference(jid, None, "sbatch") def join(self): if not self._all_job_ids: @@ -266,7 +295,7 @@ def srun( command: str, conf: typing.Optional[typing.Dict] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, - ): + ) -> SlurmJobReference: dispatch_guard() conf = _get_conf(conf) slurm = simple_slurm.Slurm(**conf) @@ -275,8 +304,20 @@ def srun( ret = slurm.srun(command, **simple_slurm_kwargs) else: ret = slurm.srun(command) - return ret + return SlurmJobReference(None, ret, "srun") + +class SubprocessJobReference(JobReference): + def __init__(self): + pass + + def get_job_id(self) -> Optional[int]: + return None + def get_exit_code(self) -> Optional[int]: + return None + + def get_info(self) -> Dict[str, Any]: + return {"on_slurm": False} class SubprocessDispatcher(Dispatcher): """ @@ -325,6 
+366,16 @@ def sbatch( def is_sequential(self): return True +class LocalJobReference(JobReference): + def get_job_id(self) -> None: + return None + + def get_exit_code(self) -> None: + return None + + def get_info(self) -> Dict[str, Any]: + return {"on_slurm": False} + class DirectCallDispatcher(Dispatcher): """ @@ -338,11 +389,11 @@ def _dispatch( funcs: typing.Iterable[FunctionCall], options: SlurmOptions, block: bool = False, - ) -> int: + ) -> LocalJobReference: dispatch_guard() for func in funcs: FunctionMap.call(func.func_id, func.args, func.kwargs) - return -1 + return LocalJobReference() def srun( self, @@ -352,6 +403,7 @@ def srun( ): dispatch_guard() subprocess.run(command, check=True) + return LocalJobReference() def sbatch( self, @@ -359,7 +411,7 @@ def sbatch( conf: typing.Optional[typing.Dict] = None, simple_slurm_kwargs: typing.Optional[typing.Dict] = None, ): - self.srun(command) + return self.srun(command) def is_sequential(self): return True diff --git a/src/slurminade/function.py b/src/slurminade/function.py index bb908c6..ae30bfb 100644 --- a/src/slurminade/function.py +++ b/src/slurminade/function.py @@ -7,7 +7,7 @@ from .function_map import FunctionMap from .guard import guard_recursive_distribution from .options import SlurmOptions - +from .job_reference import JobReference class CallPolicy(Enum): """ @@ -52,7 +52,7 @@ def update_options(self, conf: typing.Dict[str, typing.Any]): self.special_slurm_opts.update(conf) def wait_for( - self, job_ids: typing.Union[int, typing.Iterable[int]], method: str = "afterany" + self, job_ids: typing.Union[JobReference, typing.Iterable[JobReference]], method: str = "afterany" ) -> "SlurmFunction": """ Add a dependency to a distribution. 
@@ -69,17 +69,17 @@ def wait_for( """ sfunc = SlurmFunction(self.special_slurm_opts, self.func, self.func_id) job_ids = ( - [job_ids] if isinstance(job_ids, int) else list(job_ids) + [job_ids] if isinstance(job_ids, JobReference) else list(job_ids) ) # make sure it is a list if not job_ids and not get_dispatcher().is_sequential(): msg = "Creating a dependency on an empty list of job ids." msg += " This is probably an error in your code." msg += " Maybe you are using `Batch` but flush outside of the `with` block?" raise RuntimeError(msg) - if any(jid < 0 for jid in job_ids) and not get_dispatcher().is_sequential(): + if any(jid.get_job_id() is None for jid in job_ids) and not get_dispatcher().is_sequential(): msg = "Invalid job id. Not every dispatcher can directly return job ids, because it may not directly distribute them or doesn't distribute them at all." raise RuntimeError(msg) - sfunc.special_slurm_opts.add_dependencies(list(job_ids), method) + sfunc.special_slurm_opts.add_dependencies(list(jid.get_job_id() for jid in job_ids), method) return sfunc def with_options(self, **kwargs) -> "SlurmFunction": diff --git a/src/slurminade/job_reference.py b/src/slurminade/job_reference.py new file mode 100644 index 0000000..4ff3af9 --- /dev/null +++ b/src/slurminade/job_reference.py @@ -0,0 +1,15 @@ +import abc +from typing import Any, Dict, Optional + +class JobReference(abc.ABC): + @abc.abstractmethod + def get_job_id(self) -> Optional[int]: + pass + + @abc.abstractmethod + def get_exit_code(self) -> Optional[int]: + pass + + @abc.abstractmethod + def get_info(self) -> Dict[str, Any]: + pass \ No newline at end of file