verbose bug #90

Merged 6 commits on Jun 18, 2020
192 changes: 112 additions & 80 deletions autometa/common/external/hmmer.py
@@ -31,6 +31,7 @@
import sys
import tempfile

import multiprocessing as mp
import pandas as pd

from glob import glob
@@ -42,9 +43,84 @@
logger = logging.getLogger(__name__)


def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None):
def annotate_parallel(orfs, hmmdb, outfpath, cpus):
outdir = os.path.dirname(os.path.realpath(outfpath))
outprefix = os.path.splitext(os.path.basename(outfpath))[0]
tmp_dirpath = tempfile.mkdtemp(dir=outdir)
__, tmp_fpath = tempfile.mkstemp(
suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath
)
log = os.path.join(outdir, "hmmscan.parallel.log")
jobs = f"-j{cpus}"
cmd = [
"parallel",
"--retries",
"4",
"--joblog",
log,
jobs,
"--linebuffer",
"--pipe",
"--recstart",
"'>'",
"hmmscan",
"--cpu",
"0",
"-o",
os.devnull,
"--tblout",
tmp_fpath,
hmmdb,
"-",
"<",
orfs,
"2>",
os.devnull,
]
cmdline = subprocess.list2cmdline(cmd)
logger.debug(cmdline)
try:
subprocess.run(cmdline, shell=True, check=True)
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}")
shutil.rmtree(tmp_dirpath)
raise err
tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt"))
lines = ""
buffer_limit = 60000
with open(outfpath, "w") as out:
for fp in tmp_fpaths:
with open(fp) as fh:
for line in fh:
lines += line
if sys.getsizeof(lines) >= buffer_limit:
out.write(lines)
lines = ""
out.write(lines)
shutil.rmtree(tmp_dirpath)


def annotate_sequential(orfs, hmmdb, outfpath, cpus):
cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs]
logger.debug(" ".join(cmd))
try:
subprocess.run(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed! hmmpress -f {hmmdb}")
raise err


def hmmscan(
orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, gnu_parallel=False
):
"""Runs hmmscan on dataset ORFs and provided hmm database.

Note
----
Only one of `parallel` and `gnu_parallel` may be provided as True

Parameters
----------
orfs : str
@@ -58,10 +134,9 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None)
force : bool, optional
Overwrite existing `outfpath` (the default is False).
parallel : bool, optional
Will parallelize hmmscan using GNU parallel (the default is True).
log : str, optional
</path/to/parallel.log> (the default is None). If provided will write
parallel log to `log`.
Will use multithreaded parallelization offered by hmmscan (the default is True).
gnu_parallel : bool, optional
Will parallelize hmmscan using GNU parallel (the default is False).

Returns
-------
@@ -70,73 +145,26 @@ def hmmscan(orfs, hmmdb, outfpath, cpus=0, force=False, parallel=True, log=None)

Raises
-------
ValueError
Both `parallel` and `gnu_parallel` were provided as True
FileExistsError
`outfpath` already exists
OSError
subprocess.CalledProcessError
hmmscan failed

"""
if gnu_parallel and parallel:
raise ValueError("Both parallel and gnu_parallel were provided as True")
# OPTIMIZE: we want to extend parallel to grid computing (workqueue?) via --sshlogin?
if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force:
raise FileExistsError(f"{outfpath}. Use force to overwrite!")
if parallel:
outdir = os.path.dirname(os.path.realpath(outfpath))
outprefix = os.path.splitext(os.path.basename(outfpath))[0]
tmp_dirpath = tempfile.mkdtemp(dir=outdir)
__, tmp_fpath = tempfile.mkstemp(
suffix=".{#}.txt", prefix=outprefix, dir=tmp_dirpath
)
jobs = f"-j{cpus}"
cmd = [
"parallel",
"--retries",
"4",
jobs,
"--linebuffer",
"--pipe",
"--recstart",
"'>'",
"hmmscan",
"-o",
os.devnull,
"--tblout",
tmp_fpath,
hmmdb,
"-",
"<",
orfs,
"2>",
os.devnull,
]
if log:
cmd.insert(3, log)
cmd.insert(3, "--joblog")
if gnu_parallel:
annotate_parallel(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus)
elif parallel:
cpus = mp.cpu_count() if not cpus else cpus
annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=cpus)
else:
cmd = ["hmmscan", "--cpu", str(cpus), "--tblout", outfpath, hmmdb, orfs]
logger.debug(f'cmd: {" ".join(cmd)}')
if parallel:
proc = subprocess.run(" ".join(cmd), shell=True)
tmp_fpaths = glob(os.path.join(tmp_dirpath, "*.txt"))
lines = ""
buffer_limit = 60000
with open(outfpath, "w") as out:
for fp in tmp_fpaths:
with open(fp) as fh:
for line in fh:
lines += line
if sys.getsizeof(lines) >= buffer_limit:
out.write(lines)
lines = ""
out.write(lines)
shutil.rmtree(tmp_dirpath)
else:
proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
try:
proc.check_returncode()
except subprocess.CalledProcessError as err:
logger.warning(f"Make sure your hmm profiles are pressed!")
logger.warning(f"hmmpress -f {hmmdb}")
logger.error(f"Args:{cmd} ReturnCode:{proc.returncode}")
raise err
annotate_sequential(orfs=orfs, hmmdb=hmmdb, outfpath=outfpath, cpus=0)
if not os.path.exists(outfpath):
raise FileNotFoundError(f"{outfpath} not written.")
return outfpath
Expand All @@ -159,17 +187,16 @@ def hmmpress(fpath):
-------
FileNotFoundError
`fpath` not found.
ChildProcessError
subprocess.CalledProcessError
hmmpress failed
"""
if not os.path.exists(fpath):
raise FileNotFoundError(fpath)
cmd = f"hmmpress -f {fpath}"
logger.debug(cmd)
with open(os.devnull, "w") as STDOUT, open(os.devnull, "w") as STDERR:
retcode = subprocess.call(cmd, stdout=STDOUT, stderr=STDERR, shell=True)
if retcode:
raise ChildProcessError(f"{cmd} failed with returncode: {retcode}")
cmd = ["hmmpress", "-f", fpath]
logger.debug(" ".join(cmd))
subprocess.run(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
return fpath


@@ -206,13 +233,13 @@ def filter_markers(infpath, outfpath, cutoffs, orfs=None, force=False):
"""
for fp in [infpath, cutoffs]:
if not os.path.exists(fp):
raise FileNotFoundError(f"{fp} not found")
raise FileNotFoundError(fp)
if os.path.exists(outfpath) and os.path.getsize(outfpath) > 0 and not force:
raise FileExistsError(f"{outfpath} already exists")
hmmtab_header = ["sname", "sacc", "orf", "score"]
col_indices = [0, 1, 2, 5]
columns = {i: k for i, k in zip(col_indices, hmmtab_header)}
df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#",)
df = pd.read_csv(infpath, sep="\s+", usecols=col_indices, header=None, comment="#")
df.rename(columns=columns, inplace=True)
# NaN may result from parsing issues while merging parallel results
df.dropna(inplace=True)
@@ -240,8 +267,9 @@ def main():
import logging as logger

