From c65100b29f23a47d0e9b2b1a9fd909b2693d7f3d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 29 Oct 2024 16:32:07 +0100 Subject: [PATCH] Add pysqa command which raises an exception (#465) * Add pysqa command which raises an exception * fix type hints * fix tests * another test * more fixes * add shell option --- executorlib/standalone/cache/queue.py | 45 +++++++++++++++++++++++++-- tests/test_pysqa_subprocess.py | 45 +++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 tests/test_pysqa_subprocess.py diff --git a/executorlib/standalone/cache/queue.py b/executorlib/standalone/cache/queue.py index 9975df0d..15990658 100644 --- a/executorlib/standalone/cache/queue.py +++ b/executorlib/standalone/cache/queue.py @@ -1,5 +1,6 @@ import os -from typing import List, Optional +import subprocess +from typing import List, Optional, Union from pysqa import QueueAdapter @@ -29,7 +30,11 @@ def execute_with_pysqa( """ if resource_dict is None: resource_dict = {"cwd": "."} - qa = QueueAdapter(directory=config_directory, queue_type=backend) + qa = QueueAdapter( + directory=config_directory, + queue_type=backend, + execute_command=_pysqa_execute_command, + ) submit_kwargs = { "command": " ".join(command), "dependency_list": [str(qid) for qid in task_dependent_lst], @@ -47,3 +52,39 @@ def execute_with_pysqa( del resource_dict[k] submit_kwargs.update(resource_dict) return qa.submit_job(**submit_kwargs) + + +def _pysqa_execute_command( + commands: str, + working_directory: Optional[str] = None, + split_output: bool = True, + shell: bool = False, + error_filename: str = "pysqa.err", +) -> Union[str, List[str]]: + """ + A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess + fails to submit the job to the queue. + + Args: + commands (str): The command(s) to be executed on the command line + working_directory (str, optional): The directory where the command is executed. Defaults to None. + split_output (bool, optional): Boolean flag to split newlines in the output. Defaults to True. + shell (bool, optional): Additional switch to convert commands to a single string. Defaults to False. + error_filename (str, optional): In case the execution fails, the output is written to this file. Defaults to "pysqa.err". + + Returns: + Union[str, List[str]]: Output of the shell command either as a string or as a list of strings + """ + if shell and isinstance(commands, list): + commands = " ".join(commands) + out = subprocess.check_output( + commands, + cwd=working_directory, + stderr=subprocess.STDOUT, + universal_newlines=True, + shell=not isinstance(commands, list), + ) + if out is not None and split_output: + return out.split("\n") + else: + return out diff --git a/tests/test_pysqa_subprocess.py b/tests/test_pysqa_subprocess.py new file mode 100644 index 00000000..f2906527 --- /dev/null +++ b/tests/test_pysqa_subprocess.py @@ -0,0 +1,45 @@ +import unittest + +try: + from executorlib.standalone.cache.queue import _pysqa_execute_command + + skip_pysqa_test = False +except ImportError: + skip_pysqa_test = True + + +@unittest.skipIf( + skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped." +) +class TestPysqaExecuteCommand(unittest.TestCase): + def test_pysqa_execute_command_list(self): + out = _pysqa_execute_command( + commands=["echo", "test"], + working_directory=None, + split_output=True, + shell=True, + error_filename="pysqa.err", + ) + self.assertEqual(len(out), 2) + self.assertEqual("test", out[0]) + + def test_pysqa_execute_command_string(self): + out = _pysqa_execute_command( + commands="echo test", + working_directory=None, + split_output=False, + shell=False, + error_filename="pysqa.err", + ) + self.assertEqual(len(out), 5) + self.assertEqual("test\n", out) + + def test_pysqa_execute_command_fail(self): + with self.assertRaises(FileNotFoundError): + _pysqa_execute_command( + commands=["no/executable/available"], + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + )