Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aln2counts_simplify tool #1023

Merged
merged 8 commits into from
Oct 16, 2023
88 changes: 58 additions & 30 deletions micall/utils/aln2counts_simplify.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
from collections import defaultdict
import logging
import os
import re
from csv import DictReader
from typing import Union

from micall.core.trim_fastqs import trim
from micall.utils.dd import DD
from micall.core.aln2counts import aln2counts

logger = logging.getLogger(__name__)

ALIGNED_CSV_HEADER = 'refname,qcut,rank,count,offset,seq'
SUBSEQ_ENV_VARNAME = 'MICALL_DD_SUBSEQ'

def parse_args(argv):
parser = ArgumentParser(
Expand Down Expand Up @@ -38,11 +42,15 @@ def __init__(self,
super(MicallDD, self).__init__()
self.filename = filename
self.simple = simple
base, ext = os.path.splitext(simple)
self.best = base + '_best' + ext
self.get_result = getattr(self, 'check_' + test_name)
self.reads = read_aligned(self.filename)

expected_subsequence = os.environ.get(SUBSEQ_ENV_VARNAME, None)
if expected_subsequence is None:
raise RuntimeError(f"Expected ${SUBSEQ_ENV_VARNAME!r} environment variable to be set for the {'subseq'!r} test")
Donaim marked this conversation as resolved.
Show resolved Hide resolved

self.expected_subsequence_re = re.compile(expected_subsequence)
donkirkby marked this conversation as resolved.
Show resolved Hide resolved

def _test(self, read_indexes):
read_count = len(read_indexes)
self.write_simple_aligned(self.simple, read_indexes)
Expand All @@ -52,56 +60,65 @@ def writer(filename):
return open(os.path.join(workdir, filename), 'w+')

with open(self.simple, 'r') as aligned_csv, \
writer('stitched.csv') as output:
writer('nuc.csv') as nuc_csv, \
writer('amino.csv') as amino_csv, \
writer('insertions.csv') as insertions_csv, \
writer('conseq.csv') as conseq_csv, \
writer('failed_align.csv') as failed_align_csv, \
writer('nuc_detail.csv') as nuc_detail_csv, \
writer('stitched.csv') as stitched_csv:

# noinspection PyBroadException
try:
aln2counts(aligned_csv,
# TODO: maybe redirect to os.devnull instead.
writer('nuc.csv'),
writer('amino.csv'),
writer('insertions.csv'),
writer('conseq.csv'),
writer('failed_align.csv'),
# --- #
conseq_stitched_csv=output,
nuc_detail_csv=writer("nuc_detail.csv"),
aln2counts(# Inputs #
aligned_csv,
nuc_csv,
amino_csv,
insertions_csv,
conseq_csv,
failed_align_csv,
# Outputs #
nuc_detail_csv=nuc_detail_csv,
conseq_stitched_csv=stitched_csv,
)

exception = None
except Exception as ex:
logger.warning(f'Read counting failed: {ex!r}.', exc_info=True)
print(f'Read counting failed: {ex!r}.')
exception = ex

output.seek(0)
result = self.get_result(output, read_count, exception)
stitched_csv.seek(0)
result = self.get_result(stitched_csv, read_count, exception)
if result == DD.FAIL:
os.rename(self.simple, self.best)
save_best(aligned_csv)
save_best(nuc_csv)
save_best(amino_csv)
save_best(insertions_csv)
save_best(conseq_csv)
save_best(failed_align_csv)
save_best(stitched_csv)

return result

@staticmethod
def check_subseq(output, read_count, exception):
def check_subseq(self, stitched_csv, read_count, exception):
if exception is not None:
return DD.UNRESOLVED

simple_count = len(output.readlines()) - 1
simple_count = len(stitched_csv.readlines()) - 1

logger.debug('Result: %d simplified reads from %d selected reads.',
simple_count,
read_count)
logger.debug('Result: %d stitched sequences from %d selected reads.',
simple_count, read_count)

expected_substring = os.environ.get('MICALL_DD_SUBSEQ', None)
if expected_substring is None:
raise RuntimeError(f"Expected ${'MICALL_DD_SUBSEQ'!r} environment value to be set for the {'subseq'!r} test")
output.seek(0)
success = any((expected_substring in line) for line in output)
stitched_csv.seek(0)
success = self.expected_subsequence_re.search(stitched_csv.read())

return DD.FAIL if success else DD.PASS

def write_simple_aligned(self, filename, read_indexes):
selected_reads = (self.reads[i] for i in read_indexes)
with open(filename, 'w') as f:
f.write('refname,qcut,rank,count,offset,seq\n')
f.write(ALIGNED_CSV_HEADER)
f.write('\n')
for line in selected_reads:
f.write(line)

Expand All @@ -119,6 +136,17 @@ def coerce(self, c):
for block in blocks) + ']'


def save_best(file: Union[str, '_io.TextIOWrapper']):
""" Save the current best version of a file.
"""

filename = file if type(file) is str else file.name
base, ext = os.path.splitext(filename)
best = base + '_best' + ext

os.rename(filename, best)


def read_aligned(filename):
""" Load all the reads from an aligned reads file into a dictionary.

Expand All @@ -130,7 +158,7 @@ def read_aligned(filename):
header = next(f)

# Sanity check that may detect instances where an incorrect file has been passed as input.
if header.strip() != 'refname,qcut,rank,count,offset,seq':
if header.strip() != ALIGNED_CSV_HEADER.strip():
raise ValueError(f'Aligned reads file {filename!r} does not start with a known header')

return f.readlines()
Expand Down
Loading