Skip to content

Commit

Permalink
Support threads on local backend (#504)
Browse files Browse the repository at this point in the history
* Support threads on local backend

* fixes

* more fixes

* Fix spawner
  • Loading branch information
jan-janssen authored Nov 19, 2024
1 parent 1565d4c commit d11a2d3
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 38 deletions.
2 changes: 0 additions & 2 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
check_nested_flux_executor,
check_oversubscribe,
check_pmi,
check_threads_per_core,
validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
Expand Down Expand Up @@ -258,7 +257,6 @@ def create_executor(
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
Expand Down
5 changes: 3 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,14 @@ def _submit_function_to_separate_process(
resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
):
resource_dict["cores"] = executor_kwargs["cores"]
slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
active_task_dict = _wait_for_free_slots(
active_task_dict=active_task_dict,
cores_requested=resource_dict["cores"],
cores_requested=slots_required,
max_cores=max_cores,
max_workers=max_workers,
)
active_task_dict[task_dict["future"]] = resource_dict["cores"]
active_task_dict[task_dict["future"]] = slots_required
task_kwargs = executor_kwargs.copy()
task_kwargs.update(resource_dict)
task_kwargs.update(
Expand Down
13 changes: 0 additions & 13 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,6 @@ def check_gpus_per_worker(gpus_per_worker: int) -> None:
)


def check_threads_per_core(threads_per_core: int) -> None:
"""
Check if threads_per_core is not 1 and raise a TypeError if it is.
"""
if threads_per_core != 1:
raise TypeError(
"Thread based parallelism is not supported for the executorlib.mpi.PyMPIExecutor backend."
"Please use threads_per_core=1 instead of threads_per_core="
+ str(threads_per_core)
+ "."
)


def check_executor(executor: Executor) -> None:
"""
Check if executor is not None and raise a ValueError if it is.
Expand Down
5 changes: 4 additions & 1 deletion executorlib/standalone/interactive/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ def __init__(
cwd: Optional[str] = None,
cores: int = 1,
openmpi_oversubscribe: bool = False,
threads_per_core: int = 1,
):
"""
Subprocess interface implementation.
Args:
cwd (str, optional): The current working directory. Defaults to None.
cores (int, optional): The number of cores to use. Defaults to 1.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
"""
super().__init__(
Expand All @@ -72,6 +74,7 @@ def __init__(
openmpi_oversubscribe=openmpi_oversubscribe,
)
self._process = None
self._threads_per_core = threads_per_core

def bootup(
self,
Expand Down Expand Up @@ -169,8 +172,8 @@ def __init__(
cwd=cwd,
cores=cores,
openmpi_oversubscribe=openmpi_oversubscribe,
threads_per_core=threads_per_core,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._slurm_cmd_args = slurm_cmd_args

Expand Down
6 changes: 0 additions & 6 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ def test_meta_executor_parallel(self):
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(TypeError):
Executor(
max_cores=1,
resource_dict={"cores": 1, "threads_per_core": 2},
backend="local",
)
with self.assertRaises(TypeError):
Executor(
max_cores=1,
Expand Down
9 changes: 0 additions & 9 deletions tests/test_executor_backend_mpi_noblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,6 @@ def test_meta_executor_single(self):
self.assertTrue(fs_2.done())

def test_errors(self):
with self.assertRaises(TypeError):
Executor(
max_cores=1,
resource_dict={
"cores": 1,
"threads_per_core": 2,
},
backend="local",
)
with self.assertRaises(TypeError):
Executor(
max_cores=1,
Expand Down
5 changes: 0 additions & 5 deletions tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_gpus_per_worker,
check_threads_per_core,
check_oversubscribe,
check_executor,
check_init_function,
Expand Down Expand Up @@ -31,10 +30,6 @@ def test_check_gpus_per_worker(self):
with self.assertRaises(TypeError):
check_gpus_per_worker(gpus_per_worker=1)

def test_check_threads_per_core(self):
with self.assertRaises(TypeError):
check_threads_per_core(threads_per_core=2)

def test_check_oversubscribe(self):
with self.assertRaises(ValueError):
check_oversubscribe(oversubscribe=True)
Expand Down

0 comments on commit d11a2d3

Please sign in to comment.