From 58d3d105704ef7b0bd08803fc6a8649c46859206 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 14:51:14 -0400 Subject: [PATCH 01/12] refactor: import shared functions from ccbr_tools --- pyproject.toml | 1 + src/renee/__main__.py | 38 +++-- src/renee/cache.py | 63 -------- src/renee/gui.py | 19 ++- src/renee/initialize.py | 29 +--- src/renee/orchestrate.py | 165 +++++++++++++++++++++ src/renee/run.py | 14 +- src/renee/setup.py | 7 +- src/renee/util.py | 301 +-------------------------------------- 9 files changed, 212 insertions(+), 425 deletions(-) delete mode 100644 src/renee/cache.py create mode 100644 src/renee/orchestrate.py diff --git a/pyproject.toml b/pyproject.toml index 953aaec..114b02a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ classifiers = [ requires-python = ">=3.11" dependencies = [ "argparse", + "ccbr_tools@git+https://github.com/CCBR/Tools", "Click >= 8.1.3", "PySimpleGui < 5", "snakemake >= 7, < 8", diff --git a/src/renee/__main__.py b/src/renee/__main__.py index 2a46b9c..ee7f974 100755 --- a/src/renee/__main__.py +++ b/src/renee/__main__.py @@ -22,22 +22,22 @@ # 3rd party imports from pypi import argparse - -# local imports -from .cache import get_sif_cache_dir -from .run import run -from .dryrun import dryrun -from .gui import launch_gui -from .conditions import fatal -from .util import ( +from ccbr_tools.pipeline.util import ( get_hpcname, get_tmp_dir, get_genomes_list, - get_version, check_python_version, _cp_r_safe_, - orchestrate, ) +from ccbr_tools.pipeline.cache import get_sif_cache_dir + +# local imports +from .run import run +from .dryrun import dryrun +from .gui import launch_gui +from .conditions import fatal +from .util import renee_base, get_version +from .orchestrate import orchestrate # Pipeline Metadata and globals RENEE_PATH = os.path.dirname( @@ -770,7 +770,12 @@ def parsed_arguments(name, description): {2}{3}Prebuilt genome+annotation combos:{4} {5} """.format( - "renee", __version__, 
c.bold, c.url, c.end, list(get_genomes_list()) + "renee", + __version__, + c.bold, + c.url, + c.end, + list(get_genomes_list(repo_base=renee_base)), ) ) @@ -817,7 +822,9 @@ def parsed_arguments(name, description): "--genome", required=True, type=lambda option: str( - genome_options(subparser_run, option, get_genomes_list()) + genome_options( + subparser_run, option, get_genomes_list(repo_base=renee_base) + ) ), help=argparse.SUPPRESS, ) @@ -1126,7 +1133,12 @@ def parsed_arguments(name, description): {2}{3}Prebuilt genome+annotation combos:{4} {5} """.format( - "renee", __version__, c.bold, c.url, c.end, list(get_genomes_list()) + "renee", + __version__, + c.bold, + c.url, + c.end, + list(get_genomes_list(repo_base=renee_base)), ) ) diff --git a/src/renee/cache.py b/src/renee/cache.py deleted file mode 100644 index a908634..0000000 --- a/src/renee/cache.py +++ /dev/null @@ -1,63 +0,0 @@ -import json -import os -import sys - - -def get_singularity_cachedir(output_dir, cache_dir=None): - """Returns the singularity cache directory. - If no user-provided cache directory is provided, - the default singularity cache is in the output directory. - """ - if not cache_dir: - cache_dir = os.path.join(output_dir, ".singularity") - return cache_dir - - -def get_sif_cache_dir(hpc=None): - sif_dir = None - if hpc == "biowulf": - sif_dir = "/data/CCBR_Pipeliner/SIFS" - elif hpc == "frce": - sif_dir = "/mnt/projects/CCBR-Pipelines/SIFs" - return sif_dir - - -def image_cache(sub_args, config): - """Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided. - If singularity cache option is provided and a local SIF does not exist, a warning is - displayed and the image will be pulled from URI in 'config/containers/images.json'. 
- @param sub_args : - Parsed arguments for run sub-command - @params config : - Docker Image config file - @return config : - Updated config dictionary containing user information (username and home directory) - """ - images = os.path.join(sub_args.output, "config", "containers", "images.json") - - # Read in config for docker image uris - with open(images, "r") as fh: - data = json.load(fh) - # Check if local sif exists - for image, uri in data["images"].items(): - if sub_args.sif_cache: - sif = os.path.join( - sub_args.sif_cache, - "{}.sif".format(os.path.basename(uri).replace(":", "_")), - ) - if not os.path.exists(sif): - # If local sif does not exist on in cache, print warning - # and default to pulling from URI in config/containers/images.json - print( - 'Warning: Local image "{}" does not exist in singularity cache'.format( - sif - ), - file=sys.stderr, - ) - else: - # Change pointer to image from Registry URI to local SIF - data["images"][image] = sif - - config.update(data) - - return config diff --git a/src/renee/gui.py b/src/renee/gui.py index 12aad81..f139f2e 100755 --- a/src/renee/gui.py +++ b/src/renee/gui.py @@ -7,17 +7,16 @@ import sys from tkinter import Tk -from .util import ( +from ccbr_tools.pipeline.util import ( get_genomes_dict, get_tmp_dir, - get_shared_resources_dir, - renee_base, - get_version, - get_singularity_cachedir, get_hpcname, ) -from .cache import get_sif_cache_dir -from .run import run_in_context +from ccbr_tools.pipeline.cache import get_sif_cache_dir, get_singularity_cachedir +from ccbr_tools.shell import exec_in_context + +from .util import get_version, renee_base, get_shared_resources_dir +from .run import run # TODO: get rid of all the global variables # TODO: let's use a tmp dir and put these files there instead. 
see for inspiration:https://github.com/CCBR/RENEE/blob/16d13dca1d5f0f43c7dfda379efb882a67635d17/tests/test_cache.py#L14-L28 @@ -27,7 +26,7 @@ def launch_gui(sub_args, debug=True): # get drop down genome+annotation options - jsons = get_genomes_dict(error_on_warnings=True) + jsons = get_genomes_dict(repo_base=renee_base, error_on_warnings=True) genome_annotation_combinations = list(jsons.keys()) genome_annotation_combinations.sort() if debug: @@ -191,7 +190,7 @@ def launch_gui(sub_args, debug=True): threads=2, ) # execute dry run and capture stdout/stderr - allout = run_in_context(run_args) + allout = exec_in_context(run, run_args) sg.popup_scrolled( allout, title="Dryrun:STDOUT/STDERR", @@ -211,7 +210,7 @@ def launch_gui(sub_args, debug=True): if ch == "Yes": run_args.dry_run = False # execute live run - allout = run_in_context(run_args) + allout = exec_in_context(run, run_args) sg.popup_scrolled( allout, title="Dryrun:STDOUT/STDERR", diff --git a/src/renee/initialize.py b/src/renee/initialize.py index 75d2207..b00daa0 100644 --- a/src/renee/initialize.py +++ b/src/renee/initialize.py @@ -2,9 +2,7 @@ import re import sys -from .util import ( - _cp_r_safe_, -) +from ccbr_tools.pipeline.util import _cp_r_safe_, _sym_safe_ def initialize(sub_args, repo_path, output_path): @@ -53,31 +51,6 @@ def initialize(sub_args, repo_path, output_path): return inputs -def _sym_safe_(input_data, target): - """Creates re-named symlinks for each FastQ file provided - as input. If a symlink already exists, it will not try to create a new symlink. - If relative source PATH is provided, it will be converted to an absolute PATH. 
- @param input_data ]>: - List of input files to symlink to target location - @param target : - Target path to copy templates and required resources - @return input_fastqs list[]: - List of renamed input FastQs - """ - input_fastqs = [] # store renamed fastq file names - for file in input_data: - filename = os.path.basename(file) - renamed = os.path.join(target, rename(filename)) - input_fastqs.append(renamed) - - if not os.path.exists(renamed): - # Create a symlink if it does not already exist - # Follow source symlinks to resolve any binding issues - os.symlink(os.path.abspath(os.path.realpath(file)), renamed) - - return input_fastqs - - def rename(filename): """Dynamically renames FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz To automatically rename the fastq files, a few assumptions are made. If the extension of the diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py new file mode 100644 index 0000000..4610f60 --- /dev/null +++ b/src/renee/orchestrate.py @@ -0,0 +1,165 @@ +import os +from ccbr_tools.util import ( + get_hpcname, + get_tmp_dir, + get_singularity_cachedir, +) + + +def orchestrate( + mode, + outdir, + additional_bind_paths, + alt_cache, + threads=2, + submission_script="runner", + masterjob="pl:renee", + tmp_dir=None, + wait="", + hpcname="", +): + """Runs RENEE pipeline via selected executor: local or slurm. + If 'local' is selected, the pipeline is executed locally on a compute node/instance. + If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler. + Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. + @param outdir : + Pipeline output PATH + @param mode : + Execution method or mode: + local runs serially a compute instance without submitting to the cluster. + slurm will submit jobs to the cluster using the SLURM job scheduler. + @param additional_bind_paths : + Additional paths to bind to container filesystem (i.e. 
input file paths) + @param alt_cache : + Alternative singularity cache location + @param threads : + Number of threads to use for local execution method + @param submission_script : + Path to master jobs submission script: + renee run = /path/to/output/resources/runner + renee build = /path/to/output/resources/builder + @param masterjob : + Name of the master job + @param tmp_dir : + Absolute Path to temp dir for compute node + @param wait : + "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API + @param hpcname : + "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. hpcname is determined in setup() function + @return masterjob : + """ + # Add additional singularity bind PATHs + # to mount the local filesystem to the + # containers filesystem, NOTE: these + # PATHs must be an absolute PATHs + outdir = os.path.abspath(outdir) + # Add any default PATHs to bind to + # the container's filesystem, like + # tmp directories, /lscratch + addpaths = [] + # set tmp_dir depending on hpc + tmp_dir = get_tmp_dir(tmp_dir, outdir) + temp = os.path.dirname(tmp_dir.rstrip("/")) + if temp == os.sep: + temp = tmp_dir.rstrip("/") + if outdir not in additional_bind_paths.split(","): + addpaths.append(outdir) + if temp not in additional_bind_paths.split(","): + addpaths.append(temp) + bindpaths = ",".join(addpaths) + + # Set ENV variable 'SINGULARITY_CACHEDIR' + # to output directory + my_env = {} + my_env.update(os.environ) + + cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache) + my_env["SINGULARITY_CACHEDIR"] = cache + + if additional_bind_paths: + # Add Bind PATHs for outdir and tmp dir + if bindpaths: + bindpaths = ",{}".format(bindpaths) + bindpaths = "{}{}".format(additional_bind_paths, bindpaths) + + if not os.path.exists(os.path.join(outdir, "logfiles")): + # Create directory for logfiles + os.makedirs(os.path.join(outdir, "logfiles")) + + if os.path.exists(os.path.join(outdir, "logfiles", 
"snakemake.log")): + mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log")) + newname = os.path.join(outdir, "logfiles", "snakemake." + str(mtime) + ".log") + os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname) + + # Create .singularity directory for installations of snakemake + # without setuid which create a sandbox in the SINGULARITY_CACHEDIR + if not os.path.exists(cache): + # Create directory for sandbox and image layers + os.makedirs(cache) + + # Run on compute node or instance without submitting jobs to a scheduler + if mode == "local": + # Run RENEE: instantiate main/master process + # Look into later: it maybe worth replacing Popen subprocess with a direct + # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html + # Create log file for pipeline + logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w") + masterjob = subprocess.Popen( + [ + "snakemake", + "-pr", + "--use-singularity", + "--singularity-args", + "'-B {}'".format(bindpaths), + "--cores", + str(threads), + "--configfile=config.json", + ], + cwd=outdir, + env=my_env, + ) + + # Submitting jobs to cluster via SLURM's job scheduler + elif mode == "slurm": + # Run RENEE: instantiate main/master process + # Look into later: it maybe worth replacing Popen subprocess with a direct + # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html + # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds + # --cluster-config $R/cluster.json --keep-going --restart-times 3 + # --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}" + # -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T + # 2>&1| tee -a $R/Reports/snakemake.log + + # Create log file for master job information + logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w") + # 
submission_script for renee run is /path/to/output/resources/runner + # submission_script for renee build is /path/to/output/resources/builder + cmdlist = [ + str(os.path.join(outdir, "resources", str(submission_script))), + mode, + "-j", + str(masterjob), + "-b", + str(bindpaths), + "-o", + str(outdir), + "-c", + str(cache), + "-t", + str(tmp_dir), + ] + if str(wait) == "--wait": + cmdlist.append("-w") + if str(hpcname) != "": + cmdlist.append("-n") + cmdlist.append(hpcname) + else: + cmdlist.append("-n") + cmdlist.append("unknown") + + print(" ".join(cmdlist)) + masterjob = subprocess.Popen( + cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env + ) + + return masterjob diff --git a/src/renee/run.py b/src/renee/run.py index bc7cd0e..90adcf3 100644 --- a/src/renee/run.py +++ b/src/renee/run.py @@ -4,12 +4,14 @@ import os import pathlib import sys +from ccbr_tools.pipeline.util import get_hpcname, get_tmp_dir -from .util import renee_base, get_hpcname, get_tmp_dir, orchestrate +from .util import renee_base from .conditions import fatal from .initialize import initialize from .setup import setup from .dryrun import dryrun +from .orchestrate import orchestrate def run(sub_args): @@ -204,13 +206,3 @@ def get_fastq_screen_paths(fastq_screen_confs, match="DATABASE", file_index=-1): db_path = line.strip().split()[file_index] databases.append(db_path) return databases - - -def run_in_context(args): - """Execute the run function in a context manager to capture stdout/stderr""" - with contextlib.redirect_stdout(io.StringIO()) as out_f, contextlib.redirect_stderr( - io.StringIO() - ) as err_f: - run(args) - allout = out_f.getvalue() + "\n" + err_f.getvalue() - return allout diff --git a/src/renee/setup.py b/src/renee/setup.py index 000e3c8..c69f20d 100644 --- a/src/renee/setup.py +++ b/src/renee/setup.py @@ -4,12 +4,13 @@ import subprocess import sys -from .util import ( +from ccbr_tools.pipeline.util import ( get_hpcname, - get_version, get_tmp_dir, ) 
-from .cache import image_cache +from ccbr_tools.pipeline.cache import image_cache + +from .util import get_version def setup(sub_args, ifiles, repo_path, output_path): diff --git a/src/renee/util.py b/src/renee/util.py index 1aa2e66..1401cad 100644 --- a/src/renee/util.py +++ b/src/renee/util.py @@ -1,21 +1,13 @@ -import datetime -import glob -import os -import subprocess -import shutil -import sys -import warnings -from .cache import get_singularity_cachedir +import pathlib +from ccbr_tools.pipeline.util import get_hpcname def renee_base(*paths): """Get the absolute path to a file in the repository @return abs_path """ - basedir = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - ) - return os.path.join(basedir, *paths) + basedir = pathlib.Path(__file__).absolute().parent.parent.parent + return basedir.joinpath(*paths) def get_version(): @@ -27,45 +19,6 @@ def get_version(): return version -def scontrol_show(): - """Run scontrol show config and parse the output as a dictionary - @return scontrol_dict : - """ - scontrol_dict = dict() - scontrol_out = subprocess.run( - "scontrol show config", shell=True, capture_output=True, text=True - ).stdout - if len(scontrol_out) > 0: - for line in scontrol_out.split("\n"): - line_split = line.split("=") - if len(line_split) > 1: - scontrol_dict[line_split[0].strip()] = line_split[1].strip() - return scontrol_dict - - -def get_hpcname(): - """Get the HPC name (biowulf, frce, or an empty string) - @return hpcname - """ - scontrol_out = scontrol_show() - hpc = scontrol_out["ClusterName"] if "ClusterName" in scontrol_out.keys() else "" - if hpc == "fnlcr": - hpc = "frce" - return hpc - - -def get_tmp_dir(tmp_dir, outdir, hpc=get_hpcname()): - """Get default temporary directory for biowulf and frce. 
Allow user override.""" - if not tmp_dir: - if hpc == "biowulf": - tmp_dir = "/lscratch/$SLURM_JOBID" - elif hpc == "frce": - tmp_dir = outdir - else: - tmp_dir = None - return tmp_dir - - def get_shared_resources_dir(shared_dir, hpc=get_hpcname()): """Get default shared resources directory for biowulf and frce. Allow user override.""" if not shared_dir: @@ -76,249 +29,3 @@ def get_shared_resources_dir(shared_dir, hpc=get_hpcname()): elif hpc == "frce": shared_dir = "/mnt/projects/CCBR-Pipelines/pipelines/RENEE/resources/shared_resources" return shared_dir - - -def get_genomes_list(hpcname=get_hpcname(), error_on_warnings=False): - """Get list of genome annotations available for the current platform - @return genomes_list - """ - return sorted( - list( - get_genomes_dict( - hpcname=hpcname, error_on_warnings=error_on_warnings - ).keys() - ) - ) - - -def get_genomes_dict(hpcname=get_hpcname(), error_on_warnings=False): - """Get dictionary of genome annotation versions and the paths to the corresponding JSON files - @return genomes_dict { genome_name: json_file_path } - """ - if error_on_warnings: - warnings.filterwarnings("error") - genomes_dir = renee_base(os.path.join("config", "genomes", hpcname)) - if not os.path.exists(genomes_dir): - warnings.warn(f"Folder does not exist: {genomes_dir}") - search_term = genomes_dir + "/*.json" - json_files = glob.glob(search_term) - if len(json_files) == 0: - warnings.warn( - f"No Genome+Annotation JSONs found in {genomes_dir}. 
Please specify a custom genome json file with `--genome`" - ) - genomes_dict = { - os.path.basename(json_file).replace(".json", ""): json_file - for json_file in json_files - } - warnings.resetwarnings() - return genomes_dict - - -def check_python_version(): - # version check - # glob.iglob requires 3.11 for using "include_hidden=True" - MIN_PYTHON = (3, 11) - try: - assert sys.version_info >= MIN_PYTHON - print( - "Python version: {0}.{1}.{2}".format( - sys.version_info.major, sys.version_info.minor, sys.version_info.micro - ) - ) - except AssertionError: - exit( - f"{sys.argv[0]} requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer" - ) - - -def _cp_r_safe_( - source, target, resources=["workflow", "resources", "config"], safe_mode=True -): - """Private function: Given a list paths it will recursively copy each to the - target location. If a target path already exists, it will not over-write the - existing paths data when `safe_mode` is on. - @param resources : - List of paths to copy over to target location. - Default: ["workflow", "resources", "config"] - @params source : - Add a prefix PATH to each resource - @param target : - Target path to copy templates and required resources (aka destination) - @param safe_mode : - Only copy the resources to the target path - if they do not exist in the target path (default: True) - """ - for resource in resources: - destination = os.path.join(target, resource) - if os.path.exists(destination) and safe_mode: - print(f"🚫 path exists and `safe_mode` is ON, not copying: {destination}") - else: - # Required resources do not exist, or safe mode is off - shutil.copytree( - os.path.join(source, resource), destination, dirs_exist_ok=not safe_mode - ) - - -def orchestrate( - mode, - outdir, - additional_bind_paths, - alt_cache, - threads=2, - submission_script="runner", - masterjob="pl:renee", - tmp_dir=None, - wait="", - hpcname="", -): - """Runs RENEE pipeline via selected executor: local or slurm. 
- If 'local' is selected, the pipeline is executed locally on a compute node/instance. - If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler. - Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. - @param outdir : - Pipeline output PATH - @param mode : - Execution method or mode: - local runs serially a compute instance without submitting to the cluster. - slurm will submit jobs to the cluster using the SLURM job scheduler. - @param additional_bind_paths : - Additional paths to bind to container filesystem (i.e. input file paths) - @param alt_cache : - Alternative singularity cache location - @param threads : - Number of threads to use for local execution method - @param submission_script : - Path to master jobs submission script: - renee run = /path/to/output/resources/runner - renee build = /path/to/output/resources/builder - @param masterjob : - Name of the master job - @param tmp_dir : - Absolute Path to temp dir for compute node - @param wait : - "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API - @param hpcname : - "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. 
hpcname is determined in setup() function - @return masterjob : - """ - # Add additional singularity bind PATHs - # to mount the local filesystem to the - # containers filesystem, NOTE: these - # PATHs must be an absolute PATHs - outdir = os.path.abspath(outdir) - # Add any default PATHs to bind to - # the container's filesystem, like - # tmp directories, /lscratch - addpaths = [] - # set tmp_dir depending on hpc - tmp_dir = get_tmp_dir(tmp_dir, outdir) - temp = os.path.dirname(tmp_dir.rstrip("/")) - if temp == os.sep: - temp = tmp_dir.rstrip("/") - if outdir not in additional_bind_paths.split(","): - addpaths.append(outdir) - if temp not in additional_bind_paths.split(","): - addpaths.append(temp) - bindpaths = ",".join(addpaths) - - # Set ENV variable 'SINGULARITY_CACHEDIR' - # to output directory - my_env = {} - my_env.update(os.environ) - - cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache) - my_env["SINGULARITY_CACHEDIR"] = cache - - if additional_bind_paths: - # Add Bind PATHs for outdir and tmp dir - if bindpaths: - bindpaths = ",{}".format(bindpaths) - bindpaths = "{}{}".format(additional_bind_paths, bindpaths) - - if not os.path.exists(os.path.join(outdir, "logfiles")): - # Create directory for logfiles - os.makedirs(os.path.join(outdir, "logfiles")) - - if os.path.exists(os.path.join(outdir, "logfiles", "snakemake.log")): - mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log")) - newname = os.path.join(outdir, "logfiles", "snakemake." 
+ str(mtime) + ".log") - os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname) - - # Create .singularity directory for installations of snakemake - # without setuid which create a sandbox in the SINGULARITY_CACHEDIR - if not os.path.exists(cache): - # Create directory for sandbox and image layers - os.makedirs(cache) - - # Run on compute node or instance without submitting jobs to a scheduler - if mode == "local": - # Run RENEE: instantiate main/master process - # Look into later: it maybe worth replacing Popen subprocess with a direct - # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html - # Create log file for pipeline - logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w") - masterjob = subprocess.Popen( - [ - "snakemake", - "-pr", - "--use-singularity", - "--singularity-args", - "'-B {}'".format(bindpaths), - "--cores", - str(threads), - "--configfile=config.json", - ], - cwd=outdir, - env=my_env, - ) - - # Submitting jobs to cluster via SLURM's job scheduler - elif mode == "slurm": - # Run RENEE: instantiate main/master process - # Look into later: it maybe worth replacing Popen subprocess with a direct - # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html - # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds - # --cluster-config $R/cluster.json --keep-going --restart-times 3 - # --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}" - # -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T - # 2>&1| tee -a $R/Reports/snakemake.log - - # Create log file for master job information - logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w") - # submission_script for renee run is /path/to/output/resources/runner - # submission_script for renee build is /path/to/output/resources/builder - cmdlist = [ - 
str(os.path.join(outdir, "resources", str(submission_script))), - mode, - "-j", - str(masterjob), - "-b", - str(bindpaths), - "-o", - str(outdir), - "-c", - str(cache), - "-t", - str(tmp_dir), - ] - if str(wait) == "--wait": - cmdlist.append("-w") - if str(hpcname) != "": - cmdlist.append("-n") - cmdlist.append(hpcname) - else: - cmdlist.append("-n") - cmdlist.append("unknown") - - print(" ".join(cmdlist)) - masterjob = subprocess.Popen( - cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env - ) - - return masterjob - - -def _get_file_mtime(f): - timestamp = datetime.fromtimestamp(os.path.getmtime(os.path.abspath(f))) - mtime = timestamp.strftime("%y%m%d%H%M%S") - return mtime From 295e3347982d7ff2db9a20e0d8897037a345b345 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 14:56:14 -0400 Subject: [PATCH 02/12] test: update tests with new functions from ccbr_tools --- tests/test_cache.py | 2 +- tests/test_run.py | 19 ++++++++++--------- tests/test_util.py | 13 ++++++++----- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/tests/test_cache.py b/tests/test_cache.py index 846929f..ebdafdc 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -3,7 +3,7 @@ import os.path import subprocess -from renee.src.renee.cache import get_sif_cache_dir, get_singularity_cachedir +from ccbr_tools.pipeline.cache import get_sif_cache_dir, get_singularity_cachedir renee_run = ( "./bin/renee run " diff --git a/tests/test_run.py b/tests/test_run.py index 229d40a..de28a0c 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -3,25 +3,26 @@ import os import tempfile -from renee.src.renee.util import ( +from ccbr_tools.pipeline.util import ( get_tmp_dir, get_shared_resources_dir, renee_base, + get_hpcname, ) -from renee.src.renee.cache import get_sif_cache_dir -from renee.src.renee.run import run, run_in_context -from renee.src.renee.util import get_hpcname +from ccbr_tools.pipeline.cache import get_sif_cache_dir +from 
ccbr_tools.shell import exec_in_context + +from renee.src.renee.util import renee_base +from renee.src.renee.run import run def test_dryrun(): if get_hpcname() == "biowulf": with tempfile.TemporaryDirectory() as tmp_dir: run_args = argparse.Namespace( - input=list(glob.glob(os.path.join(renee_base(".tests"), "*.fastq.gz"))), + input=list(glob.glob(renee_base(".tests", "*.fastq.gz"))), output=tmp_dir, - genome=os.path.join( - renee_base("config"), "genomes", "biowulf", "hg38_36.json" - ), + genome=renee_base("config", "genomes", "biowulf", "hg38_36.json"), mode="slurm", runmode="run", dry_run=True, @@ -36,7 +37,7 @@ def test_dryrun(): threads=2, ) # execute dry run and capture stdout/stderr - allout = run_in_context(run_args) + allout = exec_in_context(run, run_args) assert ( "This was a dry-run (flag -n). The order of jobs does not reflect the order of execution." in allout diff --git a/tests/test_util.py b/tests/test_util.py index d344e21..4860f50 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -6,13 +6,14 @@ import tempfile import warnings -from renee.src.renee.util import ( - renee_base, +from ccbr_tools.pipeline.util import ( _cp_r_safe_, get_genomes_dict, get_genomes_list, ) +from renee.src.renee.util import renee_base + def test_renee_base(): renee_bin = renee_base(os.path.join("bin", "renee")) @@ -52,7 +53,7 @@ def test_cp_unsafe(): def test_get_genomes_warnings(): with warnings.catch_warnings(record=True) as raised_warnings: - genomes = get_genomes_list(hpcname="notAnOption") + genomes = get_genomes_list(repo_base=renee_base, hpcname="notAnOption") assertions = [ "len(genomes) == 0", "len(raised_warnings) == 2", @@ -68,10 +69,12 @@ def test_get_genomes_warnings(): def test_get_genomes_error(): with pytest.raises(UserWarning) as exception_info: - get_genomes_list(hpcname="notAnOption", error_on_warnings=True) + get_genomes_list( + repo_base=renee_base, hpcname="notAnOption", error_on_warnings=True + ) assert "Folder does not exist" in 
str(exception_info.value) def test_get_genomes_biowulf(): - genomes_dict = get_genomes_dict(hpcname="biowulf") + genomes_dict = get_genomes_dict(repo_base=renee_base, hpcname="biowulf") assert len(genomes_dict) > 10 From ce191fbe7dc19741e8c88a038f311d72d3e6c2bf Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:04:25 -0400 Subject: [PATCH 03/12] fix: remove __future__ imports --- src/renee/__main__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/renee/__main__.py b/src/renee/__main__.py index ee7f974..e5818ee 100755 --- a/src/renee/__main__.py +++ b/src/renee/__main__.py @@ -12,7 +12,6 @@ """ # Python standard library -from __future__ import print_function from shutil import copy import json import os From 4706f3a12d1835178ba1c72b0fbe705b0e3e955e Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:06:21 -0400 Subject: [PATCH 04/12] fix: import statements --- src/renee/orchestrate.py | 2 +- tests/test_run.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py index 4610f60..c4975e8 100644 --- a/src/renee/orchestrate.py +++ b/src/renee/orchestrate.py @@ -1,5 +1,5 @@ import os -from ccbr_tools.util import ( +from ccbr_tools.pipeline.util import ( get_hpcname, get_tmp_dir, get_singularity_cachedir, diff --git a/tests/test_run.py b/tests/test_run.py index de28a0c..2e599bc 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -5,14 +5,13 @@ from ccbr_tools.pipeline.util import ( get_tmp_dir, - get_shared_resources_dir, renee_base, get_hpcname, ) from ccbr_tools.pipeline.cache import get_sif_cache_dir from ccbr_tools.shell import exec_in_context -from renee.src.renee.util import renee_base +from renee.src.renee.util import renee_base, get_shared_resources_dir from renee.src.renee.run import run From 1b4c0421697bf44a8206cabe19565b1db54ebe17 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:09:34 -0400 Subject: [PATCH 05/12] fix: 
more import statements --- src/renee/orchestrate.py | 2 +- tests/test_run.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py index c4975e8..47079f9 100644 --- a/src/renee/orchestrate.py +++ b/src/renee/orchestrate.py @@ -2,8 +2,8 @@ from ccbr_tools.pipeline.util import ( get_hpcname, get_tmp_dir, - get_singularity_cachedir, ) +from ccbr_tools.pipeline.cache import get_singularity_cachedir def orchestrate( diff --git a/tests/test_run.py b/tests/test_run.py index 2e599bc..793a6db 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -5,7 +5,6 @@ from ccbr_tools.pipeline.util import ( get_tmp_dir, - renee_base, get_hpcname, ) from ccbr_tools.pipeline.cache import get_sif_cache_dir From 9b55bc947d2b6bf3f4d5fc3507178b398b984859 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:50:22 -0400 Subject: [PATCH 06/12] refactor: move rename() to ccbr_tools --- src/renee/initialize.py | 66 ----------------------------------------- 1 file changed, 66 deletions(-) diff --git a/src/renee/initialize.py b/src/renee/initialize.py index b00daa0..4fcf6db 100644 --- a/src/renee/initialize.py +++ b/src/renee/initialize.py @@ -49,69 +49,3 @@ def initialize(sub_args, repo_path, output_path): inputs = _sym_safe_(input_data=sub_args.input, target=output_path) return inputs - - -def rename(filename): - """Dynamically renames FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz - To automatically rename the fastq files, a few assumptions are made. If the extension of the - FastQ file cannot be inferred, an exception is raised telling the user to fix the filename - of the fastq files. 
- @param filename : - Original name of file to be renamed - @return filename : - A renamed FastQ filename - """ - # Covers common extensions from SF, SRA, EBI, TCGA, and external sequencing providers - # key = regex to match string and value = how it will be renamed - extensions = { - # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc. - ".R1.f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.f(ast)?q.gz$": ".R2.fastq.gz", - # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc. - # Capture lane information as named group - ".R1.(?P<lane>...).f(ast)?q.gz$": ".R1.fastq.gz", - ".R2.(?P<lane>...).f(ast)?q.gz$": ".R2.fastq.gz", - # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc. - "_1.f(ast)?q.gz$": ".R1.fastq.gz", - "_2.f(ast)?q.gz$": ".R2.fastq.gz", - } - - if filename.endswith(".R1.fastq.gz") or filename.endswith(".R2.fastq.gz"): - # Filename is already in the correct format - return filename - - converted = False - for regex, new_ext in extensions.items(): - matched = re.search(regex, filename) - if matched: - # regex matches with a pattern in extensions - converted = True - # Try to get substring for named group lane, retain this in new file extension - # Come back to this later, I am not sure if this is necessary - # That string maybe static (i.e. always the same) - # https://support.illumina.com/help/BaseSpace_OLH_009008/Content/Source/Informatics/BS/NamingConvention_FASTQ-files-swBS.htm# - try: - new_ext = "_{}{}".format(matched.group("lane"), new_ext) - except IndexError: - pass # Does not contain the named group lane - - filename = re.sub(regex, new_ext, filename) - break # only rename once - - if not converted: - raise NameError( - """\n\tFatal: Failed to rename provided input '{}'! - Cannot determine the extension of the user provided input file. - Please rename the file list above before trying again. 
- Here is example of acceptable input file extensions: - sampleName.R1.fastq.gz sampleName.R2.fastq.gz - sampleName_R1_001.fastq.gz sampleName_R2_001.fastq.gz - sampleName_1.fastq.gz sampleName_2.fastq.gz - Please also check that your input files are gzipped? - If they are not, please gzip them before proceeding again. - """.format( - filename - ) - ) - - return filename From caf47a051b41e25d21d9c95d0066acde71301899 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:51:31 -0400 Subject: [PATCH 07/12] test: use shell_run() --- tests/test_cache.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_cache.py b/tests/test_cache.py index ebdafdc..0c1c431 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,6 +4,7 @@ import subprocess from ccbr_tools.pipeline.cache import get_sif_cache_dir, get_singularity_cachedir +from ccbr_tools.shell import shell_run renee_run = ( "./bin/renee run " @@ -16,12 +17,7 @@ def run_in_temp(command_str): with tempfile.TemporaryDirectory() as tmp_dir: outdir = os.path.join(tmp_dir, "testout") - output = subprocess.run( - f"{command_str} --output {outdir}", - capture_output=True, - shell=True, - text=True, - ) + output = shell_run(f"{command_str} --output {outdir}") if os.path.exists(os.path.join(outdir, "config.json")): with open(os.path.join(outdir, "config.json"), "r") as infile: config = json.load(infile) @@ -36,7 +32,7 @@ def test_cache_sif(): config["images"]["arriba"].endswith( "tests/data/sifs/ccbr_arriba_2.0.0_v0.0.1.sif" ), - "does not exist in singularity cache" in output.stderr, + "does not exist in singularity cache" in output, ] assert all(assertions) From 879c6db221e06e650ac1bd6675d902788eef8d82 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 15:51:46 -0400 Subject: [PATCH 08/12] test: fix usage of posixpath --- tests/test_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_util.py b/tests/test_util.py 
index 4860f50..3b45454 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -17,7 +17,7 @@ def test_renee_base(): renee_bin = renee_base(os.path.join("bin", "renee")) - assert renee_bin.endswith("/bin/renee") and os.path.exists(renee_bin) + assert str(renee_bin).endswith("/bin/renee") and renee_bin.exists() def test_cp_safe(): From 6583dcecc315b222b4e8607dbe34e59d22548f9c Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 17:14:38 -0400 Subject: [PATCH 09/12] test: update tests for new behavior --- src/renee/util.py | 2 +- tests/test_cache.py | 2 +- tests/test_run.py | 2 +- tests/test_util.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/renee/util.py b/src/renee/util.py index 1401cad..4a6e273 100644 --- a/src/renee/util.py +++ b/src/renee/util.py @@ -7,7 +7,7 @@ def renee_base(*paths): @return abs_path """ basedir = pathlib.Path(__file__).absolute().parent.parent.parent - return basedir.joinpath(*paths) + return str(basedir.joinpath(*paths)) def get_version(): diff --git a/tests/test_cache.py b/tests/test_cache.py index 0c1c431..76d87f8 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -38,7 +38,7 @@ def test_cache_sif(): def test_cache_nosif(): - output, config = run_in_temp(f"{renee_run}") + output, config = run_in_temp(f"{renee_run} --sif-cache not/a/path") assertions = [ config["images"]["arriba"] == "docker://nciccbr/ccbr_arriba_2.0.0:v0.0.1" ] diff --git a/tests/test_run.py b/tests/test_run.py index 793a6db..a2b05be 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -18,7 +18,7 @@ def test_dryrun(): if get_hpcname() == "biowulf": with tempfile.TemporaryDirectory() as tmp_dir: run_args = argparse.Namespace( - input=list(glob.glob(renee_base(".tests", "*.fastq.gz"))), + input=list(glob.glob(f"{renee_base('.tests')}/*.fastq.gz")), output=tmp_dir, genome=renee_base("config", "genomes", "biowulf", "hg38_36.json"), mode="slurm", diff --git a/tests/test_util.py b/tests/test_util.py index 
3b45454..17f8b5f 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -17,7 +17,7 @@ def test_renee_base(): renee_bin = renee_base(os.path.join("bin", "renee")) - assert str(renee_bin).endswith("/bin/renee") and renee_bin.exists() + assert str(renee_bin).endswith("/bin/renee") and os.path.exists(renee_bin) def test_cp_safe(): From ca385125dd63c7fe22602645540d026d891f3c07 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 17:28:28 -0400 Subject: [PATCH 10/12] fix: import subprocess --- src/renee/orchestrate.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py index 47079f9..35e758f 100644 --- a/src/renee/orchestrate.py +++ b/src/renee/orchestrate.py @@ -1,4 +1,6 @@ import os +import subprocess + from ccbr_tools.pipeline.util import ( get_hpcname, get_tmp_dir, @@ -16,7 +18,7 @@ def orchestrate( masterjob="pl:renee", tmp_dir=None, wait="", - hpcname="", + hpcname=get_hpcname(), ): """Runs RENEE pipeline via selected executor: local or slurm. If 'local' is selected, the pipeline is executed locally on a compute node/instance. 
From a81420d5ec6dd3a91cf408b8940809ec38e102d7 Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 17:29:04 -0400 Subject: [PATCH 11/12] fix: close file after reading --- src/renee/run.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/renee/run.py b/src/renee/run.py index 90adcf3..69ffbae 100644 --- a/src/renee/run.py +++ b/src/renee/run.py @@ -115,11 +115,11 @@ def run(sub_args): ) ) elif sub_args.mode == "slurm": - jobid = ( - open(os.path.join(sub_args.output, "logfiles", "mjobid.log")) - .read() - .strip() - ) + with open( + os.path.join(sub_args.output, "logfiles", "mjobid.log"), "r" + ) as file: + jobid = file.read().strip() + if int(masterjob.returncode) == 0: print("Successfully submitted master job: ", end="") else: From 93092baafe9d2a42a99e9127b8121401fc6a94dc Mon Sep 17 00:00:00 2001 From: Kelly Sovacool Date: Tue, 13 Aug 2024 17:34:20 -0400 Subject: [PATCH 12/12] fix: close files after done error: sys:1: ResourceWarning: unclosed ResourceWarning: Enable tracemalloc to get the object allocation traceback --- src/renee/__main__.py | 8 +++++--- src/renee/orchestrate.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/renee/__main__.py b/src/renee/__main__.py index e5818ee..7d3c181 100755 --- a/src/renee/__main__.py +++ b/src/renee/__main__.py @@ -397,9 +397,11 @@ def build(sub_args): ) ) elif sub_args.mode == "slurm": - jobid = ( - open(os.path.join(sub_args.output, "logfiles", "bjobid.log")).read().strip() - ) + with open( + os.path.join(sub_args.output, "logfiles", "bjobid.log"), "r" + ) as infile: + jobid = infile.read().strip() + if int(masterjob.returncode) == 0: print("Successfully submitted master job: ", end="") else: diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py index 35e758f..4792e17 100644 --- a/src/renee/orchestrate.py +++ b/src/renee/orchestrate.py @@ -163,5 +163,5 @@ def orchestrate( masterjob = subprocess.Popen( cmdlist, cwd=outdir, 
stderr=subprocess.STDOUT, stdout=logfh, env=my_env ) - + logfh.close() return masterjob