logger.basicConfig(
format="%(asctime)s : %(name)s : %(levelname)s : %(message)s",
format="[%(asctime)s %(levelname)s] %(name)s: %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logger.DEBUG,
)
parser = argparse.ArgumentParser(
description="Retrieves markers with provided input assembly"
@@ -251,17 +279,21 @@
parser.add_argument("cutoffs", help="</path/to/hmm/cutoffs.tsv>")
parser.add_argument("hmmscan", help="</path/to/hmmscan.out>")
parser.add_argument("markers", help="</path/to/markers.tsv>")
parser.add_argument("--log", help="</path/to/parallel.log>")
parser.add_argument(
"--force", help="force overwrite of out filepath", action="store_true"
)
parser.add_argument("--cpus", help="num cpus to use", default=0, type=int)
Collaborator:
Is it good practice to specify default=0? I generally prefer cpus=mp.cpu_count(). The user can still set cpus=0 (or something else) if they think something is going awry during thread parallelization.

Collaborator Author (evanroyrees):
Specifying cpus=0 tells GNU parallel to use as many jobs as possible. This is why I've set the default to 0 rather than mp.cpu_count().

Collaborator:
I was unable to find what specifying cpus=0 does for hmmscan, but I found this in the hmmer manual:

If you specify --cpu 0, the program will run in serial-only mode, with no threads. This might be useful if you suspect something is awry with the threaded parallel implementation.

Correct me if I am wrong (I am sure you must have double-checked this), but does cpus=0 then behave differently for the different functions?

Collaborator Author (evanroyrees) on Jun 18, 2020:
Yes, it seems cpus=0 causes different behavior between GNU parallel and hmmer. This may be why we were seeing conflicting performance results. We should update this to handle the cpu input appropriately. Nice find! 🔍 🐎

Collaborator:
So, in GNU parallel, cpus=0 uses all CPUs? Sorry, I still was not able to find documentation that states this behaviour. Could you post it here?

Collaborator Author (evanroyrees) on Jun 18, 2020:
From man parallel:

       --jobs N
       -j N
       --max-procs N
       -P N     Number of jobslots on each machine. Run up to N jobs in parallel.  0 means as many as possible.
                Default is 100% which will run one job per CPU core on each machine.

parser.add_argument("--parallel", help="enable GNU parallel", action="store_true")
parser.add_argument("--verbose", help="add verbosity", action="store_true")
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--parallel",
help="enable hmmer multithreaded parallelization",
action="store_true",
)
group.add_argument(
"--gnu-parallel", help="enable GNU parallelization", action="store_true"
)
args = parser.parse_args()

if args.verbose:
logger.setLevel(logger.DEBUG)
if (
os.path.exists(args.hmmscan)
and os.path.getsize(args.hmmscan) > 0
@@ -276,7 +308,7 @@ def main():
cpus=args.cpus,
force=args.force,
parallel=args.parallel,
log=args.log,
gnu_parallel=args.gnu_parallel,
)

result = filter_markers(