diff --git a/autometa/common/external/hmmer.py b/autometa/common/external/hmmer.py index c1db738f2..d7e12ba5a 100644 --- a/autometa/common/external/hmmer.py +++ b/autometa/common/external/hmmer.py @@ -31,6 +31,7 @@ import sys import tempfile +import multiprocessing as mp import pandas as pd from glob import glob @@ -42,9 +43,84 @@ logger = logging.getLogger(__name__) -def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None): +def annotate_parallel(orfs, hmmdb, outfpath, cpus): + outdir = os.path.dirname(os.path.realpath(outfpath)) + outprefix = os.path.splitext(os.path.basename(outfpath))[0] + tmp_dirpath = tempfile.mkdtemp(dir=outdir) + __, tmp_fpath = tempfile.mkstemp( + suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath + ) + log = os.path.join(outdir, "hmmscan.parallel.log") + jobs = f"-j{cpus}" + cmd = [ + "parallel", + "--retries", + "4", + "--joblog", + log, + jobs, + "--linebuffer", + "--pipe", + "--recstart", + "'>'", + "hmmscan", + "--cpu", + "0", + "-o", + os.devnull, + "--tblout", + tmp_fpath, + hmmdb, + "-", + "<", + orfs, + "2>", + os.devnull, + ] + cmdline = subprocess.list2cmdline(cmd) + logger.debug(cmdline) + try: + subprocess.run(cmdline, shell=True, check=True) + except subprocess.CalledProcessError as err: + logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}") + shutil.rmtree(tmp_dirpath) + raise err + tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt")) + lines = "" + buffer_limit = 60000 + with open(outfpath, "w") as out: + for fp in tmp_fpaths: + with open(fp) as fh: + for line in fh: + lines += line + if sys.getsizeof(lines) >= buffer_limit: + out.write(lines) + lines = "" + out.write(lines) + shutil.rmtree(tmp_dirpath) + + +def annotate_sequential(orfs, hmmdb, outfpath, cpus): + cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs] + logger.debug(" ".join(cmd)) + try: + subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True + ) + except subprocess.CalledProcessError as err: + logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}") + raise err + + +def hmmscan( + orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, gnu_parallel=False +): """Runs hmmscan on dataset ORFs and provided hmm database. + Note + ---- + Only one of `parallel` and `gnu_parallel` may be provided as True + Parameters ---------- orfs : str @@ -58,10 +134,9 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None) force : bool, optional Overwrite existing `outfpath` (the default is False). parallel : bool, optional - Will parallelize hmmscan using GNU parallel (the default is True). - log : str, optional - (the default is None). If provided will write - parallel log to `log`. + Will use multithreaded parallelization offered by hmmscan (the default is True). + gnu_parallel : bool, optional + Will parallelize hmmscan using GNU parallel (the default is False). Returns ------- @@ -70,73 +145,26 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None) Raises ------- + ValueError + Both `parallel` and `gnu_parallel` were provided as True FileExistsError `outfpath` already exists - OSError + subprocess.CalledProcessError hmmscan failed + """ + if gnu_parallel and parallel: + raise ValueError("Both parallel and gnu_parallel were provided as True") # OPTIMIZE: we want to extend parallel to grid computing (workqueue?) via --sshlogin? if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force: raise FileExistsError(f"{outfpath}. Use force to overwrite!") - if parallel: - outdir = os.path.dirname(os.path.realpath(outfpath)) - outprefix = os.path.splitext(os.path.basename(outfpath))[0] - tmp_dirpath = tempfile.mkdtemp(dir=outdir) - __, tmp_fpath = tempfile.mkstemp( - suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath - ) - jobs = f"-j{cpus}" - cmd = [ - "parallel", - "--retries", - "4", - jobs, - "--linebuffer", - "--pipe", - "--recstart", - "'>'", - "hmmscan", - "-o", - os.devnull, - "--tblout", - tmp_fpath, - hmmdb, - "-", - "<", - orfs, - "2>", - os.devnull, - ] - if log: - cmd.insert(3, log) - cmd.insert(3, "--joblog") + if gnu_parallel: + annotate_parallel(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus) + elif parallel: + cpus = mp.cpu_count() if not cpus else cpus + annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus) else: - cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs] - logger.debug(f'cmd: {" ".join(cmd)}') - if parallel: - proc = subprocess.run(" ".join(cmd), shell=True) - tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt")) - lines = "" - buffer_limit = 60000 - with open(outfpath, "w") as out: - for fp in tmp_fpaths: - with open(fp) as fh: - for line in fh: - lines += line - if sys.getsizeof(lines) >= buffer_limit: - out.write(lines) - lines = "" - out.write(lines) - shutil.rmtree(tmp_dirpath) - else: - proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - try: - proc.check_returncode() - except subprocess.CalledProcessError as err: - logger.warning(f"Make sure your hmm profiles are pressed!") - logger.warning(f"hmmpress -f {hmmdb}") - logger.error(f"Args:{cmd} ReturnCode:{proc.returncode}") - raise err + annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=0) if not os.path.exists(outfpath): raise FileNotFoundError(f"{outfpath} not written.") return outfpath @@ -159,17 +187,16 @@ def hmmpress(fpath): ------- FileNotFoundError `fpath` not found. - ChildProcessError + subprocess.CalledProcessError hmmpress failed """ if not os.path.exists(fpath): raise FileNotFoundError(fpath) - cmd = f"hmmpress -f {fpath}" - logger.debug(cmd) - with open(os.devnull, "w") as STDOUT, open(os.devnull, "w") as STDERR: - retcode = subprocess.call(cmd, stdout=STDOUT, stderr=STDERR, shell=True) - if retcode: - raise ChildProcessError(f"{cmd} failed with returncode: {retcode}") + cmd = ["hmmpress", "-f", fpath] + logger.debug(" ".join(cmd)) + subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True + ) return fpath @@ -206,13 +233,13 @@ def filter_markers(infpath, outfpath, cutoffs, orfs=None, force=False): """ for fp in [infpath, cutoffs]: if not os.path.exists(fp): - raise FileNotFoundError(f"{fp} not found") + raise FileNotFoundError(fp) if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force: raise FileExistsError(f"{outfpath} already exists") hmmtab_header = ["sname", "sacc", "orf", "score"] col_indices = [0, 1, 2, 5] columns = {i: k for i, k in zip(col_indices, hmmtab_header)} - df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#",) + df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#") df.rename(columns=columns, inplace=True) # NaN may result from parsing issues while merging parallel results df.dropna(inplace=True) @@ -240,8 +267,9 @@ def main(): import logging as logger logger.basicConfig( - format="%(asctime)s : %(name)s : %(levelname)s : %(message)s", + format="[%(asctime)s %(levelname)s] %(name)s: %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", + level=logger.DEBUG, ) parser = argparse.ArgumentParser( description="Retrieves markers with provided input assembly" @@ -251,17 +279,21 @@ def main(): parser.add_argument("cutoffs", help="") parser.add_argument("hmmscan", help="") parser.add_argument("markers", help="") - parser.add_argument("--log", help="") parser.add_argument( "--force", help="force overwrite of out filepath", action="store_true" ) parser.add_argument("--cpus", help="num cpus to use", default=0, type=int) - parser.add_argument("--parallel", help="enable GNU parallel", action="store_true") - parser.add_argument("--verbose", help="add verbosity", action="store_true") + group = parser.add_mutually_exclusive_group() + group.add_argument( + "--parallel", + help="enable hmmer multithreaded parallelization", + action="store_true", + ) + group.add_argument( + "--gnu-parallel", help="enable GNU parallelization", action="store_true" + ) args = parser.parse_args() - if args.verbose: - logger.setLevel(logger.DEBUG) if ( os.path.exists(args.hmmscan) and os.path.getsize(args.hmmscan) > 0 @@ -276,7 +308,7 @@ def main(): cpus=args.cpus, force=args.force, parallel=args.parallel, - log=args.log, + gnu_parallel=args.gnu_parallel, ) result = filter_markers( diff --git a/autometa/common/external/prodigal.py b/autometa/common/external/prodigal.py index b3ec9968f..8df7f7e69 100644 --- a/autometa/common/external/prodigal.py +++ b/autometa/common/external/prodigal.py @@ -29,6 +29,7 @@ import os import subprocess import shutil +import tempfile from glob import glob from Bio import SeqIO @@ -40,6 +41,89 @@ logger = logging.getLogger(__name__) +def aggregate_orfs(search_str, outfpath): + tmpfpaths = glob(search_str) + lines = "" + for fp in tmpfpaths: + with open(fp) as fh: + for line in fh: + lines += line + out = open(outfpath, "w") + out.write(lines) + out.close() + + +def annotate_sequential(assembly, prots_out, nucls_out): + cmd = [ + "prodigal", + "-i", + assembly, + "-a", + prots_out, + "-d", + nucls_out, + "-p", + "meta", + "-m", + "-q", + ] + cmd = [str(arg) for arg in cmd] + logger.debug(" ".join(cmd)) + subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True + ) + + +def annotate_parallel(assembly, prots_out, nucls_out, cpus): + outdir = os.path.dirname(os.path.realpath(nucls_out)) + log = os.path.join(outdir, "prodigal.parallel.log") + outprefix = os.path.splitext(os.path.basename(nucls_out))[0] + tmpdir = tempfile.mkdtemp(dir=outdir) + if not os.path.exists(tmpdir): + os.makedirs(tmpdir) + tmpnucl = ".".join([outprefix, "{#}", "fna"]) + tmpprot = ".".join([outprefix, "{#}", "faa"]) + tmpnucl_fpath = os.path.join(tmpdir, tmpnucl) + tmpprot_fpath = os.path.join(tmpdir, tmpprot) + jobs = f"-j{cpus}" + cmd = [ + "parallel", + "--retries", + "4", + "--joblog", + log, + jobs, + "--pipe", + "--recstart", + "'>'", + "--linebuffer", + "prodigal", + "-a", + tmpprot_fpath, + "-d", + tmpnucl_fpath, + "-q", + "-p", + "meta", + "-m", + "-o", + os.devnull, + "<", + assembly, + "2>", + os.devnull, + ] + cmd = [str(arg) for arg in cmd] + cmdline = subprocess.list2cmdline(cmd) + logger.debug(cmdline) + subprocess.run(cmdline, shell=True, check=True) + search_path = os.path.join(tmpdir, "*.faa") + aggregate_orfs(search_path, prots_out) + search_path = os.path.join(tmpdir, "*.fna") + aggregate_orfs(search_path, nucls_out) + shutil.rmtree(tmpdir) + + def run(assembly, nucls_out, prots_out, force=False, cpus=0, parallel=True): """Calls ORFs from provided input assembly @@ -67,8 +151,12 @@ def run(assembly, nucls_out, prots_out, force=False, cpus=0, parallel=True): ------- FileExistsError `nucls_out` or `prots_out` already exists + subprocess.CalledProcessError + prodigal Failed ChildProcessError - Prodigal Failed + `nucls_out` or `prots_out` not written + IOError + `nucls_out` or `prots_out` incorrectly formatted """ if not os.path.exists(assembly): raise FileNotFoundError(f"{assembly} does not exists!") @@ -84,98 +172,16 @@ def run(assembly, nucls_out, prots_out, force=False, cpus=0, parallel=True): if os.path.exists(fpath) and not force: raise FileExistsError(f"{fpath} To overwrite use --force") if parallel: - outdir = os.path.dirname(os.path.realpath(nucls_out)) - # parallel log should indicate time & dataset. i.e. time_dataset.prodigal.parallel.log - log = os.path.join(outdir, "prodigal.parallel.log") - outprefix = os.path.splitext(os.path.basename(nucls_out))[0] - tmpdir = os.path.join(outdir, "tmp") - if not os.path.exists(tmpdir): - os.makedirs(tmpdir) - tmpnucl = ".".join([outprefix, "{#}", "fna"]) - tmpprot = ".".join([outprefix, "{#}", "faa"]) - tmpnucl_fpath = os.path.join(tmpdir, tmpnucl) - tmpprot_fpath = os.path.join(tmpdir, tmpprot) - jobs = f"-j{cpus}" - cmd = [ - "parallel", - "--retries", - "4", - "--joblog", - log, - jobs, - "--pipe", - "--recstart", - "'>'", - "--linebuffer", - "prodigal", - "-a", - tmpprot_fpath, - "-d", - tmpnucl_fpath, - "-q", - "-p", - "meta", - "-m", - "-o", - os.devnull, - "<", - assembly, - "2>", - os.devnull, - ] - else: - cmd = [ - "prodigal", - "-i", - assembly, - "-a", - prots_out, - "-d", - nucls_out, - "-p", - "meta", - "-m", - "-q", - ] - cmd = [str(arg) for arg in cmd] - logger.debug(f'cmd: {" ".join(cmd)}') - if parallel: - try: - returncode = subprocess.call(" ".join(cmd), shell=True) - tmpfpaths = glob(os.path.join(tmpdir, "*.faa")) - lines = "" - for fp in tmpfpaths: - with open(fp) as fh: - for line in fh: - lines += line - out = open(prots_out, "w") - out.write(lines) - out.close() - tmpfpaths = glob(os.path.join(tmpdir, "*.fna")) - lines = "" - for fp in tmpfpaths: - with open(fp) as fh: - for line in fh: - lines += line - out = open(nucls_out, "w") - out.write(lines) - out.close() - except Exception as err: - # COMBAK: Should probably be more descriptive as to what errors could occur here. - logger.exception(err) - finally: - shutil.rmtree(tmpdir) + annotate_parallel( + assembly=assembly, prots_out=prots_out, nucls_out=nucls_out, cpus=cpus + ) else: - with open(os.devnull, "w") as stdout, open(os.devnull, "w") as stderr: - proc = subprocess.run(cmd, stdout=stdout, stderr=stderr) - returncode = proc.returncode - if returncode: - logger.warning(f"Args:{cmd} ReturnCode:{returncode}") - # COMBAK: Check all possible return codes for GNU parallel + annotate_sequential(assembly=assembly, prots_out=prots_out, nucls_out=nucls_out) for fp in [nucls_out, prots_out]: - if not os.path.exists(fp) or os.stat(fp).st_size == 0: + if not os.path.exists(fp) or os.path.getsize(fp) == 0: raise ChildProcessError(f"{fp} not written") try: + # Fasta file format check by simply reading orf annotations with open(fp) as fh: for _ in SimpleFastaParser(fh): pass @@ -295,26 +301,12 @@ def orf_records_from_contigs(contigs, fpath): return records -def main(args): - if args.verbose: - logger.setLevel(logger.DEBUG) - nucls_out, prots_out = run( - assembly=args.assembly, - nucls_out=args.nucls_out, - prots_out=args.prots_out, - force=args.force, - cpus=args.cpus, - parallel=args.parallel, - ) - logger.info(f"written:\nnucls fpath: {nucls_out}\nprots fpath: {prots_out}") - - -if __name__ == "__main__": +def main(): import argparse import logging as logger logger.basicConfig( - format="%(asctime)s : %(name)s : %(levelname)s : %(message)s", + format="[%(asctime)s %(levelname)s] %(name)s: %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", level=logger.DEBUG, ) @@ -329,8 +321,20 @@ def main(args): ) parser.add_argument("--cpus", help="num cpus to use", type=int, default=0) parser.add_argument( - "--parallel", help="Enable GNU parallel", action="store_true", default=False + "--parallel", help="Enable GNU parllel", action="store_true", default=False ) - parser.add_argument("--verbose", help="add verbosity", action="store_true") args = parser.parse_args() - main(args) + + nucls_out, prots_out = run( + assembly=args.assembly, + nucls_out=args.nucls_out, + prots_out=args.prots_out, + force=args.force, + cpus=args.cpus, + parallel=args.parallel, + ) + logger.info(f"written:\nnucls fpath: {nucls_out}\nprots fpath: {prots_out}") + + +if __name__ == "__main__": + main()