Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions executorlib/task_scheduler/interactive/blockallocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from threading import Thread
from typing import Callable, Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.inputcheck import (
check_resource_dict,
check_resource_dict_is_empty,
)
from executorlib.standalone.interactive.communication import interface_bootup
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done


class BlockAllocationTaskScheduler(TaskSchedulerBase):
Expand Down Expand Up @@ -64,7 +66,7 @@ def __init__(
self._set_process(
process=[
Thread(
target=execute_multiple_tasks,
target=_execute_multiple_tasks,
kwargs=executor_kwargs | {"worker_id": worker_id},
)
for worker_id in range(self._max_workers)
Expand All @@ -90,7 +92,7 @@ def max_workers(self, max_workers: int):
elif self._max_workers < max_workers:
new_process_lst = [
Thread(
target=execute_multiple_tasks,
target=_execute_multiple_tasks,
kwargs=self._process_kwargs,
)
for _ in range(max_workers - self._max_workers)
Expand Down Expand Up @@ -175,3 +177,74 @@ def _set_process(self, process: list[Thread]): # type: ignore
self._process = process
for process_instance in self._process:
process_instance.start()


def _execute_multiple_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute tasks from the queue in parallel using the message passing interface (MPI).

    Boots one interactive interface process and then consumes task dictionaries
    from ``future_queue`` until a shutdown message is received.

    Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        init_function (Callable): optional function to preset arguments for functions which are submitted later
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
    """
    interface = interface_bootup(
        command_lst=get_interactive_execute_command(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        log_obj_size=log_obj_size,
        worker_id=worker_id,
    )
    try:
        if init_function is not None:
            interface.send_dict(
                input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
            )
        while True:
            task_dict = future_queue.get()
            if "shutdown" in task_dict and task_dict["shutdown"]:
                interface.shutdown(wait=task_dict["wait"])
                task_done(future_queue=future_queue)
                if queue_join_on_shutdown:
                    future_queue.join()
                break
            elif "fn" in task_dict and "future" in task_dict:
                try:
                    execute_task_dict(
                        task_dict=task_dict,
                        interface=interface,
                        cache_directory=cache_directory,
                        cache_key=cache_key,
                        error_log_file=error_log_file,
                    )
                finally:
                    # Always balance future_queue.get(), even if task execution
                    # raised, so future_queue.join() cannot dead-lock.
                    task_done(future_queue=future_queue)
    except Exception:
        # Best-effort cleanup: never leave the remote interface process running
        # if the worker loop dies unexpectedly.
        interface.shutdown(wait=True)
        raise
64 changes: 59 additions & 5 deletions executorlib/task_scheduler/interactive/onetoone.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from threading import Thread
from typing import Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.interactive.communication import interface_bootup
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.task_scheduler.base import TaskSchedulerBase
from executorlib.task_scheduler.interactive.shared import execute_single_task
from executorlib.task_scheduler.interactive.shared import execute_task_dict


class OneProcessTaskScheduler(TaskSchedulerBase):
Expand Down Expand Up @@ -60,13 +62,13 @@ def __init__(
self._process_kwargs = executor_kwargs
self._set_process(
Thread(
target=_execute_task_in_separate_process,
target=_execute_single_task,
kwargs=executor_kwargs,
)
)


def _execute_task_in_separate_process(
def _execute_single_task(
future_queue: queue.Queue,
spawner: type[BaseSpawner] = MpiExecSpawner,
max_cores: Optional[int] = None,
Expand Down Expand Up @@ -166,7 +168,6 @@ def _wrap_execute_task_in_separate_process(
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
{"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
spawner (BaseSpawner): Interface to start process on selected compute resources
executor_kwargs (dict): keyword parameters used to initialize the Executor
max_cores (int): defines the number cores which can be used in parallel
Expand Down Expand Up @@ -207,8 +208,61 @@ def _wrap_execute_task_in_separate_process(
}
)
process = Thread(
target=execute_single_task,
target=_execute_task_in_thread,
kwargs=task_kwargs,
)
process.start()
return process, active_task_dict


def _execute_task_in_thread(
    task_dict: dict,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute a single task in parallel using the message passing interface (MPI).

    Boots a dedicated interactive interface process and runs exactly one task
    dictionary on it.

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
    """
    # Boot the interface first so execute_task_dict receives a ready connection.
    connection_interface = interface_bootup(
        command_lst=get_interactive_execute_command(cores=cores),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        log_obj_size=log_obj_size,
        worker_id=worker_id,
    )
    execute_task_dict(
        task_dict=task_dict,
        interface=connection_interface,
        cache_directory=cache_directory,
        cache_key=cache_key,
        error_log_file=error_log_file,
    )
146 changes: 8 additions & 138 deletions executorlib/task_scheduler/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,143 +2,13 @@
import os
import queue
import time
from typing import Callable, Optional
from typing import Optional

from executorlib.standalone.command import get_interactive_execute_command
from executorlib.standalone.interactive.communication import (
SocketInterface,
interface_bootup,
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.interactive.communication import SocketInterface
from executorlib.standalone.serialize import serialize_funct


def execute_multiple_tasks(
future_queue: queue.Queue,
cores: int = 1,
spawner: type[BaseSpawner] = MpiExecSpawner,
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
cache_key: Optional[str] = None,
queue_join_on_shutdown: bool = True,
log_obj_size: bool = False,
error_log_file: Optional[str] = None,
worker_id: Optional[int] = None,
**kwargs,
) -> None:
"""
Execute a single tasks in parallel using the message passing interface (MPI).

Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
spawner (BaseSpawner): Spawner to start process on selected compute resources
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
init_function (Callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
overwritten by setting the cache_key.
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
submitted to the Executor.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
"""
interface = interface_bootup(
command_lst=get_interactive_execute_command(
cores=cores,
),
connections=spawner(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
log_obj_size=log_obj_size,
worker_id=worker_id,
)
if init_function is not None:
interface.send_dict(
input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
)
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
_task_done(future_queue=future_queue)
if queue_join_on_shutdown:
future_queue.join()
break
elif "fn" in task_dict and "future" in task_dict:
_execute_task_dict(
task_dict=task_dict,
interface=interface,
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)
_task_done(future_queue=future_queue)


def execute_single_task(
task_dict: dict,
cores: int = 1,
spawner: type[BaseSpawner] = MpiExecSpawner,
hostname_localhost: Optional[bool] = None,
cache_directory: Optional[str] = None,
cache_key: Optional[str] = None,
log_obj_size: bool = False,
error_log_file: Optional[str] = None,
worker_id: Optional[int] = None,
**kwargs,
) -> None:
"""
Execute a single tasks in parallel using the message passing interface (MPI).

Args:
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
cores (int): defines the total number of MPI ranks to use
spawner (BaseSpawner): Spawner to start process on selected compute resources
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
init_function (Callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
overwritten by setting the cache_key.
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
submitted to the Executor.
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
distribution.
"""
_execute_task_dict(
task_dict=task_dict,
interface=interface_bootup(
command_lst=get_interactive_execute_command(
cores=cores,
),
connections=spawner(cores=cores, **kwargs),
hostname_localhost=hostname_localhost,
log_obj_size=log_obj_size,
worker_id=worker_id,
),
cache_directory=cache_directory,
cache_key=cache_key,
error_log_file=error_log_file,
)


def _execute_task_dict(
def execute_task_dict(
task_dict: dict,
interface: SocketInterface,
cache_directory: Optional[str] = None,
Expand Down Expand Up @@ -171,6 +41,11 @@ def _execute_task_dict(
)


def task_done(future_queue: queue.Queue):
    """
    Mark one item on *future_queue* as processed, tolerating over-completion.

    Queue.task_done() raises ValueError when it is called more often than
    items were fetched; that error is deliberately ignored here so shutdown
    paths can call this helper unconditionally.

    Args:
        future_queue (queue.Queue): queue whose pending-task counter is decremented
    """
    try:
        future_queue.task_done()
    except ValueError:
        pass


def _execute_task_without_cache(interface: SocketInterface, task_dict: dict):
"""
Execute the task in the task_dict by communicating it via the interface.
Expand Down Expand Up @@ -233,8 +108,3 @@ def _execute_task_with_cache(
_, _, result = get_output(file_name=file_name)
future = task_dict["future"]
future.set_result(result)


def _task_done(future_queue: queue.Queue):
with contextlib.suppress(ValueError):
future_queue.task_done()
Loading
Loading