Support threads on local backend
jan-janssen committed Nov 19, 2024
1 parent 1565d4c commit 78246d5
Showing 4 changed files with 3 additions and 22 deletions.
2 changes: 0 additions & 2 deletions executorlib/interactive/executor.py
@@ -15,7 +15,6 @@
     check_nested_flux_executor,
     check_oversubscribe,
     check_pmi,
-    check_threads_per_core,
     validate_number_of_cores,
 )
 from executorlib.standalone.interactive.spawner import (
@@ -258,7 +257,6 @@ def create_executor(
     elif backend == "local":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
         check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
         check_command_line_argument_lst(
             command_line_argument_lst=resource_dict["slurm_cmd_args"]
5 changes: 3 additions & 2 deletions executorlib/interactive/shared.py
@@ -546,13 +546,14 @@ def _submit_function_to_separate_process(
         resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
     ):
         resource_dict["cores"] = executor_kwargs["cores"]
+    slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
     active_task_dict = _wait_for_free_slots(
         active_task_dict=active_task_dict,
-        cores_requested=resource_dict["cores"],
+        cores_requested=slots_required,
         max_cores=max_cores,
         max_workers=max_workers,
     )
-    active_task_dict[task_dict["future"]] = resource_dict["cores"]
+    active_task_dict[task_dict["future"]] = slots_required
     task_kwargs = executor_kwargs.copy()
     task_kwargs.update(resource_dict)
     task_kwargs.update(
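
The shared.py change above moves thread handling into the scheduling step: instead of rejecting threads_per_core != 1 up front, each task now reserves cores * threads_per_core slots against max_cores. A minimal sketch of that accounting (simplified stand-ins, not the actual executorlib internals; slots_required and can_schedule are illustrative names):

from concurrent.futures import Future


def slots_required(resource_dict: dict) -> int:
    """Slots a single task occupies; threads_per_core defaults to 1."""
    return resource_dict["cores"] * resource_dict.get("threads_per_core", 1)


def can_schedule(active_task_dict: dict, resource_dict: dict, max_cores: int) -> bool:
    """True if the new task fits into the remaining slot budget."""
    return sum(active_task_dict.values()) + slots_required(resource_dict) <= max_cores


if __name__ == "__main__":
    active: dict = {}
    task = {"cores": 2, "threads_per_core": 2}  # hybrid MPI/OpenMP-style request
    if can_schedule(active_task_dict=active, resource_dict=task, max_cores=4):
        active[Future()] = slots_required(task)
    print(sum(active.values()))  # 4 slots reserved: 2 cores x 2 threads each
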
13 changes: 0 additions & 13 deletions executorlib/standalone/inputcheck.py
@@ -39,19 +39,6 @@ def check_gpus_per_worker(gpus_per_worker: int) -> None:
         )


-def check_threads_per_core(threads_per_core: int) -> None:
-    """
-    Check if threads_per_core is not 1 and raise a TypeError if it is.
-    """
-    if threads_per_core != 1:
-        raise TypeError(
-            "Thread based parallelism is not supported for the executorlib.mpi.PyMPIExecutor backend."
-            "Please use threads_per_core=1 instead of threads_per_core="
-            + str(threads_per_core)
-            + "."
-        )
-
-
 def check_executor(executor: Executor) -> None:
     """
     Check if executor is not None and raise a ValueError if it is.
5 changes: 0 additions & 5 deletions tests/test_shared_input_check.py
@@ -3,7 +3,6 @@
 from executorlib.standalone.inputcheck import (
     check_command_line_argument_lst,
     check_gpus_per_worker,
-    check_threads_per_core,
     check_oversubscribe,
     check_executor,
     check_init_function,
@@ -31,10 +30,6 @@ def test_check_gpus_per_worker(self):
         with self.assertRaises(TypeError):
             check_gpus_per_worker(gpus_per_worker=1)

-    def test_check_threads_per_core(self):
-        with self.assertRaises(TypeError):
-            check_threads_per_core(threads_per_core=2)
-
     def test_check_oversubscribe(self):
         with self.assertRaises(ValueError):
             check_oversubscribe(oversubscribe=True)
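
With the input check removed and the slot accounting in place, requesting thread-based parallelism on the local backend should no longer raise a TypeError. A rough usage sketch, assuming the public executorlib.Executor accepts backend="local", max_cores, and a per-submit resource_dict (only the resource_dict keys "cores" and "threads_per_core" are confirmed by this diff; the public signature is an assumption):

from executorlib import Executor


def hybrid_task(x):
    # Placeholder for a thread-parallel (e.g. OpenMP) workload.
    return x * 2


# Assumed public API; verify against the executorlib documentation.
with Executor(backend="local", max_cores=4) as exe:
    future = exe.submit(hybrid_task, 2, resource_dict={"cores": 1, "threads_per_core": 2})
    print(future.result())  # 4; the task occupies 1 core x 2 threads = 2 slots
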
