Skip to content

Commit

Permalink
Merge pull request #22 from d-krupke/development
Browse files Browse the repository at this point in the history
v1.0.1
  • Loading branch information
d-krupke authored Feb 26, 2024
2 parents 6d8a02c + b4d07c6 commit b1f26ce
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 27 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ The project is reasonably easy:
Changes
-------

- 1.0.1: Dispatchers now return job references instead of job ids. This allows for fancier features in the future, e.g., when the job info only becomes available a short time after the job has been submitted.
- 0.10.1: FIX: Listing functions will no longer execute setup functions.
- 0.10.0: `Batch` is now named `JobBundling`. There is a method `join` for easier synchronization. `exec` allows executing commands just like `srun` and `sbatch`, but with a syntax uniform with other slurmified functions. Functions can now also be called with `distribute_and_wait`. If you call `python3 -m slurminade.check --partition YOUR_PARTITION --constraint YOUR_CONSTRAINT` you can check if your slurm configuration is running correctly.
- 0.9.0: Lots of improvements.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ where = ["src"]

[project]
name = "slurminade"
version = "0.10.1"
version = "1.0.1"
authors = [
{ name = "TU Braunschweig, IBR, Algorithms Group (Dominik Krupke)", email = "krupke@ibr.cs.tu-bs.de" },
]
Expand Down
22 changes: 18 additions & 4 deletions src/slurminade/bundling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,21 @@
from .function import SlurmFunction
from .guard import BatchGuard
from .options import SlurmOptions
from .job_reference import JobReference

class BundlingJobReference(JobReference):
    """
    Reference handed out for a function call that was buffered for bundling
    instead of being dispatched immediately.

    No job id, exit code, or further information is available for such a
    call, so the accessors return ``None`` and an empty dictionary.
    """

    # No ``__init__`` override: the previous one only delegated to
    # ``super().__init__()`` and carried no state of its own.

    def get_job_id(self) -> typing.Optional[int]:
        """
        :return: Always ``None``.
        """
        return None

    def get_exit_code(self) -> typing.Optional[int]:
        """
        :return: Always ``None``.
        """
        return None

    def get_info(self) -> typing.Dict[str, typing.Any]:
        """
        :return: A new, empty dictionary.
        """
        return {}

class TaskBuffer:
"""
Expand Down Expand Up @@ -116,20 +130,20 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
if block:
# if blocking, we don't buffer, but dispatch immediately
return self.subdispatcher._dispatch(funcs, options, block=True)
for func in funcs:
self._tasks.add(func, options)
return -1
return BundlingJobReference()

def srun(
    self,
    command: str,
    conf: typing.Optional[typing.Dict] = None,
    simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
) -> JobReference:
    """
    Forward an `srun` command to the wrapped sub-dispatcher.

    :param command: The command to execute.
    :param conf: Slurm options as a dictionary. ``None`` or an empty
        dictionary results in empty ``SlurmOptions``.
    :param simple_slurm_kwargs: Passed through unchanged to the
        sub-dispatcher.
    :return: Whatever the sub-dispatcher's ``srun`` returns.
    """
    # The sub-dispatcher is handed a `SlurmOptions` object, not a raw dict.
    options = SlurmOptions(conf or {})
    return self.subdispatcher.srun(command, options, simple_slurm_kwargs)

Expand All @@ -138,7 +152,7 @@ def sbatch(
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> JobReference:
conf = SlurmOptions(conf if conf else {})
return self.subdispatcher.sbatch(command, conf, simple_slurm_kwargs)

Expand Down
86 changes: 69 additions & 17 deletions src/slurminade/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import shutil
import subprocess
import typing

from typing import Any, Dict, Optional
import simple_slurm

from .conf import _get_conf
Expand All @@ -24,6 +24,7 @@
# MAX_ARG_STRLEN on a Linux system with PAGE_SIZE 4096 is 131072
DEFAULT_MAX_ARG_LENGTH = 100000

from .job_reference import JobReference

class Dispatcher(abc.ABC):
"""
Expand All @@ -38,7 +39,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
"""
Define how to dispatch a number of function calls.
:param funcs: The function calls to be dispatched.
Expand All @@ -52,7 +53,7 @@ def srun(
command: str,
conf: typing.Optional[SlurmOptions] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
) -> int:
) -> JobReference:
"""
Define how you want to execute an `srun` command. This command is directly
executed and only terminates after completion.
Expand All @@ -68,7 +69,7 @@ def sbatch(
command: str,
conf: typing.Optional[SlurmOptions] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
) -> int:
) -> JobReference:
"""
Define how you want to execute an `sbatch` command. The command is scheduled
and the function return immediately.
Expand All @@ -93,7 +94,7 @@ def __call__(
funcs: typing.Union[FunctionCall, typing.Iterable[FunctionCall]],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
"""
Dispatches a function call or a number of function calls.
:param funcs: The function calls to be distributed.
Expand Down Expand Up @@ -124,6 +125,15 @@ def join(self):
msg = "Joining is not implemented for this dispatcher."
raise NotImplementedError(msg)

