Skip to content

Commit 3f01ead

Browse files
Refactor get_cache_execute_command() and get_interative_execute_command() (#746)
* Refactor get_cache_execute_command() and get_interative_exeute_command() * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix import --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d49317e commit 3f01ead

File tree

4 files changed

+60
-61
lines changed

4 files changed

+60
-61
lines changed

executorlib/standalone/command.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import importlib.util
12
import os
3+
import sys
24

35

46
def get_command_path(executable: str) -> str:
@@ -12,3 +14,55 @@ def get_command_path(executable: str) -> str:
1214
str: absolute path to the executable script
1315
"""
1416
return os.path.abspath(os.path.join(__file__, "..", "..", "backend", executable))
17+
18+
19+
def get_cache_execute_command(file_name: str, cores: int = 1) -> list:
20+
"""
21+
Get command to call backend as a list of two strings
22+
23+
Args:
24+
file_name (str): The name of the file.
25+
cores (int, optional): Number of cores used to execute the task. Defaults to 1.
26+
27+
Returns:
28+
list[str]: List of strings containing the python executable path and the backend script to execute
29+
"""
30+
command_lst = [sys.executable]
31+
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
32+
command_lst = (
33+
["mpiexec", "-n", str(cores)]
34+
+ command_lst
35+
+ [get_command_path(executable="cache_parallel.py"), file_name]
36+
)
37+
elif cores > 1:
38+
raise ImportError(
39+
"mpi4py is required for parallel calculations. Please install mpi4py."
40+
)
41+
else:
42+
command_lst += [get_command_path(executable="cache_serial.py"), file_name]
43+
return command_lst
44+
45+
46+
def get_interactive_execute_command(
47+
cores: int,
48+
) -> list:
49+
"""
50+
Get command to call backend as a list of two strings
51+
52+
Args:
53+
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py
54+
else interactive_serial.py
55+
56+
Returns:
57+
list[str]: List of strings containing the python executable path and the backend script to execute
58+
"""
59+
command_lst = [sys.executable]
60+
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
61+
command_lst += [get_command_path(executable="interactive_parallel.py")]
62+
elif cores > 1:
63+
raise ImportError(
64+
"mpi4py is required for parallel calculations. Please install mpi4py."
65+
)
66+
else:
67+
command_lst += [get_command_path(executable="interactive_serial.py")]
68+
return command_lst

executorlib/task_scheduler/file/shared.py

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import contextlib
2-
import importlib.util
32
import os
43
import queue
5-
import sys
64
from concurrent.futures import Future
75
from typing import Any, Callable, Optional
86

97
from executorlib.standalone.cache import get_cache_files
10-
from executorlib.standalone.command import get_command_path
8+
from executorlib.standalone.command import get_cache_execute_command
119
from executorlib.standalone.serialize import serialize_funct_h5
1210
from executorlib.task_scheduler.file.hdf import get_output
1311
from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
@@ -153,7 +151,7 @@ def execute_tasks_h5(
153151
)
154152
task_dependent_lst = []
155153
process_dict[task_key] = execute_function(
156-
command=_get_execute_command(
154+
command=get_cache_execute_command(
157155
file_name=file_name,
158156
cores=task_resource_dict["cores"],
159157
),
@@ -183,33 +181,6 @@ def execute_tasks_h5(
183181
}
184182

185183

186-
def _get_execute_command(file_name: str, cores: int = 1) -> list:
187-
"""
188-
Get command to call backend as a list of two strings
189-
190-
Args:
191-
file_name (str): The name of the file.
192-
cores (int, optional): Number of cores used to execute the task. Defaults to 1.
193-
194-
Returns:
195-
list[str]: List of strings containing the python executable path and the backend script to execute
196-
"""
197-
command_lst = [sys.executable]
198-
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
199-
command_lst = (
200-
["mpiexec", "-n", str(cores)]
201-
+ command_lst
202-
+ [get_command_path(executable="cache_parallel.py"), file_name]
203-
)
204-
elif cores > 1:
205-
raise ImportError(
206-
"mpi4py is required for parallel calculations. Please install mpi4py."
207-
)
208-
else:
209-
command_lst += [get_command_path(executable="cache_serial.py"), file_name]
210-
return command_lst
211-
212-
213184
def _check_task_output(
214185
task_key: str, future_obj: Future, cache_directory: str
215186
) -> Future:

executorlib/task_scheduler/interactive/shared.py

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import contextlib
2-
import importlib.util
32
import os
43
import queue
5-
import sys
64
import time
75
from typing import Callable, Optional
86

97
from executorlib.standalone.cache import get_cache_files
10-
from executorlib.standalone.command import get_command_path
8+
from executorlib.standalone.command import get_interactive_execute_command
119
from executorlib.standalone.interactive.communication import (
1210
SocketInterface,
1311
interface_bootup,
@@ -51,7 +49,7 @@ def execute_tasks(
5149
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
5250
"""
5351
interface = interface_bootup(
54-
command_lst=_get_backend_path(
52+
command_lst=get_interactive_execute_command(
5553
cores=cores,
5654
),
5755
connections=spawner(cores=cores, **kwargs),
@@ -87,30 +85,6 @@ def execute_tasks(
8785
)
8886

8987

90-
def _get_backend_path(
91-
cores: int,
92-
) -> list:
93-
"""
94-
Get command to call backend as a list of two strings
95-
96-
Args:
97-
cores (int): Number of cores used to execute the task, if it is greater than one use interactive_parallel.py else interactive_serial.py
98-
99-
Returns:
100-
list[str]: List of strings containing the python executable path and the backend script to execute
101-
"""
102-
command_lst = [sys.executable]
103-
if cores > 1 and importlib.util.find_spec("mpi4py") is not None:
104-
command_lst += [get_command_path(executable="interactive_parallel.py")]
105-
elif cores > 1:
106-
raise ImportError(
107-
"mpi4py is required for parallel calculations. Please install mpi4py."
108-
)
109-
else:
110-
command_lst += [get_command_path(executable="interactive_serial.py")]
111-
return command_lst
112-
113-
11488
def _execute_task_without_cache(
11589
interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
11690
):

tests/test_fluxclusterexecutor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
from executorlib import FluxClusterExecutor
88
from executorlib.standalone.serialize import cloudpickle_register
9+
from executorlib.standalone.command import get_cache_execute_command
910

1011
try:
1112
import flux.job
1213
from executorlib.task_scheduler.file.hdf import dump
1314
from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa
14-
from executorlib.task_scheduler.file.shared import _get_execute_command
1515

1616
skip_flux_test = "FLUX_URI" not in os.environ
1717
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -63,7 +63,7 @@ def test_executor_no_cwd(self):
6363

6464
def test_pysqa_interface(self):
6565
queue_id = execute_with_pysqa(
66-
command=_get_execute_command(
66+
command=get_cache_execute_command(
6767
file_name="test_i.h5",
6868
cores=1,
6969
),

0 commit comments

Comments
 (0)