19 changes: 16 additions & 3 deletions tests/test_fluxclusterexecutor.py
@@ -2,14 +2,16 @@
 import importlib
 import unittest
 import shutil
+from time import sleep

 from executorlib import FluxClusterExecutor
 from executorlib.standalone.serialize import cloudpickle_register

 try:
     import flux.job
     from executorlib.task_scheduler.file.hdf import dump
-    from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache
+    from executorlib.task_scheduler.file.queue_spawner import terminate_with_pysqa, terminate_tasks_in_cache, execute_with_pysqa
+    from executorlib.task_scheduler.file.shared import _get_execute_command

     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -59,8 +61,19 @@ def test_executor_no_cwd(self):
         self.assertEqual(len(os.listdir("executorlib_cache")), 2)
         self.assertTrue(fs1.done())

-    def test_terminate_with_pysqa(self):
-        self.assertIsNone(terminate_with_pysqa(queue_id=1, backend="flux"))
+    def test_pysqa_interface(self):
+        queue_id = execute_with_pysqa(
+            command=_get_execute_command(
+                file_name="test_i.h5",
+                cores=1,
+            ),
+            file_name="test_i.h5",
+            data_dict={"fn": sleep, "args": (10,)},
+            resource_dict={"cores": 1},
+            cache_directory="executorlib_cache",
+            backend="flux"
+        )
+        self.assertIsNone(terminate_with_pysqa(queue_id=queue_id, backend="flux"))

     def test_executor_existing_files(self):
         with FluxClusterExecutor(
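For context, a minimal standalone sketch of the pysqa round trip that the new test_pysqa_interface test exercises, assuming a running Flux broker (FLUX_URI set) and using only the keyword signatures visible in the diff above; the file name and cache directory are illustrative values copied from the test:

from time import sleep

from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa, terminate_with_pysqa
from executorlib.task_scheduler.file.shared import _get_execute_command

# Submit a cached task (sleep for 10 seconds on one core) to the flux backend via pysqa.
# "test_i.h5" and "executorlib_cache" are illustrative, matching the test above.
queue_id = execute_with_pysqa(
    command=_get_execute_command(file_name="test_i.h5", cores=1),
    file_name="test_i.h5",
    data_dict={"fn": sleep, "args": (10,)},
    resource_dict={"cores": 1},
    cache_directory="executorlib_cache",
    backend="flux",
)

# Cancel the queued job again; the test asserts that this call returns None.
terminate_with_pysqa(queue_id=queue_id, backend="flux")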