Skip to content

Commit 71bb2fb

Browse files
Add terminate_tasks_in_cache() function to kill remaining tasks (#739)
* Add terminate_tasks_in_cache() function to kill remaining tasks
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Remove unused variable
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix type hint
* Add Docstrings
* Add flux based tests
* fix tests
* functions need to be named test

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5836485 commit 71bb2fb

File tree

3 files changed

+48
-2
lines changed

3 files changed

+48
-2
lines changed

executorlib/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,11 @@
3636
"SlurmClusterExecutor",
3737
]
3838

39+
try:
    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
except ImportError:
    # The queue spawner module could not be imported, so the helper is simply
    # not exported.
    # NOTE(review): presumably this happens when an optional dependency of
    # queue_spawner is not installed - confirm.
    pass
else:
    # Only advertise the helper as public API when the import succeeded.
    __all__ += ["terminate_tasks_in_cache"]
45+
3946
__version__ = _version.__version__

executorlib/task_scheduler/file/queue_spawner.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def execute_with_pysqa(
3333
cwd: None,
3434
}
3535
config_directory (str, optional): path to the config directory.
36-
backend (str, optional): name of the backend used to spawn tasks.
36+
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
3737
3838
Returns:
3939
int: queuing system ID
@@ -102,7 +102,7 @@ def terminate_with_pysqa(
102102
Args:
103103
queue_id (int): Queuing system ID of the job to delete.
104104
config_directory (str, optional): path to the config directory.
105-
backend (str, optional): name of the backend used to spawn tasks.
105+
backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
106106
"""
107107
qa = QueueAdapter(
108108
directory=config_directory,
@@ -115,6 +115,33 @@ def terminate_with_pysqa(
115115
qa.delete_job(process_id=queue_id)
116116

117117

118+
def terminate_tasks_in_cache(
    cache_directory: str,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
) -> None:
    """
    Delete all jobs stored in the cache directory from the queuing system

    The cache directory is searched recursively for files whose name ends with "_i.h5". For every such file
    get_queue_id() is called and, if it returns a value other than None, the corresponding job is deleted with
    terminate_with_pysqa(). A cache directory without matching files results in no action.

    Args:
        cache_directory (str): The directory to store cache files.
        config_directory (str, optional): path to the config directory.
        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
    """
    # Collect all candidate files first, so the directory walk is finished
    # before any job is deleted from the queuing system.
    hdf5_file_lst = [
        os.path.join(root, file_name)
        for root, _, files in os.walk(cache_directory)
        for file_name in files
        if file_name.endswith("_i.h5")
    ]

    for hdf5_file in hdf5_file_lst:
        queue_id = get_queue_id(hdf5_file)
        # Files for which no queuing system ID is available are skipped.
        if queue_id is not None:
            terminate_with_pysqa(
                queue_id=queue_id,
                config_directory=config_directory,
                backend=backend,
            )
143+
144+
118145
def _pysqa_execute_command(
119146
commands: str,
120147
working_directory: Optional[str] = None,

tests/test_fluxclusterexecutor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
try:
1010
import flux.job
1111
from executorlib.task_scheduler.file.hdf import dump
12+
from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache
1213

1314
skip_flux_test = "FLUX_URI" not in os.environ
1415
pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -58,6 +59,9 @@ def test_executor_no_cwd(self):
5859
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
5960
self.assertTrue(fs1.done())
6061

62+
def test_terminate_with_pysqa(self):
    # Deleting queue ID 1 via the flux backend must complete and return None.
    outcome = terminate_with_pysqa(queue_id=1, backend="flux")
    self.assertIsNone(outcome)
64+
6165
def test_executor_existing_files(self):
6266
with FluxClusterExecutor(
6367
resource_dict={"cores": 2, "cwd": "executorlib_cache"},
@@ -89,5 +93,13 @@ def test_executor_existing_files(self):
8993
self.assertTrue(fs1.done())
9094
self.assertEqual(len(os.listdir("executorlib_cache")), 4)
9195

96+
def test_terminate_tasks_in_cache(self):
    # Place an input file with a stored queue ID in the cache directory so
    # that terminate_tasks_in_cache() finds a file ending in "_i.h5".
    input_file = os.path.join("executorlib_cache", "test_i.h5")
    dump(file_name=input_file, data_dict={"queue_id": 1})
    outcome = terminate_tasks_in_cache(
        cache_directory="executorlib_cache",
        backend="flux",
    )
    self.assertIsNone(outcome)
103+
92104
def tearDown(self):
    # Remove the cache directory after each test; errors during removal
    # (for example a directory that was never created) are ignored.
    shutil.rmtree("executorlib_cache", ignore_errors=True)

0 commit comments

Comments
 (0)