Skip to content

Commit

Permalink
prodigal and hmmer verbose bug (#90)
Browse files Browse the repository at this point in the history
- 🔥 Remove verbose flags.
- 🔥 Remove log flag from hmmer.py
- 🎨 Change prodigal.run and hmmer.hmmscan to annotate with respective funcs parallel and sequential.
- 🎨 Aggregation of ORFs in prodigal performed with aggregrate_orfs func.
- 🎨 Change subprocess.call to subprocess.run
- 🎨 when GNU parallel called, use shell=True for subprocess.run otherwise use default shell=False
- 🎨💚 Isolate main block and move if name == main to bottom.
- 🎨📝🐎 Add hmmer serial/parallel modes.
- 🎨 Add gnu-parallel arg to parameters.
- 🎨 Default hmmscan from standalone module now runs in serial mode.
- 🎨🔥 Remove unnecessary variable assignment.
- 🎨🔥 Remove unnecessary proc.check_returncode().
- 🔥📝 Remove unused log parameter in hmmscan func.
- 🔥📝 Add note to docstring.
  • Loading branch information
evanroyrees authored Jun 18, 2020
1 parent 67387c5 commit 16bfca4
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 188 deletions.
192 changes: 112 additions & 80 deletions autometa/common/external/hmmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import sys
import tempfile

import multiprocessing as mp
import pandas as pd

from glob import glob
Expand All @@ -42,9 +43,84 @@
logger = logging.getLogger(__name__)


def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None):
def annotate_parallel(orfs, hmmdb, outfpath, cpus):
outdir = os.path.dirname(os.path.realpath(outfpath))
outprefix = os.path.splitext(os.path.basename(outfpath))[0]
tmp_dirpath = tempfile.mkdtemp(dir=outdir)
__, tmp_fpath = tempfile.mkstemp(
suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath
)
log = os.path.join(outdir, "hmmscan.parallel.log")
jobs = f"-j{cpus}"
cmd = [
"parallel",
"--retries",
"4",
"--joblog",
log,
jobs,
"--linebuffer",
"--pipe",
"--recstart",
"'>'",
"hmmscan",
"--cpu",
"0",
"-o",
os.devnull,
"--tblout",
tmp_fpath,
hmmdb,
"-",
"<",
orfs,
"2>",
os.devnull,
]
cmdline = subprocess.list2cmdline(cmd)
logger.debug(cmdline)
try:
subprocess.run(cmdline, shell=True, check=True)
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}")
shutil.rmtree(tmp_dirpath)
raise err
tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt"))
lines = ""
buffer_limit = 60000
with open(outfpath, "w") as out:
for fp in tmp_fpaths:
with open(fp) as fh:
for line in fh:
lines += line
if sys.getsizeof(lines) >= buffer_limit:
out.write(lines)
lines = ""
out.write(lines)
shutil.rmtree(tmp_dirpath)


def annotate_sequential(orfs, hmmdb, outfpath, cpus):
cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs]
logger.debug(" ".join(cmd))
try:
subprocess.run(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}")
raise err


