|
2 | 2 | import importlib |
3 | 3 | import unittest |
4 | 4 | import shutil |
| 5 | +from time import sleep |
5 | 6 |
|
6 | 7 | from executorlib import FluxClusterExecutor |
7 | 8 | from executorlib.standalone.serialize import cloudpickle_register |
8 | 9 |
|
9 | 10 | try: |
10 | 11 | import flux.job |
11 | 12 | from executorlib.task_scheduler.file.hdf import dump |
12 | | - from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache |
| 13 | + from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa |
| 14 | + from executorlib.task_scheduler.file.shared import _get_execute_command |
13 | 15 |
|
# Flux tests need a live broker; `flux start` exports FLUX_URI into the environment.
skip_flux_test = os.environ.get("FLUX_URI") is None
# Optional PMI/PMIx mode for the flux backend (None when unset).
pmi = os.environ.get("EXECUTORLIB_PMIX")
@@ -59,8 +61,19 @@ def test_executor_no_cwd(self): |
59 | 61 | self.assertEqual(len(os.listdir("executorlib_cache")), 2) |
60 | 62 | self.assertTrue(fs1.done()) |
61 | 63 |
|
62 | | - def test_terminate_with_pysqa(self): |
63 | | - self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux")) |
| 64 | + def test_pysqa_interface(self): |
| 65 | + queue_id = execute_with_pysqa( |
| 66 | + command=_get_execute_command( |
| 67 | + file_name="test_i.h5", |
| 68 | + cores=1, |
| 69 | + ), |
| 70 | + file_name="test_i.h5", |
| 71 | + data_dict={"fn": sleep, "args": (10,)}, |
| 72 | + resource_dict={"cores": 1}, |
| 73 | + cache_directory="executorlib_cache", |
| 74 | + backend="flux" |
| 75 | + ) |
| 76 | + self.assertIsNone(terminate_with_pysqa(queue_id=queue_id, backend="flux")) |
64 | 77 |
|
65 | 78 | def test_executor_existing_files(self): |
66 | 79 | with FluxClusterExecutor( |
|
0 commit comments