
Commit

Queuing system submission: check if the job is already waiting in the queue or currently running. (#499)

* Queuing system submission: check if the job is already waiting in the queue or currently running.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update shared.py

* fix tests

* Add tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test without working directory parameter

* more fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more tests

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jan-janssen and pre-commit-ci[bot] authored Nov 15, 2024
1 parent 76ccca9 commit c1b8b22
Showing 12 changed files with 111 additions and 43 deletions.
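The central change, diffed below in executorlib/cache/queue_spawner.py, is that a task's queuing system ID is stored in its HDF5 file and a new job is only submitted when the scheduler no longer reports a status for that ID. A minimal sketch of this check-before-submit pattern, with plain callables standing in for pysqa's QueueAdapter.get_status_of_job and QueueAdapter.submit_job:

from typing import Callable, Optional


def submit_once(
    stored_queue_id: Optional[int],
    get_status: Callable[[int], Optional[str]],
    submit_job: Callable[[], int],
) -> int:
    # Reuse the stored queue ID while the scheduler still knows the job; submit otherwise.
    if stored_queue_id is not None and get_status(stored_queue_id) is not None:
        # The job is already waiting in the queue or currently running.
        return stored_queue_id
    return submit_job()


# Toy stand-ins for the pysqa calls:
print(submit_once(None, get_status=lambda qid: None, submit_job=lambda: 42))     # 42 (submitted)
print(submit_once(42, get_status=lambda qid: "running", submit_job=lambda: 99))  # 42 (reused)
print(submit_once(42, get_status=lambda qid: None, submit_job=lambda: 99))       # 99 (resubmitted)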
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
@@ -3,7 +3,7 @@

from executorlib.base.executor import ExecutorBase
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
@@ -17,7 +17,7 @@
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.standalone.cache.queue import execute_with_pysqa
from executorlib.cache.queue_spawner import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess
executorlib/cache/queue_spawner.py
@@ -1,24 +1,29 @@
import os
import subprocess
from typing import List, Optional, Union
from typing import List, Optional, Tuple, Union

from pysqa import QueueAdapter

from executorlib.standalone.hdf import dump, get_queue_id
from executorlib.standalone.inputcheck import check_file_exists


def execute_with_pysqa(
command: str,
resource_dict: dict,
task_dependent_lst: List[int] = [],
command: list,
task_dependent_lst: list[int] = [],
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
cache_directory: Optional[str] = None,
) -> int:
) -> Tuple[int, int]:
"""
Execute a command by submitting it to the queuing system
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
@@ -30,37 +35,42 @@ def execute_with_pysqa(
Returns:
int: queuing system ID
"""
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
check_file_exists(file_name=file_name)
queue_id = get_queue_id(file_name=file_name)
qa = QueueAdapter(
directory=config_directory,
queue_type=backend,
execute_command=_pysqa_execute_command,
)
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
return qa.submit_job(**submit_kwargs)
if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
if resource_dict is None:
resource_dict = {}
if "cwd" in resource_dict and resource_dict["cwd"] is not None:
cwd = resource_dict["cwd"]
else:
cwd = cache_directory
submit_kwargs = {
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
unsupported_keys = [
"threads_per_core",
"gpus_per_core",
"openmpi_oversubscribe",
"slurm_cmd_args",
]
for k in unsupported_keys:
if k in resource_dict:
del resource_dict[k]
if "job_name" not in resource_dict:
resource_dict["job_name"] = "pysqa"
submit_kwargs.update(resource_dict)
queue_id = qa.submit_job(**submit_kwargs)
dump(file_name=file_name, data_dict={"queue_id": queue_id})
return queue_id


def _pysqa_execute_command(
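For orientation, a call to the updated execute_with_pysqa might look as follows. The command, paths, and backend are placeholders (a matching pysqa configuration is assumed), and file_name must point at an HDF5 task file that already exists, since check_file_exists now rejects missing files. A second call while the job is still waiting or running returns the stored queue ID instead of submitting again.

from executorlib.cache.queue_spawner import execute_with_pysqa

task_file = "/path/to/cache/my_task_in.h5"  # hypothetical task file written beforehand
queue_id = execute_with_pysqa(
    command=["python", "run_task.py", task_file],  # placeholder; joined with " ".join(...) on submission
    file_name=task_file,
    resource_dict={"cores": 1, "cwd": "/path/to/cache"},
    backend="slurm",            # pysqa backend name, assumed to be configured
    cache_directory="/path/to/cache",
)
# Repeating the call while the job is queued or running yields the same queue_id.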
1 change: 1 addition & 0 deletions executorlib/cache/shared.py
@@ -127,6 +127,7 @@ def execute_tasks_h5(
file_name=file_name,
cores=task_resource_dict["cores"],
),
file_name=file_name,
task_dependent_lst=task_dependent_lst,
resource_dict=task_resource_dict,
config_directory=pysqa_config_directory,
executorlib/cache/subprocess_spawner.py
@@ -2,10 +2,13 @@
import time
from typing import Optional

from executorlib.standalone.inputcheck import check_file_exists


def execute_in_subprocess(
command: list,
task_dependent_lst: list = [],
file_name: Optional[str] = None,
resource_dict: Optional[dict] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
@@ -17,6 +20,7 @@ def execute_in_subprocess(
Args:
command (list): The command to be executed.
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
file_name (str): Name of the HDF5 file which contains the Python function
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
Example resource dictionary: {
cwd: None,
@@ -29,6 +33,7 @@
subprocess.Popen: The subprocess object.
"""
check_file_exists(file_name=file_name)
while len(task_dependent_lst) > 0:
task_dependent_lst = [
task for task in task_dependent_lst if task.poll() is None
Empty file.
11 changes: 10 additions & 1 deletion executorlib/standalone/hdf.py
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple

import cloudpickle
import h5py
@@ -18,6 +18,7 @@ def dump(file_name: str, data_dict: dict) -> None:
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"queue_id": "queue_id",
}
with h5py.File(file_name, "a") as fname:
for data_key, data_value in data_dict.items():
@@ -70,3 +71,11 @@ def get_output(file_name: str) -> Tuple[bool, object]:
return True, cloudpickle.loads(np.void(hdf["/output"]))
else:
return False, None


def get_queue_id(file_name: str) -> Optional[int]:
with h5py.File(file_name, "r") as hdf:
if "queue_id" in hdf:
return cloudpickle.loads(np.void(hdf["/queue_id"]))
else:
return None
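The new get_queue_id helper pairs with dump(): the scheduler ID is written into the task's HDF5 file after submission and read back on the next attempt. A minimal round trip, mirroring the test added further below (the cache path is just an example):

import os

from executorlib.standalone.hdf import dump, get_queue_id

os.makedirs("cache", exist_ok=True)
file_name = os.path.join("cache", "queue_demo.h5")
dump(file_name=file_name, data_dict={"queue_id": 123})  # persist the queuing system ID
assert get_queue_id(file_name=file_name) == 123         # read it back before resubmitting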
8 changes: 8 additions & 0 deletions executorlib/standalone/inputcheck.py
@@ -1,5 +1,6 @@
import inspect
import multiprocessing
import os.path
from concurrent.futures import Executor
from typing import Callable, List, Optional

@@ -188,3 +189,10 @@ def validate_number_of_cores(
elif max_cores is not None and max_workers is None:
max_workers = int(max_cores / cores_per_worker)
return max_workers


def check_file_exists(file_name: str):
if file_name is None:
raise ValueError("file_name is not set.")
if not os.path.exists(file_name):
raise ValueError("file_name is not written to the file system.")
2 changes: 1 addition & 1 deletion tests/test_cache_executor_mpi.py
@@ -3,7 +3,7 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess
from executorlib.cache.subprocess_spawner import execute_in_subprocess


try:
18 changes: 13 additions & 5 deletions tests/test_cache_executor_serial.py
@@ -4,14 +4,14 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import (
from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.executor import FileExecutor
from executorlib.cache.executor import FileExecutor, create_file_executor
from executorlib.cache.shared import execute_tasks_h5

skip_h5py_test = False
@@ -46,6 +46,12 @@ def test_executor_dependence_mixed(self):
self.assertEqual(fs2.result(), 4)
self.assertTrue(fs2.done())

def test_create_file_executor_error(self):
with self.assertRaises(ValueError):
create_file_executor(block_allocation=True)
with self.assertRaises(ValueError):
create_file_executor(init_function=True)

def test_executor_dependence_error(self):
with self.assertRaises(ValueError):
with FileExecutor(
@@ -163,7 +169,7 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"resource_dict": {"cores": 1, "cwd": None},
"resource_dict": {"cores": 1},
"terminate_function": terminate_subprocess,
},
)
@@ -176,9 +182,11 @@

def test_execute_in_subprocess_errors(self):
with self.assertRaises(ValueError):
execute_in_subprocess(command=[], config_directory="test")
execute_in_subprocess(
file_name=__file__, command=[], config_directory="test"
)
with self.assertRaises(ValueError):
execute_in_subprocess(command=[], backend="flux")
execute_in_subprocess(file_name=__file__, command=[], backend="flux")

def tearDown(self):
if os.path.exists("cache"):
24 changes: 22 additions & 2 deletions tests/test_cache_hdf.py
@@ -4,7 +4,7 @@


try:
from executorlib.standalone.hdf import dump, load, get_output
from executorlib.standalone.hdf import dump, load, get_output, get_queue_id

skip_h5py_test = False
except ImportError:
@@ -60,12 +60,32 @@ def test_hdf_kwargs(self):
b = 2
dump(
file_name=file_name,
data_dict={"fn": my_funct, "args": (), "kwargs": {"a": a, "b": b}},
data_dict={
"fn": my_funct,
"args": (),
"kwargs": {"a": a, "b": b},
"queue_id": 123,
},
)
data_dict = load(file_name=file_name)
self.assertTrue("fn" in data_dict.keys())
self.assertEqual(data_dict["args"], ())
self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
self.assertFalse(flag)
self.assertIsNone(output)

def test_hdf_queue_id(self):
cache_directory = os.path.abspath("cache")
os.makedirs(cache_directory, exist_ok=True)
file_name = os.path.join(cache_directory, "test_queue.h5")
queue_id = 123
dump(
file_name=file_name,
data_dict={"queue_id": queue_id},
)
self.assertEqual(get_queue_id(file_name=file_name), 123)
flag, output = get_output(file_name=file_name)
self.assertFalse(flag)
self.assertIsNone(output)
2 changes: 1 addition & 1 deletion tests/test_pysqa_subprocess.py
@@ -1,7 +1,7 @@
import unittest

try:
from executorlib.standalone.cache.queue import _pysqa_execute_command
from executorlib.cache.queue_spawner import _pysqa_execute_command

skip_pysqa_test = False
except ImportError:
7 changes: 7 additions & 0 deletions tests/test_shared_input_check.py
@@ -17,6 +17,7 @@
check_max_workers_and_cores,
check_hostname_localhost,
check_pysqa_config_directory,
check_file_exists,
validate_number_of_cores,
)

@@ -97,6 +98,12 @@ def test_check_pysqa_config_directory(self):
with self.assertRaises(ValueError):
check_pysqa_config_directory(pysqa_config_directory="path/to/config")

def test_check_file_exists(self):
with self.assertRaises(ValueError):
check_file_exists(file_name=None)
with self.assertRaises(ValueError):
check_file_exists(file_name="/path/does/not/exist")

def test_validate_number_of_cores(self):
with self.assertRaises(ValueError):
validate_number_of_cores(

