Skip to content

Commit b05b2b5

Browse files
committed
refactor
1 parent b67fb12 commit b05b2b5

File tree

2 files changed

+62
-44
lines changed

2 files changed

+62
-44
lines changed

executorlib/task_scheduler/interactive/onetoone.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ def _wrap_execute_task_in_separate_process(
204204
"task_dict": task_dict,
205205
"spawner": spawner,
206206
"hostname_localhost": hostname_localhost,
207-
"init_function": None,
208207
}
209208
)
210209
process = Thread(

executorlib/task_scheduler/interactive/shared.py

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,13 @@ def execute_multiple_tasks(
7474
future_queue.join()
7575
break
7676
elif "fn" in task_dict and "future" in task_dict:
77-
if error_log_file is not None:
78-
task_dict["error_log_file"] = error_log_file
79-
if cache_directory is None:
80-
_execute_task_without_cache(interface=interface, task_dict=task_dict)
81-
else:
82-
_execute_task_with_cache(
83-
interface=interface,
84-
task_dict=task_dict,
85-
cache_directory=cache_directory,
86-
cache_key=cache_key,
87-
)
77+
_execute_task_dict(
78+
task_dict=task_dict,
79+
interface=interface,
80+
cache_directory=cache_directory,
81+
cache_key=cache_key,
82+
error_log_file=error_log_file,
83+
)
8884
_task_done(future_queue=future_queue)
8985

9086

@@ -93,7 +89,6 @@ def execute_single_task(
9389
cores: int = 1,
9490
spawner: type[BaseSpawner] = MpiExecSpawner,
9591
hostname_localhost: Optional[bool] = None,
96-
init_function: Optional[Callable] = None,
9792
cache_directory: Optional[str] = None,
9893
cache_key: Optional[str] = None,
9994
log_obj_size: bool = False,
@@ -105,40 +100,64 @@ def execute_single_task(
105100
Execute a single tasks in parallel using the message passing interface (MPI).
106101
107102
Args:
108-
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
109-
cores (int): defines the total number of MPI ranks to use
110-
spawner (BaseSpawner): Spawner to start process on selected compute resources
111-
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
112-
context of an HPC cluster this essential to be able to communicate to an
113-
Executor running on a different compute node within the same allocation. And
114-
in principle any computer should be able to resolve that their own hostname
115-
points to the same address as localhost. Still MacOS >= 12 seems to disable
116-
this look up for security reasons. So on MacOS it is required to set this
117-
option to true
118-
init_function (Callable): optional function to preset arguments for functions which are submitted later
119-
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
120-
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
121-
overwritten by setting the cache_key.
122-
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
123-
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
124-
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
125-
submitted to the Executor.
126-
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
127-
distribution.
103+
future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process
104+
cores (int): defines the total number of MPI ranks to use
105+
spawner (BaseSpawner): Spawner to start process on selected compute resources
106+
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
107+
context of an HPC cluster this essential to be able to communicate to an
108+
Executor running on a different compute node within the same allocation. And
109+
in principle any computer should be able to resolve that their own hostname
110+
points to the same address as localhost. Still MacOS >= 12 seems to disable
111+
this look up for security reasons. So on MacOS it is required to set this
112+
option to true
113+
init_function (Callable): optional function to preset arguments for functions which are submitted later
114+
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
115+
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
116+
overwritten by setting the cache_key.
117+
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
118+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
119+
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
120+
submitted to the Executor.
121+
worker_id (int): Communicate the worker which ID was assigned to it for future reference and resource
122+
distribution.
128123
"""
129-
interface = interface_bootup(
130-
command_lst=get_interactive_execute_command(
131-
cores=cores,
124+
_execute_task_dict(
125+
task_dict=task_dict,
126+
interface=interface_bootup(
127+
command_lst=get_interactive_execute_command(
128+
cores=cores,
129+
),
130+
connections=spawner(cores=cores, **kwargs),
131+
hostname_localhost=hostname_localhost,
132+
log_obj_size=log_obj_size,
133+
worker_id=worker_id,
132134
),
133-
connections=spawner(cores=cores, **kwargs),
134-
hostname_localhost=hostname_localhost,
135-
log_obj_size=log_obj_size,
136-
worker_id=worker_id,
135+
cache_directory=cache_directory,
136+
cache_key=cache_key,
137+
error_log_file=error_log_file,
137138
)
138-
if init_function is not None:
139-
interface.send_dict(
140-
input_dict={"init": True, "fn": init_function, "args": (), "kwargs": {}}
141-
)
139+
140+
141+
def _execute_task_dict(
142+
task_dict: dict,
143+
interface: SocketInterface,
144+
cache_directory: Optional[str] = None,
145+
cache_key: Optional[str] = None,
146+
error_log_file: Optional[str] = None,
147+
):
148+
"""
149+
Execute the task in the task_dict by communicating it via the interface.
150+
151+
Args:
152+
task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
153+
{"fn": Callable, "args": (), "kwargs": {}, "resource_dict": {}}
154+
interface (SocketInterface): socket interface for zmq communication
155+
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
156+
cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be
157+
overwritten by setting the cache_key.
158+
error_log_file (str): Name of the error log file to use for storing exceptions raised by the Python functions
159+
submitted to the Executor.
160+
"""
142161
if error_log_file is not None:
143162
task_dict["error_log_file"] = error_log_file
144163
if cache_directory is None:

0 commit comments

Comments
 (0)