diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index 8216943..c0afe7e 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -1,5 +1,5 @@ import os -from typing import List, Optional +from typing import List, Optional, Tuple, Union import pandas @@ -315,7 +315,9 @@ def check_queue_parameters( run_time_max: Optional[int] = None, memory_max: Optional[int] = None, active_queue: Optional[dict] = None, - ) -> List: + ) -> Tuple[ + Union[float, int, None], Union[float, int, None], Union[float, int, None] + ]: """ Check the parameters of a queue. diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py index b4c1b30..b21b887 100644 --- a/pysqa/utils/basic.py +++ b/pysqa/utils/basic.py @@ -1,5 +1,3 @@ -import getpass -import importlib import os from typing import List, Optional, Tuple, Union @@ -7,74 +5,13 @@ from jinja2 import Template from jinja2.exceptions import TemplateSyntaxError +from pysqa.utils.core import CoreQueueAdapter from pysqa.utils.execute import execute_command from pysqa.utils.queues import Queues from pysqa.utils.validate import value_error_if_none, value_in_range -from pysqa.wrapper.generic import SchedulerCommands - -queue_type_dict = { - "SGE": { - "class_name": "SunGridEngineCommands", - "module_name": "pysqa.wrapper.sge", - }, - "TORQUE": { - "class_name": "TorqueCommands", - "module_name": "pysqa.wrapper.torque", - }, - "SLURM": { - "class_name": "SlurmCommands", - "module_name": "pysqa.wrapper.slurm", - }, - "LSF": { - "class_name": "LsfCommands", - "module_name": "pysqa.wrapper.lsf", - }, - "MOAB": { - "class_name": "MoabCommands", - "module_name": "pysqa.wrapper.moab", - }, - "GENT": { - "class_name": "GentCommands", - "module_name": "pysqa.wrapper.gent", - }, - "REMOTE": { - "class_name": None, - "module_name": None, - }, - "FLUX": { - "class_name": "FluxCommands", - "module_name": "pysqa.wrapper.flux", - }, -} - - -def get_queue_commands(queue_type: str) -> Union[SchedulerCommands, None]: - """ - Load queuing system 
commands class - - Args: - queue_type (str): Type of the queuing system in capital letters - - Returns: - SchedulerCommands: queuing system commands class instance - """ - if queue_type in queue_type_dict.keys(): - class_name = queue_type_dict[queue_type]["class_name"] - module_name = queue_type_dict[queue_type]["module_name"] - if module_name is not None and class_name is not None: - return getattr(importlib.import_module(module_name), class_name)() - else: - return None - else: - raise ValueError( - "The queue_type " - + queue_type - + " is not found in the list of supported queue types " - + str(list(queue_type_dict.keys())) - ) -class BasisQueueAdapter(object): +class BasisQueueAdapter(CoreQueueAdapter): """ The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process locally. @@ -97,14 +34,15 @@ def __init__( directory: str = "~/.queues", execute_command: callable = execute_command, ): + super().__init__( + queue_type=config["queue_type"], execute_command=execute_command + ) self._config = config self._fill_queue_dict(queue_lst_dict=self._config["queues"]) self._load_templates(queue_lst_dict=self._config["queues"], directory=directory) - self._commands = get_queue_commands(queue_type=self._config["queue_type"]) self._queues = Queues(self.queue_list) self._remote_flag = False self._ssh_delete_file_on_remote = True - self._execute_command_function = execute_command @property def ssh_delete_file_on_remote(self) -> bool: @@ -168,175 +106,6 @@ def queues(self): """ return self._queues - def submit_job( - self, - queue: Optional[str] = None, - job_name: Optional[str] = None, - working_directory: Optional[str] = None, - cores: Optional[int] = None, - memory_max: Optional[int] = None, - run_time_max: Optional[int] = None, - dependency_list: Optional[List[str]] = None, - command: Optional[str] = None, - **kwargs, - ) -> Union[int, None]: - """ - Submit a job to the queue. 
- - Args: - queue (str/None): The queue to submit the job to. - job_name (str/None): The name of the job. - working_directory (str/None): The working directory for the job. - cores (int/None): The number of cores required for the job. - memory_max (int/None): The maximum memory required for the job. - run_time_max (int/None): The maximum run time for the job. - dependency_list (list[str]/None): List of job dependencies. - command (str/None): The command to execute for the job. - - Returns: - int: The job ID. - """ - if working_directory is not None and " " in working_directory: - raise ValueError( - "Whitespaces in the working_directory name are not supported!" - ) - working_directory, queue_script_path = self._write_queue_script( - queue=queue, - job_name=job_name, - working_directory=working_directory, - cores=cores, - memory_max=memory_max, - run_time_max=run_time_max, - command=command, - **kwargs, - ) - out = self._execute_command( - commands=self._list_command_to_be_executed( - queue_script_path=queue_script_path - ), - working_directory=working_directory, - split_output=False, - ) - if out is not None: - return self._commands.get_job_id_from_output(out) - else: - return None - - def _list_command_to_be_executed(self, queue_script_path: str) -> list: - """ - Get the list of commands to be executed. - - Args: - queue_script_path (str): The path to the queue script. - - Returns: - list: The list of commands to be executed. - """ - return self._commands.submit_job_command + [queue_script_path] - - def enable_reservation(self, process_id: int): - """ - Enable reservation for a process. - - Args: - process_id (int): The process ID. - - Returns: - str: The result of the enable reservation command. 
- """ - out = self._execute_command( - commands=self._commands.enable_reservation_command + [str(process_id)], - split_output=True, - ) - if out is not None: - return out[0] - else: - return None - - def delete_job(self, process_id: int) -> Union[str, None]: - """ - Delete a job. - - Args: - process_id (int): The process ID. - - Returns: - str: The result of the delete job command. - """ - out = self._execute_command( - commands=self._commands.delete_job_command + [str(process_id)], - split_output=True, - ) - if out is not None: - return out[0] - else: - return None - - def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: - """ - Get the status of the queue. - - Args: - user (str): The user to filter the queue status for. - - Returns: - pandas.DataFrame: The queue status. - """ - out = self._execute_command( - commands=self._commands.get_queue_status_command, split_output=False - ) - df = self._commands.convert_queue_status(queue_status_output=out) - if user is None: - return df - else: - return df[df["user"] == user] - - def get_status_of_my_jobs(self) -> pandas.DataFrame: - """ - Get the status of the user's jobs. - - Returns: - pandas.DataFrame: The status of the user's jobs. - """ - return self.get_queue_status(user=self._get_user()) - - def get_status_of_job(self, process_id: int) -> Union[str, None]: - """ - Get the status of a job. - - Args: - process_id (int): The process ID. - - Returns: - str: The status of the job.results_lst.append(df_selected.values[0]) - """ - df = self.get_queue_status() - df_selected = df[df["jobid"] == process_id]["status"] - if len(df_selected) != 0: - return df_selected.values[0] - else: - return None - - def get_status_of_jobs(self, process_id_lst: List[int]) -> List[str]: - """ - Get the status of multiple jobs. - - Args: - process_id_lst (list[int]): List of process IDs. - - Returns: - list[str]: List of job statuses. 
- """ - df = self.get_queue_status() - results_lst = [] - for process_id in process_id_lst: - df_selected = df[df["jobid"] == process_id]["status"] - if len(df_selected) != 0: - results_lst.append(df_selected.values[0]) - else: - results_lst.append("finished") - return results_lst - def get_job_from_remote(self, working_directory: str) -> None: """ Get the results of the calculation - this is necessary when the calculation was executed on a remote host. @@ -412,59 +181,10 @@ def check_queue_parameters( ) return cores, run_time_max, memory_max - def _write_queue_script( - self, - queue: Optional[str] = None, - job_name: Optional[str] = None, - working_directory: Optional[str] = None, - cores: Optional[int] = None, - memory_max: Optional[int] = None, - run_time_max: Optional[int] = None, - dependency_list: Optional[List[int]] = None, - command: Optional[str] = None, - **kwargs, - ) -> Tuple[str, str]: - """ - Write the queue script to a file. - - Args: - queue (str/None): The queue name. - job_name (str/None): The job name. - working_directory (str/None): The working directory. - cores (int/None): The number of cores. - memory_max (int/None): The maximum memory. - run_time_max (int/None): The maximum run time. - dependency_list (list/None): The list of dependency job IDs. - command (str/None): The command to be executed. - - Returns: - Tuple[str, str]: A tuple containing the working directory and the path to the queue script file. - """ - if isinstance(command, list): - command = "".join(command) - if working_directory is None: - working_directory = "." 
- queue_script = self._job_submission_template( - queue=queue, - job_name=job_name, - working_directory=working_directory, - cores=cores, - memory_max=memory_max, - run_time_max=run_time_max, - dependency_list=dependency_list, - command=command, - **kwargs, - ) - if not os.path.exists(working_directory): - os.makedirs(working_directory) - queue_script_path = os.path.join(working_directory, "run_queue.sh") - with open(queue_script_path, "w") as f: - f.writelines(queue_script) - return working_directory, queue_script_path - def _job_submission_template( self, queue: Optional[str] = None, + submission_template: Optional[Union[str, Template]] = None, job_name: str = "job.py", working_directory: str = ".", cores: Optional[int] = None, @@ -508,57 +228,19 @@ def _job_submission_template( memory_max=memory_max, active_queue=active_queue, ) - template = active_queue["template"] - return template.render( + return super()._job_submission_template( + queue=None, + submission_template=active_queue["template"], job_name=job_name, working_directory=working_directory, cores=cores, memory_max=memory_max, run_time_max=run_time_max, - command=command, dependency_list=dependency_list, + command=command, **kwargs, ) - def _execute_command( - self, - commands: Union[str, List[str]], - working_directory: Optional[str] = None, - split_output: bool = True, - shell: bool = False, - error_filename: str = "pysqa.err", - ) -> str: - """ - Execute a command or a list of commands. - - Args: - commands (Union[str, List[str]]): The command(s) to be executed. - working_directory (Optional[str], optional): The working directory. Defaults to None. - split_output (bool, optional): Whether to split the output into lines. Defaults to True. - shell (bool, optional): Whether to use the shell to execute the command. Defaults to False. - error_filename (str, optional): The name of the error file. Defaults to "pysqa.err". - - Returns: - str: The output of the command(s). 
- """ - return self._execute_command_function( - commands=commands, - working_directory=working_directory, - split_output=split_output, - shell=shell, - error_filename=error_filename, - ) - - @staticmethod - def _get_user() -> str: - """ - Get the current user. - - Returns: - str: The current user. - """ - return getpass.getuser() - @staticmethod def _fill_queue_dict(queue_lst_dict: dict): """ diff --git a/pysqa/utils/core.py b/pysqa/utils/core.py new file mode 100644 index 0000000..087144d --- /dev/null +++ b/pysqa/utils/core.py @@ -0,0 +1,396 @@ +import getpass +import importlib +import os +from typing import List, Optional, Tuple, Union + +import pandas +from jinja2 import Template + +from pysqa.utils.execute import execute_command +from pysqa.wrapper.abstract import SchedulerCommands + +queue_type_dict = { + "SGE": { + "class_name": "SunGridEngineCommands", + "module_name": "pysqa.wrapper.sge", + }, + "TORQUE": { + "class_name": "TorqueCommands", + "module_name": "pysqa.wrapper.torque", + }, + "SLURM": { + "class_name": "SlurmCommands", + "module_name": "pysqa.wrapper.slurm", + }, + "LSF": { + "class_name": "LsfCommands", + "module_name": "pysqa.wrapper.lsf", + }, + "MOAB": { + "class_name": "MoabCommands", + "module_name": "pysqa.wrapper.moab", + }, + "GENT": { + "class_name": "GentCommands", + "module_name": "pysqa.wrapper.gent", + }, + "REMOTE": { + "class_name": None, + "module_name": None, + }, + "FLUX": { + "class_name": "FluxCommands", + "module_name": "pysqa.wrapper.flux", + }, +} + + +def get_queue_commands(queue_type: str) -> Union[SchedulerCommands, None]: + """ + Load queuing system commands class + + Args: + queue_type (str): Type of the queuing system in capital letters + + Returns: + SchedulerCommands: queuing system commands class instance + """ + if queue_type in queue_type_dict.keys(): + class_name = queue_type_dict[queue_type]["class_name"] + module_name = queue_type_dict[queue_type]["module_name"] + if module_name is not None and class_name 
is not None: + return getattr(importlib.import_module(module_name), class_name)() + else: + return None + else: + raise ValueError( + "The queue_type " + + queue_type + + " is not found in the list of supported queue types " + + str(list(queue_type_dict.keys())) + ) + + +class CoreQueueAdapter(object): + def __init__( + self, + queue_type: str, + execute_command: callable = execute_command, + ): + self._commands = get_queue_commands(queue_type=queue_type) + if queue_type_dict[queue_type]["module_name"] is not None: + self._submission_template = getattr( + importlib.import_module(queue_type_dict[queue_type]["module_name"]), + "template", + ) + self._execute_command_function = execute_command + + def submit_job( + self, + queue: Optional[str] = None, + job_name: Optional[str] = None, + working_directory: Optional[str] = None, + cores: Optional[int] = None, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[str]] = None, + command: Optional[str] = None, + submission_template: Optional[Union[str, Template]] = None, + **kwargs, + ) -> Union[int, None]: + """ + Submit a job to the queue. + + Args: + queue (str/None): The queue to submit the job to. + job_name (str/None): The name of the job. + working_directory (str/None): The working directory for the job. + cores (int/None): The number of cores required for the job. + memory_max (int/None): The maximum memory required for the job. + run_time_max (int/None): The maximum run time for the job. + dependency_list (list[str]/None): List of job dependencies. + command (str/None): The command to execute for the job. + + Returns: + int: The job ID. + """ + if working_directory is not None and " " in working_directory: + raise ValueError( + "Whitespaces in the working_directory name are not supported!" 
+ ) + if submission_template is None: + submission_template = self._submission_template + working_directory, queue_script_path = self._write_queue_script( + queue=queue, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + dependency_list=dependency_list, + submission_template=submission_template, + **kwargs, + ) + out = self._execute_command( + commands=self._list_command_to_be_executed( + queue_script_path=queue_script_path + ), + working_directory=working_directory, + split_output=False, + ) + if out is not None: + return self._commands.get_job_id_from_output(out) + else: + return None + + def enable_reservation(self, process_id: int): + """ + Enable reservation for a process. + + Args: + process_id (int): The process ID. + + Returns: + str: The result of the enable reservation command. + """ + out = self._execute_command( + commands=self._commands.enable_reservation_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None + + def delete_job(self, process_id: int) -> Union[str, None]: + """ + Delete a job. + + Args: + process_id (int): The process ID. + + Returns: + str: The result of the delete job command. + """ + out = self._execute_command( + commands=self._commands.delete_job_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None + + def get_queue_status(self, user: Optional[str] = None) -> pandas.DataFrame: + """ + Get the status of the queue. + + Args: + user (str): The user to filter the queue status for. + + Returns: + pandas.DataFrame: The queue status. 
+ """ + out = self._execute_command( + commands=self._commands.get_queue_status_command, split_output=False + ) + df = self._commands.convert_queue_status(queue_status_output=out) + if user is None: + return df + else: + return df[df["user"] == user] + + def get_status_of_my_jobs(self) -> pandas.DataFrame: + """ + Get the status of the user's jobs. + + Returns: + pandas.DataFrame: The status of the user's jobs. + """ + return self.get_queue_status(user=self._get_user()) + + def get_status_of_job(self, process_id: int) -> Union[str, None]: + """ + Get the status of a job. + + Args: + process_id (int): The process ID. + + Returns: + str: The status of the job.results_lst.append(df_selected.values[0]) + """ + df = self.get_queue_status() + df_selected = df[df["jobid"] == process_id]["status"] + if len(df_selected) != 0: + return df_selected.values[0] + else: + return None + + def get_status_of_jobs(self, process_id_lst: List[int]) -> List[str]: + """ + Get the status of multiple jobs. + + Args: + process_id_lst (list[int]): List of process IDs. + + Returns: + list[str]: List of job statuses. + """ + df = self.get_queue_status() + results_lst = [] + for process_id in process_id_lst: + df_selected = df[df["jobid"] == process_id]["status"] + if len(df_selected) != 0: + results_lst.append(df_selected.values[0]) + else: + results_lst.append("finished") + return results_lst + + def _list_command_to_be_executed(self, queue_script_path: str) -> list: + """ + Get the list of commands to be executed. + + Args: + queue_script_path (str): The path to the queue script. + + Returns: + list: The list of commands to be executed. + """ + return self._commands.submit_job_command + [queue_script_path] + + def _execute_command( + self, + commands: Union[str, List[str]], + working_directory: Optional[str] = None, + split_output: bool = True, + shell: bool = False, + error_filename: str = "pysqa.err", + ) -> str: + """ + Execute a command or a list of commands. 
+ + Args: + commands (Union[str, List[str]]): The command(s) to be executed. + working_directory (Optional[str], optional): The working directory. Defaults to None. + split_output (bool, optional): Whether to split the output into lines. Defaults to True. + shell (bool, optional): Whether to use the shell to execute the command. Defaults to False. + error_filename (str, optional): The name of the error file. Defaults to "pysqa.err". + + Returns: + str: The output of the command(s). + """ + return self._execute_command_function( + commands=commands, + working_directory=working_directory, + split_output=split_output, + shell=shell, + error_filename=error_filename, + ) + + def _write_queue_script( + self, + queue: Optional[str] = None, + submission_template: Optional[Union[str, Template]] = None, + job_name: Optional[str] = None, + working_directory: Optional[str] = None, + cores: Optional[int] = None, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + command: Optional[str] = None, + **kwargs, + ) -> Tuple[str, str]: + """ + Write the queue script to a file. + + Args: + queue (str/None): The queue name. + job_name (str/None): The job name. + working_directory (str/None): The working directory. + cores (int/None): The number of cores. + memory_max (int/None): The maximum memory. + run_time_max (int/None): The maximum run time. + dependency_list (list/None): The list of dependency job IDs. + command (str/None): The command to be executed. + + Returns: + Tuple[str, str]: A tuple containing the working directory and the path to the queue script file. + """ + if isinstance(command, list): + command = "".join(command) + if working_directory is None: + working_directory = "." 
+ queue_script = self._job_submission_template( + queue=queue, + submission_template=submission_template, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + command=command, + **kwargs, + ) + if not os.path.exists(working_directory): + os.makedirs(working_directory) + queue_script_path = os.path.join(working_directory, "run_queue.sh") + with open(queue_script_path, "w") as f: + f.writelines(queue_script) + return working_directory, queue_script_path + + def _job_submission_template( + self, + queue: Optional[str] = None, + submission_template: Optional[Union[str, Template]] = None, + job_name: str = "job.py", + working_directory: str = ".", + cores: Optional[int] = None, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + command: Optional[str] = None, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + queue (str, optional): The queue name. Defaults to None. + job_name (str, optional): The job name. Defaults to "job.py". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to None. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. + command (str, optional): The command to be executed. Defaults to None. + + Returns: + str: The job submission template. 
+ """ + if queue is not None: + raise ValueError() + if submission_template is None: + submission_template = self._submission_template + return self._commands.render_submission_template( + command=command, + submission_template=submission_template, + working_directory=working_directory, + job_name=job_name, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + **kwargs, + ) + + @staticmethod + def _get_user() -> str: + """ + Get the current user. + + Returns: + str: The current user. + """ + return getpass.getuser() diff --git a/pysqa/wrapper/generic.py b/pysqa/wrapper/abstract.py similarity index 56% rename from pysqa/wrapper/generic.py rename to pysqa/wrapper/abstract.py index dc46849..be127eb 100644 --- a/pysqa/wrapper/generic.py +++ b/pysqa/wrapper/abstract.py @@ -1,9 +1,22 @@ +import os from abc import ABC, abstractmethod +from typing import List, Optional, Union import pandas +from jinja2 import Template class SchedulerCommands(ABC): + @property + def enable_reservation_command(self) -> list[str]: + """ + Returns the command to enable job reservation on the scheduler. + + Returns: + list[str]: The command to enable job reservation. + """ + raise NotImplementedError() + @property @abstractmethod def submit_job_command(self) -> list[str]: @@ -26,16 +39,6 @@ def delete_job_command(self) -> list[str]: """ pass - @property - def enable_reservation_command(self) -> list[str]: - """ - Returns the command to enable job reservation on the scheduler. - - Returns: - list[str]: The command to enable job reservation. 
- """ - raise NotImplementedError() - @property @abstractmethod def get_queue_status_command(self) -> list[str]: @@ -47,6 +50,47 @@ def get_queue_status_command(self) -> list[str]: """ pass + @staticmethod + def render_submission_template( + command: str, + submission_template: Union[str, Template], + job_name: str = "pysqa", + working_directory: str = os.path.abspath("."), + cores: int = 1, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + command (str, optional): The command to be executed. + job_name (str, optional): The job name. Defaults to "pysqa". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to 1. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. + submission_template (str): Submission script template pysqa.wrapper.torque.template + + Returns: + str: The rendered job submission template. 
+ """ + if not isinstance(submission_template, Template): + submission_template = Template(submission_template) + return submission_template.render( + command=command, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + **kwargs, + ) + @staticmethod def dependencies(dependency_list: list[str]) -> list: """ diff --git a/pysqa/wrapper/flux.py b/pysqa/wrapper/flux.py index 0f03734..c22921a 100644 --- a/pysqa/wrapper/flux.py +++ b/pysqa/wrapper/flux.py @@ -1,7 +1,30 @@ +import os +from typing import List, Optional, Union + import pandas from flux.job import JobID +from jinja2 import Template + +from pysqa.wrapper.abstract import SchedulerCommands -from pysqa.wrapper.generic import SchedulerCommands +template = """\ +#!/bin/bash +# flux: --job-name={{job_name}} +# flux: --env=CORES={{cores}} +# flux: --output=time.out +# flux: --error=error.out +# flux: -n {{cores}} +{%- if run_time_max %} +# flux: -t {{ [1, run_time_max // 60]|max }} +{%- endif %} +{%- if dependency %} +{%- for jobid in dependency %} +# flux: --dependency=afterok:{{jobid}} +{%- endfor %} +{%- endif %} + +{{command}} +""" class FluxCommands(SchedulerCommands): @@ -50,3 +73,43 @@ def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: df.loc[df.status == "C", "status"] = "error" df.loc[df.status == "CD", "status"] = "finished" return df + + def render_submission_template( + self, + command: str, + job_name: str = "pysqa", + working_directory: str = os.path.abspath("."), + cores: int = 1, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + submission_template: Union[str, Template] = template, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + command (str, optional): The command to be executed. + job_name (str, optional): The job name. Defaults to "pysqa". 
+ working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to 1. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. + submission_template (str): Submission script template pysqa.wrapper.flux.template + + Returns: + str: The rendered job submission template. + """ + return super().render_submission_template( + command=command, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + submission_template=submission_template, + **kwargs, + ) diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py index 54f20d2..adf72d1 100644 --- a/pysqa/wrapper/gent.py +++ b/pysqa/wrapper/gent.py @@ -1,6 +1,9 @@ import pandas from pysqa.wrapper.slurm import SlurmCommands +from pysqa.wrapper.slurm import template as template_slurm + +template = template_slurm class GentCommands(SlurmCommands): diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index 2756d80..92b94c3 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -1,6 +1,28 @@ +import os +from typing import List, Optional, Union + import pandas +from jinja2 import Template + +from pysqa.wrapper.abstract import SchedulerCommands -from pysqa.wrapper.generic import SchedulerCommands +template = """\ +#!/bin/bash +#BSUB -q queue +#BSUB -J {{job_name}} +#BSUB -o time.out +#BSUB -n {{cores}} +#BSUB -cwd {{working_directory}} +#BSUB -e error.out +{%- if run_time_max %} +#BSUB -W {{run_time_max}} +{%- endif %} +{%- if memory_max %} +#BSUB -M {{memory_max}} +{%- endif %} + +{{command}} +""" class LsfCommands(SchedulerCommands): @@ -48,3 +70,43 @@ def convert_queue_status(queue_status_output: str) -> pandas.DataFrame: df.loc[df.status == "RUN", "status"] = "running" 
df.loc[df.status == "PEND", "status"] = "pending" return df + + def render_submission_template( + self, + command: str, + job_name: str = "pysqa", + working_directory: str = os.path.abspath("."), + cores: int = 1, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + submission_template: Union[str, Template] = template, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + command (str, optional): The command to be executed. + job_name (str, optional): The job name. Defaults to "pysqa". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to 1. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. + submission_template (str): Submission script template pysqa.wrapper.flux.template + + Returns: + str: The rendered job submission template. 
+ """ + return super().render_submission_template( + command=command, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + submission_template=submission_template, + **kwargs, + ) diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index 7164a44..5760a4c 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -1,4 +1,22 @@ -from pysqa.wrapper.generic import SchedulerCommands +import os +from typing import List, Optional, Union + +from jinja2 import Template + +from pysqa.wrapper.abstract import SchedulerCommands + +template = """\ +#!/bin/bash +#MSUB -N {{job_name}} +{%- if memory_max %} +#MSUB -l pmem={{ memory_max| int }}gb +{%- endif %} +{%- if run_time_max %} +#$ -l walltime={{run_time_max}} +{%- endif %} + +{{command}} +""" class MoabCommands(SchedulerCommands): @@ -31,3 +49,43 @@ def get_queue_status_command(self) -> list[str]: list[str]: The command to get the queue status. """ return ["mdiag", "-x"] + + def render_submission_template( + self, + command: str, + job_name: str = "pysqa", + working_directory: str = os.path.abspath("."), + cores: int = 1, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + submission_template: Union[str, Template] = template, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + command (str, optional): The command to be executed. + job_name (str, optional): The job name. Defaults to "pysqa". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to 1. + memory_max (int, optional): The maximum memory. Defaults to None. + run_time_max (int, optional): The maximum run time. Defaults to None. + dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None. 
+ submission_template (str): Submission script template pysqa.wrapper.flux.template + + Returns: + str: The rendered job submission template. + """ + return super().render_submission_template( + command=command, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + dependency_list=dependency_list, + submission_template=submission_template, + **kwargs, + ) diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index 0db5c3b..c95f6d7 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -1,7 +1,30 @@ +import os +from typing import List, Optional, Union + import defusedxml.ElementTree as ETree import pandas +from jinja2 import Template + +from pysqa.wrapper.abstract import SchedulerCommands + +template = """\ +#!/bin/bash +#$ -N {{job_name}} +#$ -wd {{working_directory}} +{%- if cores %} +#$ -pe {{partition}} {{cores}} +{%- endif %} +{%- if memory_max %} +#$ -l h_vmem={{memory_max}} +{%- endif %} +{%- if run_time_max %} +#$ -l h_rt={{run_time_max}} +{%- endif %} +#$ -o time.out +#$ -e error.out -from pysqa.wrapper.generic import SchedulerCommands +{{command}} +""" class SunGridEngineCommands(SchedulerCommands): @@ -58,3 +81,43 @@ def leaf_to_dict(leaf): "working_directory": [""] * len(df_merge), } ) + + def render_submission_template( + self, + command: str, + job_name: str = "pysqa", + working_directory: str = os.path.abspath("."), + cores: int = 1, + memory_max: Optional[int] = None, + run_time_max: Optional[int] = None, + dependency_list: Optional[List[int]] = None, + submission_template: Union[str, Template] = template, + **kwargs, + ) -> str: + """ + Generate the job submission template. + + Args: + command (str, optional): The command to be executed. + job_name (str, optional): The job name. Defaults to "pysqa". + working_directory (str, optional): The working directory. Defaults to ".". + cores (int, optional): The number of cores. Defaults to 1. 
+            memory_max (int, optional): The maximum memory. Defaults to None.
+            run_time_max (int, optional): The maximum run time. Defaults to None.
+            dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None.
+            submission_template (str): Submission script template pysqa.wrapper.sge.template
+
+        Returns:
+            str: The rendered job submission template.
+        """
+        return super().render_submission_template(
+            command=command,
+            job_name=job_name,
+            working_directory=working_directory,
+            cores=cores,
+            memory_max=memory_max,
+            run_time_max=run_time_max,
+            dependency_list=dependency_list,
+            submission_template=submission_template,
+            **kwargs,
+        )
diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py
index a8378eb..aaa1a2d 100644
--- a/pysqa/wrapper/slurm.py
+++ b/pysqa/wrapper/slurm.py
@@ -1,6 +1,31 @@
+import os
+from typing import List, Optional, Union
+
 import pandas
+from jinja2 import Template
+
+from pysqa.wrapper.abstract import SchedulerCommands
 
-from pysqa.wrapper.generic import SchedulerCommands
+template = """\
+#!/bin/bash
+#SBATCH --output=time.out
+#SBATCH --job-name={{job_name}}
+#SBATCH --chdir={{working_directory}}
+#SBATCH --get-user-env=L
+#SBATCH --partition={{partition}}
+{%- if run_time_max %}
+#SBATCH --time={{ [1, run_time_max // 60]|max }}
+{%- endif %}
+{%- if dependency %}
+#SBATCH --dependency=afterok:{{ dependency | join(',') }}
+{%- endif %}
+{%- if memory_max %}
+#SBATCH --mem={{memory_max}}G
+{%- endif %}
+#SBATCH --cpus-per-task={{cores}}
+
+{{command}}
+"""
 
 
 class SlurmCommands(SchedulerCommands):
@@ -63,3 +88,43 @@ def dependencies(dependency_list: list[str]) -> list[str]:
             return ["--dependency=afterok:" + ",".join(dependency_list)]
         else:
             return []
+
+    def render_submission_template(
+        self,
+        command: str,
+        job_name: str = "pysqa",
+        working_directory: str = os.path.abspath("."),
+        cores: int = 1,
+        memory_max: Optional[int] = None,
+        run_time_max: Optional[int] = None,
+        dependency_list: Optional[List[int]] =
None,
+        submission_template: Union[str, Template] = template,
+        **kwargs,
+    ) -> str:
+        """
+        Generate the job submission template.
+
+        Args:
+            command (str, optional): The command to be executed.
+            job_name (str, optional): The job name. Defaults to "pysqa".
+            working_directory (str, optional): The working directory. Defaults to ".".
+            cores (int, optional): The number of cores. Defaults to 1.
+            memory_max (int, optional): The maximum memory. Defaults to None.
+            run_time_max (int, optional): The maximum run time. Defaults to None.
+            dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None.
+            submission_template (str): Submission script template pysqa.wrapper.slurm.template
+
+        Returns:
+            str: The rendered job submission template.
+        """
+        return super().render_submission_template(
+            command=command,
+            job_name=job_name,
+            working_directory=working_directory,
+            cores=cores,
+            memory_max=memory_max,
+            run_time_max=run_time_max,
+            dependency_list=dependency_list,
+            submission_template=submission_template,
+            **kwargs,
+        )
diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py
index 5f8c959..ed6a41f 100644
--- a/pysqa/wrapper/torque.py
+++ b/pysqa/wrapper/torque.py
@@ -1,8 +1,29 @@
+import os
 import re
+from typing import List, Optional, Union
 
 import pandas
-
-from pysqa.wrapper.generic import SchedulerCommands
+from jinja2 import Template
+
+from pysqa.wrapper.abstract import SchedulerCommands
+
+template = """\
+#!/bin/bash
+#PBS -l ncpus={{cores}}
+#PBS -N {{job_name}}
+{%- if memory_max %}
+#PBS -l mem={{ memory_max| int }}GB
+{%- endif %}
+{%- if run_time_max %}
+#PBS -l walltime={{run_time_max}}
+{%- endif %}
+#PBS -l wd
+{%- if dependency %}
+#PBS -W depend=afterok:{{ dependency | join(':') }}
+{%- endif %}
+
+{{command}}
+"""
 
 
 class TorqueCommands(SchedulerCommands):
@@ -88,3 +109,43 @@ def convert_queue_status(queue_status_output: str) -> pandas.DataFrame:
         )
 
         return df
+
+    def render_submission_template(
+        self,
+        command:
str,
+        job_name: str = "pysqa",
+        working_directory: str = os.path.abspath("."),
+        cores: int = 1,
+        memory_max: Optional[int] = None,
+        run_time_max: Optional[int] = None,
+        dependency_list: Optional[List[int]] = None,
+        submission_template: Union[str, Template] = template,
+        **kwargs,
+    ) -> str:
+        """
+        Generate the job submission template.
+
+        Args:
+            command (str, optional): The command to be executed.
+            job_name (str, optional): The job name. Defaults to "pysqa".
+            working_directory (str, optional): The working directory. Defaults to ".".
+            cores (int, optional): The number of cores. Defaults to 1.
+            memory_max (int, optional): The maximum memory. Defaults to None.
+            run_time_max (int, optional): The maximum run time. Defaults to None.
+            dependency_list (list[int], optional): The list of dependency job IDs. Defaults to None.
+            submission_template (str): Submission script template pysqa.wrapper.torque.template
+
+        Returns:
+            str: The rendered job submission template.
+        """
+        return super().render_submission_template(
+            command=command,
+            job_name=job_name,
+            working_directory=working_directory,
+            cores=cores,
+            memory_max=memory_max,
+            run_time_max=run_time_max,
+            dependency_list=dependency_list,
+            submission_template=submission_template,
+            **kwargs,
+        )
diff --git a/tests/test_scheduler_commands.py b/tests/test_scheduler_commands.py
index 3326fce..a2485df 100644
--- a/tests/test_scheduler_commands.py
+++ b/tests/test_scheduler_commands.py
@@ -1,5 +1,5 @@
 import unittest
-from pysqa.wrapper.generic import SchedulerCommands
+from pysqa.wrapper.abstract import SchedulerCommands
 
 
 class TmpSchedularCommands(SchedulerCommands):