class TestJobReference(JobReference):
    """
    Job reference that carries neither a job id nor an exit code and
    identifies itself through a fixed ``{"info": "test"}`` dictionary.
    """

    def get_job_id(self) -> None:
        """
        :return: Always ``None``.
        """
        return None

    def get_exit_code(self) -> None:
        """
        :return: Always ``None``.
        """
        return None

    def get_info(self) -> Dict[str, Any]:
        """
        :return: A new dictionary with the single entry ``"info": "test"``.
        """
        info: Dict[str, Any] = {"info": "test"}
        return info

class TestDispatcher(Dispatcher):
"""
Expand All @@ -142,7 +152,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> JobReference:
dispatch_guard()
funcs = list(funcs)
command = create_slurminade_command(
Expand All @@ -151,7 +161,7 @@ def _dispatch(
logging.getLogger("slurminade").info(command)
self.calls.append(funcs)
self._cleanup(command)
return -1
return TestJobReference()

def _cleanup(self, command):
args = shlex.split(command)
Expand All @@ -170,6 +180,7 @@ def srun(
dispatch_guard()
self.sruns.append(command)
logging.getLogger("slurminade").info("[test output] SRUN %s", command)
return TestJobReference()

def sbatch(
self,
Expand All @@ -180,10 +191,28 @@ def sbatch(
dispatch_guard()
self.sbatches.append(command)
logging.getLogger("slurminade").info("[test output] SBATCH %s", command)
return TestJobReference()

def is_sequential(self):
    """
    :return: Always ``True``.
    """
    # NOTE(review): `SlurmFunction.wait_for` only raises on empty or missing
    # job ids when the dispatcher's `is_sequential()` is false.
    sequential = True
    return sequential

class SlurmJobReference(JobReference):
    """
    Reference to work handed to Slurm via ``srun`` or ``sbatch``.

    As constructed by the Slurm dispatcher, an ``sbatch`` reference carries a
    job id but no exit code, while an ``srun`` reference carries the exit
    code but no job id.
    """

    def __init__(
        self, job_id: Optional[int], exit_code: Optional[int], mode: str
    ):
        """
        :param job_id: The job id, or ``None`` if none is available.
        :param exit_code: The exit code, or ``None`` if none is available.
        :param mode: How the work was submitted, e.g. ``"srun"`` or
            ``"sbatch"``.
        """
        self.job_id = job_id
        self.exit_code = exit_code
        self.mode = mode

    def get_job_id(self) -> Optional[int]:
        """
        :return: The stored job id; ``None`` if the reference was created
            without one.
        """
        return self.job_id

    def get_exit_code(self) -> Optional[int]:
        """
        :return: The stored exit code; ``None`` if the reference was created
            without one.
        """
        return self.exit_code

    def get_info(self) -> Dict[str, Any]:
        """
        :return: A dictionary with the job id, exit code, submission mode,
            and ``"on_slurm": True``.
        """
        return {
            "job_id": self.job_id,
            "exit_code": self.exit_code,
            "on_slurm": True,
            "mode": self.mode,
        }

class SlurmDispatcher(Dispatcher):
"""
Expand Down Expand Up @@ -214,7 +243,7 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> typing.Optional[int]:
) -> SlurmJobReference:
dispatch_guard()
if "job_name" not in options:
funcs = list(funcs)
Expand All @@ -234,17 +263,17 @@ def _dispatch(
logging.getLogger("slurminade").info(
"Returned from srun with exit code %s", ret
)
return None
return SlurmJobReference(None, ret, "srun")
jid = slurm.sbatch(command)
self._all_job_ids.append(jid)
return jid
return SlurmJobReference(jid, None, "sbatch")

def sbatch(
self,
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> SlurmJobReference:
dispatch_guard()
conf = _get_conf(conf)
slurm = simple_slurm.Slurm(**conf)
Expand All @@ -254,7 +283,7 @@ def sbatch(
else:
jid = slurm.sbatch(command)
self._all_job_ids.append(jid)
return jid
return SlurmJobReference(jid, None, "sbatch")

def join(self):
if not self._all_job_ids:
Expand All @@ -266,7 +295,7 @@ def srun(
command: str,
conf: typing.Optional[typing.Dict] = None,
simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
) -> SlurmJobReference:
dispatch_guard()
conf = _get_conf(conf)
slurm = simple_slurm.Slurm(**conf)
Expand All @@ -275,8 +304,20 @@ def srun(
ret = slurm.srun(command, **simple_slurm_kwargs)
else:
ret = slurm.srun(command)
return ret
return SlurmJobReference(None, ret, "srun")

class SubprocessJobReference(JobReference):
    """
    Job reference for work that did not run on Slurm.

    It carries neither a job id nor an exit code; ``get_info`` only reports
    ``"on_slurm": False``.
    """

    # No ``__init__`` override: the previous one consisted solely of ``pass``.

    def get_job_id(self) -> Optional[int]:
        """
        :return: Always ``None``.
        """
        return None

    def get_exit_code(self) -> Optional[int]:
        """
        :return: Always ``None``.
        """
        return None

    def get_info(self) -> Dict[str, Any]:
        """
        :return: A new dictionary with the single entry ``"on_slurm": False``.
        """
        return {"on_slurm": False}

class SubprocessDispatcher(Dispatcher):
"""
Expand Down Expand Up @@ -325,6 +366,16 @@ def sbatch(
def is_sequential(self):
    """
    :return: Always ``True``.
    """
    # NOTE(review): `SlurmFunction.wait_for` only raises on empty or missing
    # job ids when the dispatcher's `is_sequential()` is false.
    sequential = True
    return sequential

class LocalJobReference(JobReference):
    """
    Job reference for work that did not run on Slurm; it has neither a job
    id nor an exit code and reports ``"on_slurm": False``.
    """

    def get_job_id(self) -> None:
        """
        :return: Always ``None``.
        """
        return None

    def get_exit_code(self) -> None:
        """
        :return: Always ``None``.
        """
        return None

    def get_info(self) -> Dict[str, Any]:
        """
        :return: A new dictionary with the single entry ``"on_slurm": False``.
        """
        info: Dict[str, Any] = {"on_slurm": False}
        return info


class DirectCallDispatcher(Dispatcher):
"""
Expand All @@ -338,11 +389,11 @@ def _dispatch(
funcs: typing.Iterable[FunctionCall],
options: SlurmOptions,
block: bool = False,
) -> int:
) -> LocalJobReference:
dispatch_guard()
for func in funcs:
FunctionMap.call(func.func_id, func.args, func.kwargs)
return -1
return LocalJobReference()

def srun(
self,
Expand All @@ -352,14 +403,15 @@ def srun(
):
dispatch_guard()
subprocess.run(command, check=True)
return LocalJobReference()

def sbatch(
    self,
    command: str,
    conf: typing.Optional[typing.Dict] = None,
    simple_slurm_kwargs: typing.Optional[typing.Dict] = None,
):
    """
    Execute the command via this dispatcher's ``srun`` and return its result.

    :param command: The command to execute.
    :param conf: Accepted for interface compatibility; not forwarded.
    :param simple_slurm_kwargs: Accepted for interface compatibility; not
        forwarded.
    :return: The value returned by ``self.srun(command)``.
    """
    # Call `srun` exactly once: a bare call followed by a returning call
    # would execute the command twice.
    return self.srun(command)

def is_sequential(self):
    """
    :return: Always ``True``.
    """
    # NOTE(review): `SlurmFunction.wait_for` only raises on empty or missing
    # job ids when the dispatcher's `is_sequential()` is false.
    sequential = True
    return sequential
Expand Down
10 changes: 5 additions & 5 deletions src/slurminade/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .function_map import FunctionMap
from .guard import guard_recursive_distribution
from .options import SlurmOptions

from .job_reference import JobReference

class CallPolicy(Enum):
"""
Expand Down Expand Up @@ -52,7 +52,7 @@ def update_options(self, conf: typing.Dict[str, typing.Any]):
self.special_slurm_opts.update(conf)

def wait_for(
self, job_ids: typing.Union[int, typing.Iterable[int]], method: str = "afterany"
self, job_ids: typing.Union[JobReference, typing.Iterable[JobReference]], method: str = "afterany"
) -> "SlurmFunction":
"""
Add a dependency to a distribution.
Expand All @@ -69,17 +69,17 @@ def wait_for(
"""
sfunc = SlurmFunction(self.special_slurm_opts, self.func, self.func_id)
job_ids = (
[job_ids] if isinstance(job_ids, int) else list(job_ids)
[job_ids] if isinstance(job_ids, JobReference) else list(job_ids)
) # make sure it is a list
if not job_ids and not get_dispatcher().is_sequential():
msg = "Creating a dependency on an empty list of job ids."
msg += " This is probably an error in your code."
msg += " Maybe you are using `Batch` but flush outside of the `with` block?"
raise RuntimeError(msg)
if any(jid < 0 for jid in job_ids) and not get_dispatcher().is_sequential():
if any(jid.get_job_id() is None for jid in job_ids) and not get_dispatcher().is_sequential():
msg = "Invalid job id. Not every dispatcher can directly return job ids, because it may not directly distribute them or doesn't distribute them at all."
raise RuntimeError(msg)
sfunc.special_slurm_opts.add_dependencies(list(job_ids), method)
sfunc.special_slurm_opts.add_dependencies(list(jid.get_job_id() for jid in job_ids), method)
return sfunc

def with_options(self, **kwargs) -> "SlurmFunction":
Expand Down
15 changes: 15 additions & 0 deletions src/slurminade/job_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
from typing import Any, Dict, Optional

class JobReference(abc.ABC):
    """
    Abstract handle that dispatchers return for dispatched work.

    Concrete subclasses must implement all three accessors. A job id or exit
    code that is not available is reported as ``None``.
    """

    @abc.abstractmethod
    def get_job_id(self) -> Optional[int]:
        """
        :return: The job id, or ``None`` if no job id is available.
        """

    @abc.abstractmethod
    def get_exit_code(self) -> Optional[int]:
        """
        :return: The exit code, or ``None`` if no exit code is available.
        """

    @abc.abstractmethod
    def get_info(self) -> Dict[str, Any]:
        """
        :return: A dictionary with implementation-specific information
            about the job.
        """

0 comments on commit b1f26ce

Please sign in to comment.