Skip to content

Commit 9739df8

Browse files
committed
Merge remote-tracking branch 'origin/main' into core_limit
2 parents f8f69f6 + d85a615 commit 9739df8

File tree

4 files changed

+25
-3
lines changed

4 files changed

+25
-3
lines changed

executorlib/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ def __new__(
195195
hostname_localhost=hostname_localhost,
196196
block_allocation=block_allocation,
197197
init_function=init_function,
198+
disable_dependencies=disable_dependencies,
198199
)
199200
elif not disable_dependencies:
200201
_check_pysqa_config_directory(pysqa_config_directory=pysqa_config_directory)

executorlib/cache/executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
terminate_function: Optional[callable] = None,
3333
pysqa_config_directory: Optional[str] = None,
3434
backend: Optional[str] = None,
35+
disable_dependencies: bool = False,
3536
):
3637
"""
3738
Initialize the FileExecutor.
@@ -45,6 +46,7 @@ def __init__(
4546
terminate_function (callable, optional): The function to terminate the tasks.
4647
pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
4748
backend (str, optional): name of the backend used to spawn tasks.
49+
disable_dependencies (boolean): Disable resolving future objects during the submission.
4850
"""
4951
super().__init__()
5052
default_resource_dict = {
@@ -71,6 +73,7 @@ def __init__(
7173
"terminate_function": terminate_function,
7274
"pysqa_config_directory": pysqa_config_directory,
7375
"backend": backend,
76+
"disable_dependencies": disable_dependencies,
7477
},
7578
)
7679
)
@@ -89,6 +92,7 @@ def create_file_executor(
8992
hostname_localhost: Optional[bool] = None,
9093
block_allocation: bool = False,
9194
init_function: Optional[callable] = None,
95+
disable_dependencies: bool = False,
9296
):
9397
if cache_directory is None:
9498
cache_directory = "executorlib_cache"
@@ -110,4 +114,5 @@ def create_file_executor(
110114
resource_dict=resource_dict,
111115
pysqa_config_directory=pysqa_config_directory,
112116
backend=backend.split("pysqa_")[-1],
117+
disable_dependencies=disable_dependencies,
113118
)

executorlib/cache/shared.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def execute_tasks_h5(
5454
terminate_function: Optional[callable] = None,
5555
pysqa_config_directory: Optional[str] = None,
5656
backend: Optional[str] = None,
57+
disable_dependencies: bool = False,
5758
) -> None:
5859
"""
5960
Execute tasks stored in a queue using HDF5 files.
@@ -111,14 +112,22 @@ def execute_tasks_h5(
111112
if task_key + ".h5out" not in os.listdir(cache_directory):
112113
file_name = os.path.join(cache_directory, task_key + ".h5in")
113114
dump(file_name=file_name, data_dict=data_dict)
115+
if not disable_dependencies:
116+
task_dependent_lst = [
117+
process_dict[k] for k in future_wait_key_lst
118+
]
119+
else:
120+
if len(future_wait_key_lst) > 0:
121+
raise ValueError(
122+
"Future objects are not supported as input if disable_dependencies=True."
123+
)
124+
task_dependent_lst = []
114125
process_dict[task_key] = execute_function(
115126
command=_get_execute_command(
116127
file_name=file_name,
117128
cores=task_resource_dict["cores"],
118129
),
119-
task_dependent_lst=[
120-
process_dict[k] for k in future_wait_key_lst
121-
],
130+
task_dependent_lst=task_dependent_lst,
122131
resource_dict=task_resource_dict,
123132
config_directory=pysqa_config_directory,
124133
backend=backend,

tests/test_cache_executor_serial.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ def test_executor_dependence_mixed(self):
4646
self.assertEqual(fs2.result(), 4)
4747
self.assertTrue(fs2.done())
4848

49+
def test_executor_dependence_error(self):
50+
with self.assertRaises(ValueError):
51+
with FileExecutor(
52+
execute_function=execute_in_subprocess, disable_dependencies=True
53+
) as exe:
54+
exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))
55+
4956
def test_executor_working_directory(self):
5057
cwd = os.path.join(os.path.dirname(__file__), "executables")
5158
with FileExecutor(

0 commit comments

Comments
 (0)