Skip to content

Commit dcf3961

Browse files
Refactor interactive task scheduler (#798)
* Refactor interactive task scheduler * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 9b4f6a7 commit dcf3961

File tree

7 files changed

+160
-165
lines changed

7 files changed

+160
-165
lines changed

executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
from threading import Thread
44
from typing import Callable, Optional
55

6+
from executorlib.standalone.command import get_interactive_execute_command
67
from executorlib.standalone.inputcheck import (
78
check_resource_dict,
89
check_resource_dict_is_empty,
910
)
11+
from executorlib.standalone.interactive.communication import interface_bootup
1012
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
1113
from executorlib.standalone.queue import cancel_items_in_queue
1214
from executorlib.task_scheduler.base import TaskSchedulerBase
13-
from executorlib.task_scheduler.interactive.shared import execute_multiple_tasks
15+
from executorlib.task_scheduler.interactive.shared import execute_task_dict, task_done
1416

1517

1618
class BlockAllocationTaskScheduler(TaskSchedulerBase):
@@ -64,7 +66,7 @@ def __init__(
6466
self._set_process(
6567
process=[
6668
Thread(
67-
target=execute_multiple_tasks,
69+
target=_execute_multiple_tasks,
6870
kwargs=executor_kwargs | {"worker_id": worker_id},
6971
)
7072
for worker_id in range(self._max_workers)
@@ -90,7 +92,7 @@ def max_workers(self, max_workers: int):
9092
elif self._max_workers < max_workers:
9193
new_process_lst = [
9294
Thread(
93-
target=execute_multiple_tasks,
95+
target=_execute_multiple_tasks,
9496
kwargs=self._process_kwargs,
9597
)
9698
for _ in range(max_workers - self._max_workers)
@@ -175,3 +177,74 @@ def _set_process(self, process: list[Thread]): # type: ignore
175177
self._process = process
176178
for process_instance in self._process:
177179
process_instance.start()
180+
181+
182+
def _execute_multiple_tasks(
    future_queue: queue.Queue,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    init_function: Optional[Callable] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute tasks pulled from the queue, one after another, on a parallel process
    started via the message passing interface (MPI). Runs until a shutdown task
    dictionary is received.

    Args:
        future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        init_function (Callable): optional function to preset arguments for functions which are submitted later
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
    """
    # Boot the zmq-based socket interface on top of the spawned MPI process;
    # remaining **kwargs are forwarded to the spawner (e.g. resource settings).
    interface = interface_bootup(
        command_lst=get_interactive_execute_command(
            cores=cores,
        ),
        connections=spawner(cores=cores, **kwargs),
        hostname_localhost=hostname_localhost,
        log_obj_size=log_obj_size,
        worker_id=worker_id,
    )
    # Optionally preset arguments on the worker before any task is executed.
    if init_function is not None:
        interface.send_dict(
            input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
        )
    while True:
        task_dict = future_queue.get()
        if "shutdown" in task_dict and task_dict["shutdown"]:
            # Shutdown request: stop the remote process, balance the queue
            # counter and optionally join the queue before exiting the loop.
            interface.shutdown(wait=task_dict["wait"])
            task_done(future_queue=future_queue)
            if queue_join_on_shutdown:
                future_queue.join()
            break
        elif "fn" in task_dict and "future" in task_dict:
            # Regular task: execute it remotely and mark it as done afterwards.
            execute_task_dict(
                task_dict=task_dict,
                interface=interface,
                cache_directory=cache_directory,
                cache_key=cache_key,
                error_log_file=error_log_file,
            )
            task_done(future_queue=future_queue)

executorlib/task_scheduler/interactive/onetoone.py

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
from threading import Thread
33
from typing import Optional
44

5+
from executorlib.standalone.command import get_interactive_execute_command
6+
from executorlib.standalone.interactive.communication import interface_bootup
57
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
68
from executorlib.task_scheduler.base import TaskSchedulerBase
7-
from executorlib.task_scheduler.interactive.shared import execute_single_task
9+
from executorlib.task_scheduler.interactive.shared import execute_task_dict
810

911

1012
class OneProcessTaskScheduler(TaskSchedulerBase):
@@ -60,13 +62,13 @@ def __init__(
6062
self._process_kwargs = executor_kwargs
6163
self._set_process(
6264
Thread(
63-
target=_execute_task_in_separate_process,
65+
target=_execute_single_task,
6466
kwargs=executor_kwargs,
6567
)
6668
)
6769

6870

69-
def _execute_task_in_separate_process(
71+
def _execute_single_task(
7072
future_queue: queue.Queue,
7173
spawner: type[BaseSpawner] = MpiExecSpawner,
7274
max_cores: Optional[int] = None,
@@ -166,7 +168,6 @@ def _wrap_execute_task_in_separate_process(
166168
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
167169
{"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
168170
active_task_dict (dict): Dictionary containing the future objects and the number of cores they require
169-
qtask (queue.Queue): Queue to communicate with the thread linked to the process executing the python function
170171
spawner (BaseSpawner): Interface to start process on selected compute resources
171172
executor_kwargs (dict): keyword parameters used to initialize the Executor
172173
max_cores (int): defines the number cores which can be used in parallel
@@ -207,8 +208,61 @@ def _wrap_execute_task_in_separate_process(
207208
}
208209
)
209210
process = Thread(
210-
target=execute_single_task,
211+
target=_execute_task_in_thread,
211212
kwargs=task_kwargs,
212213
)
213214
process.start()
214215
return process, active_task_dict
216+
217+
218+
def _execute_task_in_thread(
    task_dict: dict,
    cores: int = 1,
    spawner: type[BaseSpawner] = MpiExecSpawner,
    hostname_localhost: Optional[bool] = None,
    cache_directory: Optional[str] = None,
    cache_key: Optional[str] = None,
    log_obj_size: bool = False,
    error_log_file: Optional[str] = None,
    worker_id: Optional[int] = None,
    **kwargs,
) -> None:
    """
    Execute a single task on a dedicated parallel process started via the
    message passing interface (MPI).

    Args:
        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
                          {"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
        cores (int): defines the total number of MPI ranks to use
        spawner (BaseSpawner): Spawner to start process on selected compute resources
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
                                      in principle any computer should be able to resolve that their own hostname
                                      points to the same address as localhost. Still MacOS >= 12 seems to disable
                                      this look up for security reasons. So on MacOS it is required to set this
                                      option to true
        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
        cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
                                   overwritten by setting the cache_key.
        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
        error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
                              submitted to the Executor.
        worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
                         distribution.
    """
    # Spin up a fresh interface for this one task and hand both to the shared
    # execution helper; remaining **kwargs are forwarded to the spawner.
    execute_task_dict(
        task_dict=task_dict,
        interface=interface_bootup(
            command_lst=get_interactive_execute_command(
                cores=cores,
            ),
            connections=spawner(cores=cores, **kwargs),
            hostname_localhost=hostname_localhost,
            log_obj_size=log_obj_size,
            worker_id=worker_id,
        ),
        cache_directory=cache_directory,
        cache_key=cache_key,
        error_log_file=error_log_file,
    )

executorlib/task_scheduler/interactive/shared.py

Lines changed: 8 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -2,143 +2,13 @@
22
import os
33
import queue
44
import time
5-
from typing import Callable, Optional
5+
from typing import Optional
66

7-
from executorlib.standalone.command import get_interactive_execute_command
8-
from executorlib.standalone.interactive.communication import (
9-
SocketInterface,
10-
interface_bootup,
11-
)
12-
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
7+
from executorlib.standalone.interactive.communication import SocketInterface
138
from executorlib.standalone.serialize import serialize_funct
149

1510

16-
def execute_multiple_tasks(
17-
future_queue: queue.Queue,
18-
cores: int = 1,
19-
spawner: type[BaseSpawner] = MpiExecSpawner,
20-
hostname_localhost: Optional[bool] = None,
21-
init_function: Optional[Callable] = None,
22-
cache_directory: Optional[str] = None,
23-
cache_key: Optional[str] = None,
24-
queue_join_on_shutdown: bool = True,
25-
log_obj_size: bool = False,
26-
error_log_file: Optional[str] = None,
27-
worker_id: Optional[int] = None,
28-
**kwargs,
29-
) -> None:
30-
"""
31-
Execute a single tasks in parallel using the message passing interface (MPI).
32-
33-
Args:
34-
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
35-
cores (int): defines the total number of MPI ranks to use
36-
spawner (BaseSpawner): Spawner to start process on selected compute resources
37-
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
38-
context of an HPC cluster this essential to be able to communicate to an
39-
Executor running on a different compute node within the same allocation. And
40-
in principle any computer should be able to resolve that their own hostname
41-
points to the same address as localhost. Still MacOS >= 12 seems to disable
42-
this look up for security reasons. So on MacOS it is required to set this
43-
option to true
44-
init_function (Callable): optional function to preset arguments for functions which are submitted later
45-
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
46-
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
47-
overwritten by setting the cache_key.
48-
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
49-
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
50-
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
51-
submitted to the Executor.
52-
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
53-
distribution.
54-
"""
55-
interface = interface_bootup(
56-
command_lst=get_interactive_execute_command(
57-
cores=cores,
58-
),
59-
connections=spawner(cores=cores, **kwargs),
60-
hostname_localhost=hostname_localhost,
61-
log_obj_size=log_obj_size,
62-
worker_id=worker_id,
63-
)
64-
if init_function is not None:
65-
interface.send_dict(
66-
input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
67-
)
68-
while True:
69-
task_dict = future_queue.get()
70-
if "shutdown" in task_dict and task_dict["shutdown"]:
71-
interface.shutdown(wait=task_dict["wait"])
72-
_task_done(future_queue=future_queue)
73-
if queue_join_on_shutdown:
74-
future_queue.join()
75-
break
76-
elif "fn" in task_dict and "future" in task_dict:
77-
_execute_task_dict(
78-
task_dict=task_dict,
79-
interface=interface,
80-
cache_directory=cache_directory,
81-
cache_key=cache_key,
82-
error_log_file=error_log_file,
83-
)
84-
_task_done(future_queue=future_queue)
85-
86-
87-
def execute_single_task(
88-
task_dict: dict,
89-
cores: int = 1,
90-
spawner: type[BaseSpawner] = MpiExecSpawner,
91-
hostname_localhost: Optional[bool] = None,
92-
cache_directory: Optional[str] = None,
93-
cache_key: Optional[str] = None,
94-
log_obj_size: bool = False,
95-
error_log_file: Optional[str] = None,
96-
worker_id: Optional[int] = None,
97-
**kwargs,
98-
) -> None:
99-
"""
100-
Execute a single tasks in parallel using the message passing interface (MPI).
101-
102-
Args:
103-
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
104-
cores (int): defines the total number of MPI ranks to use
105-
spawner (BaseSpawner): Spawner to start process on selected compute resources
106-
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
107-
context of an HPC cluster this essential to be able to communicate to an
108-
Executor running on a different compute node within the same allocation. And
109-
in principle any computer should be able to resolve that their own hostname
110-
points to the same address as localhost. Still MacOS >= 12 seems to disable
111-
this look up for security reasons. So on MacOS it is required to set this
112-
option to true
113-
init_function (Callable): optional function to preset arguments for functions which are submitted later
114-
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
115-
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
116-
overwritten by setting the cache_key.
117-
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
118-
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
119-
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
120-
submitted to the Executor.
121-
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
122-
distribution.
123-
"""
124-
_execute_task_dict(
125-
task_dict=task_dict,
126-
interface=interface_bootup(
127-
command_lst=get_interactive_execute_command(
128-
cores=cores,
129-
),
130-
connections=spawner(cores=cores, **kwargs),
131-
hostname_localhost=hostname_localhost,
132-
log_obj_size=log_obj_size,
133-
worker_id=worker_id,
134-
),
135-
cache_directory=cache_directory,
136-
cache_key=cache_key,
137-
error_log_file=error_log_file,
138-
)
139-
140-
141-
def _execute_task_dict(
11+
def execute_task_dict(
14212
task_dict: dict,
14313
interface: SocketInterface,
14414
cache_directory: Optional[str] = None,
@@ -171,6 +41,11 @@ def _execute_task_dict(
17141
)
17242

17343

44+
def task_done(future_queue: queue.Queue) -> None:
    """
    Mark one item from the queue as done, tolerating over-completion.

    Args:
        future_queue (queue.Queue): queue whose pending-task counter is decremented.

    Note:
        queue.Queue.task_done() raises ValueError when called more times than
        there were items put on the queue; this is suppressed deliberately so
        shutdown paths can call task_done() unconditionally.
    """
    with contextlib.suppress(ValueError):
        future_queue.task_done()
48+
17449
def _execute_task_without_cache(interface: SocketInterface, task_dict: dict):
17550
"""
17651
Execute the task in the task_dict by communicating it via the interface.
@@ -233,8 +108,3 @@ def _execute_task_with_cache(
233108
_, _, result = get_output(file_name=file_name)
234109
future = task_dict["future"]
235110
future.set_result(result)
236-
237-
238-
def _task_done(future_queue: queue.Queue):
239-
with contextlib.suppress(ValueError):
240-
future_queue.task_done()

0 commit comments

Comments
 (0)