def hmmscan(
orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, gnu_parallel=False
):
"""Runs hmmscan on dataset ORFs and provided hmm database.
Note
----
Only one of `parallel` and `gnu_parallel` may be provided as True
Parameters
----------
orfs : str
Expand All @@ -58,10 +134,9 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None)
force : bool, optional
Overwrite existing `outfpath` (the default is False).
parallel : bool, optional
Will parallelize hmmscan using GNU parallel (the default is True).
log : str, optional
</path/to/parallel.log> (the default is None). If provided will write
parallel log to `log`.
Will use multithreaded parallelization offered by hmmscan (the default is True).
gnu_parallel : bool, optional
Will parallelize hmmscan using GNU parallel (the default is False).
Returns
-------
Expand All @@ -70,73 +145,26 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None)
Raises
-------
ValueError
Both `parallel` and `gnu_parallel` were provided as True
FileExistsError
`outfpath` already exists
OSError
subprocess.CalledProcessError
hmmscan failed
"""
if gnu_parallel and parallel:
raise ValueError("Both parallel and gnu_parallel were provided as True")
# OPTIMIZE: we want to extend parallel to grid computing (workqueue?) via --sshlogin?
if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force:
raise FileExistsError(f"{outfpath}. Use force to overwrite!")
if parallel:
outdir = os.path.dirname(os.path.realpath(outfpath))
outprefix = os.path.splitext(os.path.basename(outfpath))[0]
tmp_dirpath = tempfile.mkdtemp(dir=outdir)
__, tmp_fpath = tempfile.mkstemp(
suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath
)
jobs = f"-j{cpus}"
cmd = [
"parallel",
"--retries",
"4",
jobs,
"--linebuffer",
"--pipe",
"--recstart",
"'>'",
"hmmscan",
"-o",
os.devnull,
"--tblout",
tmp_fpath,
hmmdb,
"-",
"<",
orfs,
"2>",
os.devnull,
]
if log:
cmd.insert(3, log)
cmd.insert(3, "--joblog")
if gnu_parallel:
annotate_parallel(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus)
elif parallel:
cpus = mp.cpu_count() if not cpus else cpus
annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus)
else:
cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs]
logger.debug(f'cmd: {" ".join(cmd)}')
if parallel:
proc = subprocess.run(" ".join(cmd), shell=True)
tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt"))
lines = ""
buffer_limit = 60000
with open(outfpath, "w") as out:
for fp in tmp_fpaths:
with open(fp) as fh:
for line in fh:
lines += line
if sys.getsizeof(lines) >= buffer_limit:
out.write(lines)
lines = ""
out.write(lines)
shutil.rmtree(tmp_dirpath)
else:
proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
try:
proc.check_returncode()
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed!")
logger.warning(f"hmmpress -f {hmmdb}")
logger.error(f"Args:{cmd} ReturnCode:{proc.returncode}")
raise err
annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=0)
if not os.path.exists(outfpath):
raise FileNotFoundError(f"{outfpath} not written.")
return outfpath
Expand All @@ -159,17 +187,16 @@ def hmmpress(fpath):
-------
FileNotFoundError
`fpath` not found.
ChildProcessError
subprocess.CalledProcessError
hmmpress failed
"""
if not os.path.exists(fpath):
raise FileNotFoundError(fpath)
cmd = f"hmmpress -f {fpath}"
logger.debug(cmd)
with open(os.devnull, "w") as STDOUT, open(os.devnull, "w") as STDERR:
retcode = subprocess.call(cmd, stdout=STDOUT, stderr=STDERR, shell=True)
if retcode:
raise ChildProcessError(f"{cmd} failed with returncode: {retcode}")
cmd = ["hmmpress", "-f", fpath]
logger.debug(" ".join(cmd))
subprocess.run(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
return fpath


Expand Down Expand Up @@ -206,13 +233,13 @@ def filter_markers(infpath, outfpath, cutoffs, orfs=None, force=False):
"""
for fp in [infpath, cutoffs]:
if not os.path.exists(fp):
raise FileNotFoundError(f"{fp} not found")
raise FileNotFoundError(fp)
if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force:
raise FileExistsError(f"{outfpath} already exists")
hmmtab_header = ["sname", "sacc", "orf", "score"]
col_indices = [0, 1, 2, 5]
columns = {i: k for i, k in zip(col_indices, hmmtab_header)}
df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#",)
df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#")
df.rename(columns=columns, inplace=True)
# NaN may result from parsing issues while merging parallel results
df.dropna(inplace=True)
Expand Down Expand Up @@ -240,8 +267,9 @@ def main():
import logging as logger

logger.basicConfig(
format="%(asctime)s : %(name)s : %(levelname)s : %(message)s",
format="[%(asctime)s %(levelname)s] %(name)s: %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logger.DEBUG,
)
parser = argparse.ArgumentParser(
description="Retrieves markers with provided input assembly"
Expand All @@ -251,17 +279,21 @@ def main():
parser.add_argument("cutoffs", help="</path/to/hmm/cutoffs.tsv>")
parser.add_argument("hmmscan", help="</path/to/hmmscan.out>")
parser.add_argument("markers", help="</path/to/markers.tsv>")
parser.add_argument("--log", help="</path/to/parallel.log>")
parser.add_argument(
"--force", help="force overwrite of out filepath", action="store_true"
)
parser.add_argument("--cpus", help="num cpus to use", default=0, type=int)
parser.add_argument("--parallel", help="enable GNU parallel", action="store_true")
parser.add_argument("--verbose", help="add verbosity", action="store_true")
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--parallel",
help="enable hmmer multithreaded parallelization",
action="store_true",
)
group.add_argument(
"--gnu-parallel", help="enable GNU parallelization", action="store_true"
)
args = parser.parse_args()

if args.verbose:
logger.setLevel(logger.DEBUG)
if (
os.path.exists(args.hmmscan)
and os.path.getsize(args.hmmscan) > 0
Expand All @@ -276,7 +308,7 @@ def main():
cpus=args.cpus,
force=args.force,
parallel=args.parallel,
log=args.log,
gnu_parallel=args.gnu_parallel,
)

result = filter_markers(
Expand Down
Loading

0 comments on commit 16bfca4

Please sign in to comment.