Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executorlib/task_scheduler/interactive/blockallocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks


class BlockAllocationTaskScheduler(TaskSchedulerBase):
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
self._set_process(
process=[
Thread(
target=execute_tasks,
target=execute_multiple_tasks,
kwargs=executor_kwargs | {"worker_id": worker_id},
)
for worker_id in range(self._max_workers)
Expand All @@ -90,7 +90,7 @@ def max_workers(self, max_workers: int):
elif self._max_workers < max_workers:
new_process_lst = [
Thread(
target=execute_tasks,
target=execute_multiple_tasks,
kwargs=self._process_kwargs,
)
for _ in range(max_workers - self._max_workers)
Expand Down
14 changes: 3 additions & 11 deletions executorlib/task_scheduler/interactive/onetoone.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_single_task


class OneProcessTaskScheduler(TaskSchedulerBase):
Expand Down Expand Up @@ -94,7 +94,6 @@ def _execute_task_in_separate_process(
"""
active_task_dict: dict = {}
process_lst: list = []
qtask_lst: list = []
if "cores" not in kwargs:
kwargs["cores"] = 1
while True:
Expand All @@ -106,18 +105,15 @@ def _execute_task_in_separate_process(
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
qtask: queue.Queue = queue.Queue()
process, active_task_dict = _wrap_execute_task_in_separate_process(
task_dict=task_dict,
qtask=qtask,
active_task_dict=active_task_dict,
spawner=spawner,
executor_kwargs=kwargs,
max_cores=max_cores,
max_workers=max_workers,
hostname_localhost=hostname_localhost,
)
qtask_lst.append(qtask)
process_lst.append(process)
future_queue.task_done()

Expand Down Expand Up @@ -158,7 +154,6 @@ def _wait_for_free_slots(
def _wrap_execute_task_in_separate_process(
task_dict: dict,
active_task_dict: dict,
qtask: queue.Queue,
spawner: type[BaseSpawner],
executor_kwargs: dict,
max_cores: Optional[int] = None,
Expand Down Expand Up @@ -190,8 +185,6 @@ def _wrap_execute_task_in_separate_process(
dictionary containing the future objects and the number of cores they require
"""
resource_dict = task_dict.pop("resource_dict").copy()
qtask.put(task_dict)
qtask.put({"shutdown": True, "wait": True})
if "cores" not in resource_dict or (
resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
):
Expand All @@ -208,14 +201,13 @@ def _wrap_execute_task_in_separate_process(
task_kwargs.update(resource_dict)
task_kwargs.update(
{
"future_queue": qtask,
"task_dict": task_dict,
"spawner": spawner,
"hostname_localhost": hostname_localhost,
"init_function": None,
}
)
process = Thread(
target=execute_tasks,
target=execute_single_task,
kwargs=task_kwargs,
)
process.start()
Expand Down
107 changes: 95 additions & 12 deletions executorlib/task_scheduler/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from executorlib.standalone.serialize import serialize_funct


def execute_tasks(
def execute_multiple_tasks(
future_queue: queue.Queue,
cores: int = 1,
spawner: type[BaseSpawner] = MpiExecSpawner,
Expand Down Expand Up @@ -74,20 +74,103 @@ def execute_tasks(
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
if error_log_file is not None:
task_dict["error_log_file"] = error_log_file
if cache_directory is None:
_execute_task_without_cache(interface=interface, task_dict=task_dict)
else:
_execute_task_with_cache(
interface=interface,
task_dict=task_dict,
cache_directory=cache_directory,
cache_key=cache_key,
)
_execute_task_dict(
task_dict=task_dict,
interface=interface,
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)
_task_done(future_queue=future_queue)


def execute_single_task(
    task_dict: dict,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute a single task in parallel using the message passing interface (MPI).

    Boots up a socket interface for the selected spawner, executes the task it
    receives and always shuts the interface down afterwards, so the spawned
    process does not outlive the task.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
        **kwargs: additional keyword arguments forwarded to the spawner.
    """
    interface = interface_bootup(
        command_lst=get_interactive_execute_command(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        log_obj_size=log_obj_size,
        worker_id=worker_id,
    )
    try:
        _execute_task_dict(
            task_dict=task_dict,
            interface=interface,
            cache_directory=cache_directory,
            cache_key=cache_key,
            error_log_file=error_log_file,
        )
    finally:
        # Always release the spawned process, also on the success path; the
        # helpers may already have shut the interface down when the task
        # raised, so suppress secondary errors to let the original propagate.
        try:
            interface.shutdown(wait=True)
        except Exception:
            pass


def _execute_task_dict(
    task_dict: dict,
    interface: SocketInterface,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    error_log_file: Optional[str] = None,
):
    """
    Run a single task dictionary over the given socket interface, dispatching
    to the cached or uncached execution path.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        interface (SocketInterface): socket interface for zmq communication
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
    """
    # Propagate the error log destination to the task itself, if configured.
    if error_log_file is not None:
        task_dict["error_log_file"] = error_log_file
    if cache_directory is not None:
        _execute_task_with_cache(
            interface=interface,
            task_dict=task_dict,
            cache_directory=cache_directory,
            cache_key=cache_key,
        )
    else:
        _execute_task_without_cache(interface=interface, task_dict=task_dict)

Comment on lines +161 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

execute_single_task leaks the interface; always shutdown in success path.

If no exception occurs, the interface process remains running. Ensure shutdown in a finally block.

-    if error_log_file is not None:
-        task_dict["error_log_file"] = error_log_file
-    if cache_directory is None:
-        _execute_task_without_cache(interface=interface, task_dict=task_dict)
-    else:
-        _execute_task_with_cache(
-            interface=interface,
-            task_dict=task_dict,
-            cache_directory=cache_directory,
-            cache_key=cache_key,
-        )
+    try:
+        if error_log_file is not None:
+            task_dict["error_log_file"] = error_log_file
+        if cache_directory is None:
+            _execute_task_without_cache(interface=interface, task_dict=task_dict)
+        else:
+            _execute_task_with_cache(
+                interface=interface,
+                task_dict=task_dict,
+                cache_directory=cache_directory,
+                cache_key=cache_key,
+            )
+    finally:
+        # Be robust if helpers already shut down on exception
+        with contextlib.suppress(Exception):
+            interface.shutdown(wait=True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if error_log_file is not None:
task_dict["error_log_file"] = error_log_file
if cache_directory is None:
_execute_task_without_cache(interface=interface, task_dict=task_dict)
else:
_execute_task_with_cache(
interface=interface,
task_dict=task_dict,
cache_directory=cache_directory,
cache_key=cache_key,
)
try:
if error_log_file is not None:
task_dict["error_log_file"] = error_log_file
if cache_directory is None:
_execute_task_without_cache(interface=interface, task_dict=task_dict)
else:
_execute_task_with_cache(
interface=interface,
task_dict=task_dict,
cache_directory=cache_directory,
cache_key=cache_key,
)
finally:
# Be robust if helpers already shut down on exception
with contextlib.suppress(Exception):
interface.shutdown(wait=True)
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 142 to 153, the
code calls either _execute_task_without_cache or _execute_task_with_cache but
does not stop the interface on the success path, leaking the interface process;
wrap the execution calls in a try/finally so that interface.shutdown() (or the
appropriate stop/close method on the interface) is always invoked after the task
completes or raises, preserving the existing error_log_file and cache logic and
ensuring shutdown occurs in both success and exception cases.


def _execute_task_without_cache(interface: SocketInterface, task_dict: dict):
"""
Execute the task in the task_dict by communicating it via the interface.
Expand Down
6 changes: 3 additions & 3 deletions tests/test_fluxpythonspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import numpy as np

from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
from executorlib.standalone.serialize import cloudpickle_register

Expand Down Expand Up @@ -112,7 +112,7 @@ def test_execute_task(self):
q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
flux_executor=self.flux_executor,
Expand All @@ -127,7 +127,7 @@ def test_execute_task_threads(self):
q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
threads_per_core=1,
Expand Down
16 changes: 8 additions & 8 deletions tests/test_mpiexecspawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
from executorlib.standalone.interactive.backend import call_funct
Expand Down Expand Up @@ -261,7 +261,7 @@ def test_execute_task(self):
q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand Down Expand Up @@ -443,7 +443,7 @@ def test_execute_task_failed_no_argument(self):
q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -459,7 +459,7 @@ def test_execute_task_failed_wrong_argument(self):
q.put({"fn": calc_array, "args": (), "kwargs": {"j": 4}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -475,7 +475,7 @@ def test_execute_task(self):
q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -493,7 +493,7 @@ def test_execute_task_parallel(self):
q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=2,
openmpi_oversubscribe=False,
Expand All @@ -516,7 +516,7 @@ def test_execute_task_cache(self):
q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -535,7 +535,7 @@ def test_execute_task_cache_failed_no_argument(self):
q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f})
q.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
execute_tasks(
execute_multiple_tasks(
future_queue=q,
cores=1,
openmpi_oversubscribe=False,
Expand Down
8 changes: 4 additions & 4 deletions tests/test_singlenodeexecutor_shell_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
from executorlib.standalone.interactive.spawner import MpiExecSpawner


Expand All @@ -32,7 +32,7 @@ def test_execute_single_task(self):
test_queue.put({"shutdown": True, "wait": True})
cloudpickle_register(ind=1)
self.assertFalse(f.done())
execute_tasks(
execute_multiple_tasks(
future_queue=test_queue,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -58,7 +58,7 @@ def test_wrong_error(self):
)
cloudpickle_register(ind=1)
with self.assertRaises(TypeError):
execute_tasks(
execute_multiple_tasks(
future_queue=test_queue,
cores=1,
openmpi_oversubscribe=False,
Expand All @@ -85,7 +85,7 @@ def test_broken_executable(self):
)
cloudpickle_register(ind=1)
with self.assertRaises(FileNotFoundError):
execute_tasks(
execute_multiple_tasks(
future_queue=test_queue,
cores=1,
openmpi_oversubscribe=False,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_singlenodeexecutor_shell_interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from executorlib import SingleNodeExecutor
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.task_scheduler.interactive.shared import execute_tasks
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
from executorlib.standalone.interactive.spawner import MpiExecSpawner


Expand Down Expand Up @@ -88,7 +88,7 @@ def test_execute_single_task(self):
cloudpickle_register(ind=1)
self.assertFalse(future_lines.done())
self.assertFalse(future_pattern.done())
execute_tasks(
execute_multiple_tasks(
future_queue=test_queue,
cores=1,
openmpi_oversubscribe=False,
Expand Down
Loading