diff --git a/executorlib/task_scheduler/interactive/blockallocation.py b/executorlib/task_scheduler/interactive/blockallocation.py
index 96cec2c1..ff4c7375 100644
--- a/executorlib/task_scheduler/interactive/blockallocation.py
+++ b/executorlib/task_scheduler/interactive/blockallocation.py
@@ -10,7 +10,7 @@
 from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.standalone.queue import cancel_items_in_queue
 from executorlib.task_scheduler.base import TaskSchedulerBase
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks


 class BlockAllocationTaskScheduler(TaskSchedulerBase):
@@ -64,7 +64,7 @@ def __init__(
         self._set_process(
             process=[
                 Thread(
-                    target=execute_tasks,
+                    target=execute_multiple_tasks,
                     kwargs=executor_kwargs | {"worker_id": worker_id},
                 )
                 for worker_id in range(self._max_workers)
@@ -90,7 +90,7 @@ def max_workers(self, max_workers: int):
         elif self._max_workers < max_workers:
             new_process_lst = [
                 Thread(
-                    target=execute_tasks,
+                    target=execute_multiple_tasks,
                     kwargs=self._process_kwargs,
                 )
                 for _ in range(max_workers - self._max_workers)
diff --git a/executorlib/task_scheduler/interactive/onetoone.py b/executorlib/task_scheduler/interactive/onetoone.py
index d28f014b..e97fa4bc 100644
--- a/executorlib/task_scheduler/interactive/onetoone.py
+++ b/executorlib/task_scheduler/interactive/onetoone.py
@@ -4,7 +4,7 @@
 from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.task_scheduler.base import TaskSchedulerBase
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_single_task


 class OneProcessTaskScheduler(TaskSchedulerBase):
@@ -94,7 +94,6 @@ def _execute_task_in_separate_process(
     """
     active_task_dict: dict = {}
     process_lst: list = []
-    qtask_lst: list = []
     if "cores" not in kwargs:
         kwargs["cores"] = 1
     while True:
@@ -106,10 +105,8 @@ def _execute_task_in_separate_process(
             future_queue.join()
             break
         elif "fn" in task_dict and "future" in task_dict:
-            qtask: queue.Queue = queue.Queue()
             process, active_task_dict = _wrap_execute_task_in_separate_process(
                 task_dict=task_dict,
-                qtask=qtask,
                 active_task_dict=active_task_dict,
                 spawner=spawner,
                 executor_kwargs=kwargs,
@@ -117,7 +114,6 @@ def _execute_task_in_separate_process(
                 max_workers=max_workers,
                 hostname_localhost=hostname_localhost,
             )
-            qtask_lst.append(qtask)
             process_lst.append(process)
             future_queue.task_done()

@@ -158,7 +154,6 @@ def _wait_for_free_slots(
 def _wrap_execute_task_in_separate_process(
     task_dict: dict,
     active_task_dict: dict,
-    qtask: queue.Queue,
     spawner: type[BaseSpawner],
     executor_kwargs: dict,
     max_cores: Optional[int] = None,
@@ -190,8 +185,6 @@ def _wrap_execute_task_in_separate_process(
         dictionary containing the future objects and the number of cores they require
     """
     resource_dict = task_dict.pop("resource_dict").copy()
-    qtask.put(task_dict)
-    qtask.put({"shutdown": True, "wait": True})
     if "cores" not in resource_dict or (
         resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
     ):
@@ -208,14 +201,13 @@ def _wrap_execute_task_in_separate_process(
     task_kwargs.update(resource_dict)
     task_kwargs.update(
         {
-            "future_queue": qtask,
+            "task_dict": task_dict,
             "spawner": spawner,
             "hostname_localhost": hostname_localhost,
-            "init_function": None,
         }
     )
     process = Thread(
-        target=execute_tasks,
+        target=execute_single_task,
         kwargs=task_kwargs,
     )
     process.start()
diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index 8ce33ada..0b46324d 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -13,7 +13,7 @@
 from executorlib.standalone.serialize import serialize_funct


-def execute_tasks(
+def execute_multiple_tasks(
     future_queue: queue.Queue,
     cores: int = 1,
     spawner: type[BaseSpawner] = MpiExecSpawner,
@@ -74,20 +74,103 @@ def execute_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict and "future" in task_dict:
-            if error_log_file is not None:
-                task_dict["error_log_file"] = error_log_file
-            if cache_directory is None:
-                _execute_task_without_cache(interface=interface, task_dict=task_dict)
-            else:
-                _execute_task_with_cache(
-                    interface=interface,
-                    task_dict=task_dict,
-                    cache_directory=cache_directory,
-                    cache_key=cache_key,
-                )
+            _execute_task_dict(
+                task_dict=task_dict,
+                interface=interface,
+                cache_directory=cache_directory,
+                cache_key=cache_key,
+                error_log_file=error_log_file,
+            )
             _task_done(future_queue=future_queue)


+def execute_single_task(
+    task_dict: dict,
+    cores: int = 1,
+    spawner: type[BaseSpawner] = MpiExecSpawner,
+    hostname_localhost: Optional[bool] = None,
+    cache_directory: Optional[str] = None,
+    cache_key: Optional[str] = None,
+    log_obj_size: bool = False,
+    error_log_file: Optional[str] = None,
+    worker_id: Optional[int] = None,
+    **kwargs,
+) -> None:
+    """
+    Execute a single task in parallel using the message passing interface (MPI).
+
+    Args:
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        cores (int): defines the total number of MPI ranks to use
+        spawner (BaseSpawner): Spawner to start process on selected compute resources
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate to an
+                                      Executor running on a different compute node within the same allocation. And
+                                      in principle any computer should be able to resolve that their own hostname
+                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
+                                      this look up for security reasons. So on MacOS it is required to set this
+                                      option to true
+        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
+                                   overwritten by setting the cache_key.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
+                              submitted to the Executor.
+        worker_id (int): Communicate to the worker which ID was assigned to it for future reference and resource
+                         distribution.
+ """ + _execute_task_dict( + task_dict=task_dict, + interface=interface_bootup( + command_lst=get_interactive_execute_command( + cores=cores, + ), + connections=spawner(cores=cores, **kwargs), + hostname_localhost=hostname_localhost, + log_obj_size=log_obj_size, + worker_id=worker_id, + ), + cache_directory=cache_directory, + cache_key=cache_key, + error_log_file=error_log_file, + ) + + +def _execute_task_dict( + task_dict: dict, + interface: SocketInterface, + cache_directory: Optional[str] = None, + cache_key: Optional[str] = None, + error_log_file: Optional[str] = None, +): + """ + Execute the task in the task_dict by communicating it via the interface. + + Args: + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}} + interface (SocketInterface): socket interface for zmq communication + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be + overwritten by setting the cache_key. + error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions + submitted to the Executor. + """ + if error_log_file is not None: + task_dict["error_log_file"] = error_log_file + if cache_directory is None: + _execute_task_without_cache(interface=interface, task_dict=task_dict) + else: + _execute_task_with_cache( + interface=interface, + task_dict=task_dict, + cache_directory=cache_directory, + cache_key=cache_key, + ) + + def _execute_task_without_cache(interface: SocketInterface, task_dict: dict): """ Execute the task in the task_dict by communicating it via the interface. 
diff --git a/tests/test_fluxpythonspawner.py b/tests/test_fluxpythonspawner.py
index 01f1d160..6235f0a9 100644
--- a/tests/test_fluxpythonspawner.py
+++ b/tests/test_fluxpythonspawner.py
@@ -5,7 +5,7 @@
 import numpy as np

-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
 from executorlib.standalone.serialize import cloudpickle_register

@@ -112,7 +112,7 @@ def test_execute_task(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             flux_executor=self.flux_executor,
@@ -127,7 +127,7 @@ def test_execute_task_threads(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             threads_per_core=1,
diff --git a/tests/test_mpiexecspawner.py b/tests/test_mpiexecspawner.py
index 9a9b861f..9ebf02de 100644
--- a/tests/test_mpiexecspawner.py
+++ b/tests/test_mpiexecspawner.py
@@ -10,7 +10,7 @@
 from executorlib.task_scheduler.base import TaskSchedulerBase
 from executorlib.standalone.interactive.spawner import MpiExecSpawner
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler
 from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
 from executorlib.standalone.interactive.backend import call_funct
@@ -261,7 +261,7 @@ def test_execute_task(self):
         q.put({"fn": get_global, "args": (), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
@@ -443,7 +443,7 @@ def test_execute_task_failed_no_argument(self):
         q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
@@ -459,7 +459,7 @@ def test_execute_task_failed_wrong_argument(self):
         q.put({"fn": calc_array, "args": (), "kwargs": {"j": 4}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
@@ -475,7 +475,7 @@ def test_execute_task(self):
         q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
@@ -493,7 +493,7 @@ def test_execute_task_parallel(self):
         q.put({"fn": calc_array, "args": (), "kwargs": {"i": 2}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=2,
             openmpi_oversubscribe=False,
@@ -516,7 +516,7 @@ def test_execute_task_cache(self):
         q.put({"fn": calc, "args": (), "kwargs": {"i": 1}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
@@ -535,7 +535,7 @@ def test_execute_task_cache_failed_no_argument(self):
         q.put({"fn": calc_array, "args": (), "kwargs": {}, "future": f})
         q.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=q,
             cores=1,
             openmpi_oversubscribe=False,
diff --git a/tests/test_singlenodeexecutor_shell_executor.py b/tests/test_singlenodeexecutor_shell_executor.py
index df97ecd2..9dae5e99 100644
--- a/tests/test_singlenodeexecutor_shell_executor.py
+++ b/tests/test_singlenodeexecutor_shell_executor.py
@@ -5,7 +5,7 @@
 from executorlib import SingleNodeExecutor
 from executorlib.standalone.serialize import cloudpickle_register
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 from executorlib.standalone.interactive.spawner import MpiExecSpawner


@@ -32,7 +32,7 @@ def test_execute_single_task(self):
         test_queue.put({"shutdown": True, "wait": True})
         cloudpickle_register(ind=1)
         self.assertFalse(f.done())
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=test_queue,
             cores=1,
             openmpi_oversubscribe=False,
@@ -58,7 +58,7 @@ def test_wrong_error(self):
         )
         cloudpickle_register(ind=1)
         with self.assertRaises(TypeError):
-            execute_tasks(
+            execute_multiple_tasks(
                 future_queue=test_queue,
                 cores=1,
                 openmpi_oversubscribe=False,
@@ -85,7 +85,7 @@ def test_broken_executable(self):
         )
         cloudpickle_register(ind=1)
         with self.assertRaises(FileNotFoundError):
-            execute_tasks(
+            execute_multiple_tasks(
                 future_queue=test_queue,
                 cores=1,
                 openmpi_oversubscribe=False,
diff --git a/tests/test_singlenodeexecutor_shell_interactive.py b/tests/test_singlenodeexecutor_shell_interactive.py
index 0adc54bf..ed1f4f68 100644
--- a/tests/test_singlenodeexecutor_shell_interactive.py
+++ b/tests/test_singlenodeexecutor_shell_interactive.py
@@ -6,7 +6,7 @@
 from executorlib import SingleNodeExecutor
 from executorlib.standalone.serialize import cloudpickle_register
-from executorlib.task_scheduler.interactive.shared import execute_tasks
+from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
 from executorlib.standalone.interactive.spawner import MpiExecSpawner


@@ -88,7 +88,7 @@ def test_execute_single_task(self):
         cloudpickle_register(ind=1)
         self.assertFalse(future_lines.done())
         self.assertFalse(future_pattern.done())
-        execute_tasks(
+        execute_multiple_tasks(
             future_queue=test_queue,
             cores=1,
             openmpi_oversubscribe=False,
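
Tying back to the onetoone.py hunks at the top of this diff: the per-task thread no longer ships the task through a single-use qtask queue. A sketch of the new wiring in _wrap_execute_task_in_separate_process, with a hypothetical payload function and resource dictionary; the keyword names are taken verbatim from the hunk, and everything else is an illustrative assumption:

from concurrent.futures import Future
from threading import Thread

from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.task_scheduler.interactive.shared import execute_single_task


def calc(i):
    # hypothetical payload function
    return i * 2


task_dict = {
    "fn": calc,
    "args": (),
    "kwargs": {"i": 4},
    "future": Future(),
    "resource_dict": {"cores": 1},
}

# Mirrors the rewritten helper: the resource dictionary is popped and merged
# into the thread kwargs, and the thread targets execute_single_task with the
# task dictionary itself instead of a queue plus shutdown sentinel.
resource_dict = task_dict.pop("resource_dict").copy()
task_kwargs: dict = {}
task_kwargs.update(resource_dict)
task_kwargs.update(
    {
        "task_dict": task_dict,
        "spawner": MpiExecSpawner,
        "hostname_localhost": None,
    }
)
process = Thread(target=execute_single_task, kwargs=task_kwargs)
process.start()
process.join()
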