diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..04e9eda
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,44 @@
+# Change Log
+
+## Version 0.1.3 (08/27/2021)
+
+### Changed
+- Migrated from Travis CI to GitHub Actions (#127).
+- Made `--map-as-rank` the default when only mapping file(s) are provided (#132).
+- Renamed `--normalize|-z` to `--frac|-f` (#128).
+- Modified the core algorithm, which slightly improved performance (#124).
+
+### Added
+- Added the `tool normalize` command, with multiple features (#124).
+- Added the ability to collapse a stratified table (#126).
+- Created a WoL FTP server and added a link to it (#118).
+- Added a WoL standard operating procedure (`wolsop.sh`) and documentation (#116).
+- Added the first [citation](https://www.biorxiv.org/content/10.1101/2021.04.04.438427v1.abstract) of Woltka (#111).
+- Added protocols for Bowtie2 / SHOGUN and Fastp (#121).
+- Added a discussion about mapping uniqueness (#131).
+
+### Fixed
+- Fixed an issue where subjects were not found in free-rank classification (#120).
+- Corrected paths to example files and directories (#117).
+
+
+## Version 0.1.2 (03/31/2021)
+
+### Changed
+- Updated Qiita documentation (#107).
+- Renamed "gOTU" to "OGU" (#104).
+
+### Added
+- Published on PyPI. Can be installed via `pip install woltka` (#108).
+- Added instructions for using MetaCyc and KEGG (#99, #101).
+- Added the `tool collapse` command, which supports one-to-many classification (#99).
+
+### Fixed
+- Fixed handling of zero-length alignments (#105).
+
+
+## Version 0.1.1 (02/17/2021)
+
+### Added
+- First official release.
+
diff --git a/README.md b/README.md
index be00300..208c6b4 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
 [![CI Status](https://github.com/qiyunzhu/woltka/actions/workflows/main.yml/badge.svg)](https://github.com/qiyunzhu/woltka/actions)
 [![Coverage Status](https://coveralls.io/repos/github/qiyunzhu/woltka/badge.svg?branch=master)](https://coveralls.io/github/qiyunzhu/woltka?branch=master)
 
-**Woltka** (Web of Life Toolkit App), is a bioinformatics package for shotgun metagenome data analysis. It takes full advantage of, and it not limited by, the [WoL](https://biocore.github.io/wol/) reference phylogeny. It bridges first-pass sequence aligners with advanced analytical platforms (such as QIIME 2). Highlights of this program include:
+**Woltka** (Web of Life Toolkit App) is a bioinformatics package for shotgun metagenome data analysis. It takes full advantage of, and is not limited by, the [WoL](https://biocore.github.io/wol/) reference phylogeny. It bridges first-pass sequence aligners with advanced analytical platforms (such as QIIME 2). Highlights of this program include:
 
 - OGU: fine-grained community ecology.
 - Tree-based, rank-free classification.
diff --git a/woltka/__init__.py b/woltka/__init__.py
index 98f5557..2b72289 100644
--- a/woltka/__init__.py
+++ b/woltka/__init__.py
@@ -10,7 +10,7 @@
 __name__ = 'woltka'
 __description__ = 'Web of Life ToolKit App'
-__version__ = '0.1.2'
+__version__ = '0.1.3'
 __license__ = 'BSD-3-Clause'
 __author__ = 'Qiyun Zhu'
 __email__ = 'qiyunzhu@gmail.com'
diff --git a/woltka/align.py b/woltka/align.py
index 080c9d7..2499f3a 100644
--- a/woltka/align.py
+++ b/woltka/align.py
@@ -30,7 +30,6 @@ def plain_mapper(fh, fmt=None, n=1000):
     """Read an alignment file in chunks and yield query-to-subject(s) maps.
 
-
     Parameters
     ----------
     fh : file handle
         Alignment file to parse.
     fmt : str, optional
         Alignment file format.
     n : int, optional
         Number of lines per chunk.
 
-
     Yields
     ------
     deque of str
         Query queue.
-    deque of dict of str -> tuple
+    deque of set of str
         Subject(s) queue.
 
-
     Notes
     -----
     The design of this function aims to couple with the extremely large size of
@@ -72,20 +69,18 @@
     for i, line in enumerate(chain(iter(head), fh)):
 
         # parse current alignment line
-        parsed = parser(line)
         try:
-            query, subject = parsed[:2]
+            query, subject = parser(line)[:2]
         except (TypeError, IndexError):
             continue
 
-        start, end = parsed[4:6] if len(parsed) >= 6 else (None, None)
-
-        # add subject to subject set of the same query Id,
-        # keeping track of read indices
+        # add subject to subject set of the same query Id
        if query == this:
-            subque[-1].setdefault(subject, []).append((start, end))
+            subque[-1].add(subject)
 
         # when query Id changes,
         else:
+
             # line number has reached target
             if i >= target:
@@ -102,9 +97,9 @@
             # next target line number
             target = i + n
 
-            # create new query and subject map pair
+            # create new query and subject set pair
             qry_append(query)
-            sub_append({subject: [(start, end)]})
+            sub_append({subject})
 
             # update current query Id
             this = query
@@ -113,6 +108,68 @@
     yield qryque, subque
 
 
+def range_mapper(fh, fmt=None, n=1000):
+    """Read an alignment file and yield maps of query to subject(s) and their
+    ranges.
+
+    Parameters
+    ----------
+    fh : file handle
+        Alignment file to parse.
+    fmt : str, optional
+        Alignment file format.
+    n : int, optional
+        Number of lines per chunk.
+
+    Yields
+    ------
+    deque of str
+        Query queue.
+    deque of dict of str to list of (int, int)
+        Subject-to-ranges queue.
+
+    Notes
+    -----
+    Same as `plain_mapper`, except that it also returns subject ranges.
+
+    See Also
+    --------
+    plain_mapper
+    """
+    fmt, head = (fmt, []) if fmt else infer_align_format(fh)
+    parser = assign_parser(fmt)
+    qryque, subque = deque(), deque()
+    qry_append, sub_append = qryque.append, subque.append
+    this = None
+    target = n
+    for i, line in enumerate(chain(iter(head), fh)):
+
+        # retain subject range
+        try:
+            query, subject, _, _, start, end = parser(line)[:6]
+        except (TypeError, IndexError):
+            continue
+
+        # range must be positive integers
+        if start and end:
+
+            if query == this:
+                subque[-1].setdefault(subject, []).append((start, end))
+            else:
+                if i >= target:
+                    yield qryque, subque
+                    qryque, subque = deque(), deque()
+                    qry_append, sub_append = qryque.append, subque.append
+                    target = i + n
+                qry_append(query)
+
+                # add subject Id and range
+                sub_append({subject: [(start, end)]})
+
+                this = query
+    yield qryque, subque
+
+
 def infer_align_format(fh):
     """Guess the format of an alignment file based on content.
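For illustration, a minimal sketch of what each mapper yields for the same b6o input (the read/genome Ids are made up, and it assumes the patched `woltka.align` is importable):

```python
from io import StringIO
from woltka.align import plain_mapper, range_mapper

# two BLAST-like (b6o) hits: reads R1 and R2 both align to genome G1
aln = ('R1\tG1\t95\t20\t0\t0\t1\t20\t10\t29\t1\t1\n'
       'R2\tG1\t95\t20\t0\t0\t1\t20\t16\t35\t1\t1\n')

# plain_mapper keeps only subject Ids (a deque of sets)
for qryque, subque in plain_mapper(StringIO(aln)):
    print(list(qryque), list(subque))  # ['R1', 'R2'] [{'G1'}, {'G1'}]

# range_mapper additionally keeps the (start, end) coordinates
for qryque, subque in range_mapper(StringIO(aln)):
    print(list(subque))  # [{'G1': [(10, 29)]}, {'G1': [(16, 35)]}]
```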
diff --git a/woltka/cover.py b/woltka/cover.py
deleted file mode 100644
index 71f328e..0000000
--- a/woltka/cover.py
+++ /dev/null
@@ -1,57 +0,0 @@
-class SortedRangeList:
-    def __init__(self, autocompress=100000):
-        self.ranges = []
-        self.autocompress = autocompress
-        self.since_compressed = 0
-
-    def add_range(self, start, end):
-        self.ranges.append((start, end))
-        self.since_compressed += 1
-        if self.autocompress is not None and \
-                self.since_compressed > self.autocompress:
-            self.compress()
-
-    def compress(self):
-        if self.since_compressed == 0:
-            return
-
-        # Sort ranges by start index
-        self.ranges.sort(key=lambda r: r[0])
-
-        new_ranges = []
-        start_val = None
-        end_val = None
-
-        for r in self.ranges:
-            if end_val is None:
-                # case 1: no active range, start active range.
-                start_val = r[0]
-                end_val = r[1]
-            elif end_val >= r[0] - 1:
-                # case 2: active range continues through this range
-                # extend active range
-                end_val = max(end_val, r[1])
-            else:  # if end_val < r[0] - 1:
-                # case 3: active range ends before this range begins
-                # write new range out, then start new active range
-                new_range = (start_val, end_val)
-                new_ranges.append(new_range)
-                start_val = r[0]
-                end_val = r[1]
-
-        if end_val is not None:
-            new_range = (start_val, end_val)
-            new_ranges.append(new_range)
-
-        self.ranges = new_ranges
-        self.since_compressed = 0
-
-    def compute_length(self):
-        if self.since_compressed > 0:
-            self.compress()
-        total = 0
-        for r in self.ranges:
-            total += r[1] - r[0] + 1
-        return total
-
-    def __str__(self):
-        return str(self.ranges)
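The deleted `SortedRangeList` class is superseded by the stateless `merge_ranges` function introduced in `woltka/coverage.py` below. A minimal sketch of the merge rule it preserves: overlapping or directly adjacent ranges coalesce, since the merge condition is `cend >= start - 1`:

```python
from woltka.coverage import merge_ranges

# overlapping ranges coalesce
print(merge_ranges([(1, 3), (2, 4)]))          # [(1, 4)]

# directly adjacent ranges (end + 1 == next start) also coalesce,
# because the merge condition is `cend >= start - 1`
print(merge_ranges([(1, 2), (3, 4), (5, 6)]))  # [(1, 6)]

# disjoint ranges stay separate
print(merge_ranges([(1, 2), (4, 6)]))          # [(1, 2), (4, 6)]
```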
diff --git a/woltka/coverage.py b/woltka/coverage.py
new file mode 100644
index 0000000..d2505fa
--- /dev/null
+++ b/woltka/coverage.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+
+# ----------------------------------------------------------------------------
+# Copyright (c) 2020--, Qiyun Zhu.
+#
+# Distributed under the terms of the Modified BSD License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# ----------------------------------------------------------------------------
+
+
+"""Functions for calculating coverage of subject sequences.
+
+The "coverage" refers to the ranges of a subject sequence (e.g., genome) that
+are covered by at least one query sequence (e.g., short sequencing read).
+
+The functions `merge_ranges` and `parse_ranges` were adapted from the
+`SortedRangeList` class implemented by Daniel Hakim (@dhakim87) in the Zebra
+Filter program (https://github.com/ucsd-cmi/zebra_filter).
+"""
+
+from os import makedirs
+from os.path import join
+
+
+def merge_ranges(ranges):
+    """Merge short, fragmented ranges into long, continuous ones.
+
+    Parameters
+    ----------
+    ranges : list of (int, int)
+        Ranges (start, end) to merge.
+
+    Returns
+    -------
+    list of (int, int)
+        Merged ranges.
+
+    Notes
+    -----
+    Ranges that overlap will be merged into one. For example:
+
+    >>> merge_ranges([(1, 3), (2, 4), (6, 8), (7, 9)])
+    [(1, 4), (6, 9)]
+    """
+    res = []
+    res_append = res.append
+    cstart, cend = None, None
+    for start, end in sorted(ranges):
+        if cend is None:
+            # case 1: no active range; start an active range
+            cstart, cend = start, end
+        elif cend >= start - 1:
+            # case 2: active range continues through this range;
+            # extend the active range
+            cend = max(cend, end)
+        else:
+            # case 3: active range ends before this range begins;
+            # write the active range out, then start a new one
+            res_append((cstart, cend))
+            cstart, cend = start, end
+    if cend is not None:
+        res_append((cstart, cend))
+    return res
+
+
+def parse_ranges(rmaps, covers, chunk=10000):
+    """Extract range information from read maps.
+
+    Parameters
+    ----------
+    rmaps : dict of dict
+        Read map data structure.
+    covers : dict of list
+        Subject coverage data structure.
+    chunk : int, optional
+        Auto-merge after adding this many new ranges.
+
+    Notes
+    -----
+    The `covers` data structure is a dictionary, in which the key is a tuple
+    of (sample, subject), and the value is a flat list: the first element is
+    an integer counting how many new ranges have been added since the last
+    merge, and the remaining elements are the individual ranges.
+
+    Once the range counter reaches the threshold specified by `chunk`, a
+    merge operation is automatically triggered, collapsing small ranges into
+    large, continuous ones, so as to reduce memory usage and speed up
+    downstream operations.
+
+    See Also
+    --------
+    merge_ranges
+    """
+    add_cover = covers.setdefault
+    for sample, (_, subque) in rmaps.items():
+        for subjects in subque:
+            for subject, ranges in subjects.items():
+                cover = add_cover((sample, subject), [0])
+                count = cover[0] + len(ranges)
+                if count >= chunk:
+                    # merge accumulated ranges and reset the counter
+                    cover[0] = 0
+                    cover[1:] = merge_ranges(cover[1:] + ranges)
+                else:
+                    cover[0] = count
+                    cover.extend(ranges)
+
+
+def calc_coverage(covers):
+    """Calculate coverage per sample per subject based on ranges.
+
+    Parameters
+    ----------
+    covers : dict of list
+        Subject coverage data structure.
+
+    Returns
+    -------
+    dict of dict of list of (int, int)
+        Subject coverage data.
+
+    Notes
+    -----
+    This function is executed last, after all ranges have been added. It
+    performs a final merge, then converts the dictionary keyed by (sample,
+    subject) into a nested dictionary of sample -> subject -> ranges.
+
+    See Also
+    --------
+    merge_ranges
+    """
+    res = {}
+    add_sample = res.setdefault
+    for (sample, subject), cover in covers.items():
+        add_sample(sample, {})[subject] = merge_ranges(cover[1:])
+    return res
+
+
+def write_coverage(covers, outdir):
+    """Write subject coverage to per-sample output files.
+
+    Parameters
+    ----------
+    covers : dict of dict
+        Coverage data structure.
+    outdir : str
+        Directory of output files.
+    """
+    makedirs(outdir, exist_ok=True)
+    for sample, cover in sorted(covers.items()):
+        with open(join(outdir, f'{sample}.cov'), 'w') as fh:
+            for subject, ranges in sorted(cover.items()):
+                for start, end in sorted(ranges):
+                    print(subject, start, end, sep='\t', file=fh)
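For reference, a minimal sketch (hypothetical sample/read/subject Ids) of the flat per-(sample, subject) list that `parse_ranges` maintains, in which element 0 counts the ranges added since the last merge, and of the final merge performed by `calc_coverage`:

```python
from woltka.coverage import parse_ranges, calc_coverage

# one sample, two reads hitting overlapping regions of subject G1
rmaps = {'S1': (['R1', 'R2'], [{'G1': [(1, 100)]},
                               {'G1': [(51, 150)]}])}

covers = {}
parse_ranges(rmaps, covers)
# flat list: [counter, range, range, ...]
print(covers)                # {('S1', 'G1'): [2, (1, 100), (51, 150)]}

# the final merge happens in calc_coverage
print(calc_coverage(covers))  # {'S1': {'G1': [(1, 150)]}}
```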
""" # master read-to-gene(s) map - res = defaultdict(dict) + res = defaultdict(set) for nucl, loci in locmap.items(): + # merge and sort coordinates # question is to merge an unsorted list into a sorted one # Python's built-in timsort algorithm is efficient at this @@ -96,9 +96,9 @@ def flush(): # map reads to genes using the core algorithm for read, gene in match_func(queue, lenmap[nucl], th, nucl): + # merge read-gene pairs to the master map - # TODO: support coverage map for ordinal reads - res[rids[read]][gene] = [] + res[rids[read]].add(gene) # return matching read Ids and gene Ids return res.keys(), res.values() diff --git a/woltka/tests/test_align.py b/woltka/tests/test_align.py index 9c7b94c..fbdba92 100644 --- a/woltka/tests/test_align.py +++ b/woltka/tests/test_align.py @@ -16,9 +16,9 @@ from woltka.file import openzip from woltka.align import ( - plain_mapper, infer_align_format, assign_parser, parse_map_line, - parse_b6o_line, parse_sam_line, cigar_to_lens, parse_kraken, - parse_centrifuge) + plain_mapper, range_mapper, infer_align_format, assign_parser, + parse_map_line, parse_b6o_line, parse_sam_line, cigar_to_lens, + parse_kraken, parse_centrifuge) class AlignTests(TestCase): @@ -32,9 +32,7 @@ def tearDown(self): def test_plain_mapper(self): def _res2lst(res): - return tuple((list(y[0]), list([set(x.keys()) - for x in y[1]])) - for y in res) + return tuple(tuple(list(x) for x in y) for y in res) # simple case aln = StringIO('\n'.join(( @@ -58,7 +56,6 @@ def _res2lst(res): (['R3'], [{'G1', 'G3'}]), (['R4'], [{'G4'}]), (['R5'], [{'G5'}])) - self.assertTupleEqual(_res2lst(obs), exp) # chunk of 2 @@ -108,6 +105,29 @@ def _res2lst(res): self.assertEqual(str(ctx.exception), ( 'Cannot determine alignment file format.')) + def test_range_mapper(self): + + def _res2lst(res): + return tuple(tuple(list(x) for x in y) for y in res) + + aln = StringIO('\n'.join(( + 'R1 G1 95 20 0 0 1 20 10 29 1 1', + 'R2 G1 95 20 0 0 1 20 16 35 1 1', + 'R3 G2 95 20 0 0 1 20 39 21 1 1', + 'R4 G2 95 20 0 0 20 1 41 22 1 1', + 'R5 G3 95 20 0 0 20 1 30 49 1 1', + 'R5 G3 95 20 0 0 20 1 50 69 1 1', + 'Rx Gx 95 20 0 0 1 20 0 0 1 1', + '# this is not an alignment'))) + obs = _res2lst(range_mapper(aln))[0] + exp = [('R1', {'G1': [(10, 29)]}), + ('R2', {'G1': [(16, 35)]}), + ('R3', {'G2': [(21, 39)]}), + ('R4', {'G2': [(22, 41)]}), + ('R5', {'G3': [(30, 49), (50, 69)]})] + self.assertListEqual(list(obs[0]), [x[0] for x in exp]) + self.assertListEqual(list(obs[1]), [x[1] for x in exp]) + def test_infer_align_format(self): # simple cases # map diff --git a/woltka/tests/test_coverage.py b/woltka/tests/test_coverage.py new file mode 100644 index 0000000..3f4b0d2 --- /dev/null +++ b/woltka/tests/test_coverage.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 + +# ---------------------------------------------------------------------------- +# Copyright (c) 2020--, Qiyun Zhu. +# +# Distributed under the terms of the Modified BSD License. +# +# The full license is in the file LICENSE, distributed with this software. 
+# ---------------------------------------------------------------------------- + +from unittest import TestCase, main +from os.path import join, dirname, realpath +from os import makedirs +from shutil import rmtree +from tempfile import mkdtemp + +from woltka.coverage import ( + merge_ranges, parse_ranges, calc_coverage, write_coverage) + + +class CoverageTests(TestCase): + def setUp(self): + self.tmpdir = mkdtemp() + self.datdir = join(dirname(realpath(__file__)), 'data') + + def tearDown(self): + rmtree(self.tmpdir) + + def test_merge_ranges(self): + # ranges that are sorted and overlapped + obs = merge_ranges([(1, 3), (2, 4), (6, 8), (7, 9)]) + exp = [(1, 4), (6, 9)] + self.assertListEqual(obs, exp) + + # unsorted ranges + obs = merge_ranges([(4, 6), (1, 4), (5, 9)]) + exp = [(1, 9)] + self.assertListEqual(obs, exp) + + # ranges that are connected (but not overlapped) + obs = merge_ranges([(1, 2), (3, 4), (5, 6)]) + exp = [(1, 6)] + self.assertListEqual(obs, exp) + + # empty range list + self.assertListEqual(merge_ranges([]), []) + + def test_parse_ranges(self): + rmap = {'S1': (['R1', 'R2', 'R3'], [ + {'G1': [(1, 100)]}, + {'G1': [(1, 50), (251, 300)]}, + {'G1': [(1, 50)], 'G2': [(101, 150)]}]), + 'S2': (['R1', 'R3', 'R4'], [ + {'G2': [(151, 200), (51, 100)]}, + {'G3': [(76, 125)], 'G4': [(26, 75), (101, 150)]}, + {'G2': [(26, 75)], 'G3': [(1, 50)]}]), + 'S3': (['R2', 'R4', 'R5'], [ + {'G1': [(1, 50)], 'G2': [(51, 100)]}, + {'G1': [(51, 100), (151, 200)]}, + {'G1': [(101, 150)], 'G2': [(1, 50)]}])} + + # add ranges but don't merge + obs = {} + parse_ranges(rmap, obs) + exp = {('S1', 'G1'): [4, (1, 100), (1, 50), (251, 300), (1, 50)], + ('S1', 'G2'): [1, (101, 150)], + ('S2', 'G2'): [3, (151, 200), (51, 100), (26, 75)], + ('S2', 'G3'): [2, (76, 125), (1, 50)], + ('S2', 'G4'): [2, (26, 75), (101, 150)], + ('S3', 'G1'): [4, (1, 50), (51, 100), (151, 200), (101, 150)], + ('S3', 'G2'): [2, (51, 100), (1, 50)]} + self.assertDictEqual(obs, exp) + + # merge while adding + obs = {} + parse_ranges(rmap, obs, chunk=1) + exp = {('S1', 'G1'): [0, (1, 100), (251, 300)], + ('S1', 'G2'): [0, (101, 150)], + ('S2', 'G2'): [0, (26, 100), (151, 200)], + ('S2', 'G3'): [0, (1, 50), (76, 125)], + ('S2', 'G4'): [0, (26, 75), (101, 150)], + ('S3', 'G1'): [0, (1, 200)], + ('S3', 'G2'): [0, (1, 100)]} + self.assertDictEqual(obs, exp) + + def test_calc_coverage(self): + cov = {('S1', 'G1'): [4, (1, 100), (1, 50), (251, 300), (1, 50)], + ('S1', 'G2'): [1, (101, 150)], + ('S2', 'G2'): [3, (151, 200), (51, 100), (26, 75)], + ('S2', 'G3'): [2, (76, 125), (1, 50)], + ('S2', 'G4'): [2, (26, 75), (101, 150)], + ('S3', 'G1'): [4, (1, 50), (51, 100), (151, 200), (101, 150)], + ('S3', 'G2'): [2, (51, 100), (1, 50)]} + obs = calc_coverage(cov) + exp = {'S1': {'G1': [(1, 100), (251, 300)], 'G2': [(101, 150)]}, + 'S2': {'G2': [(26, 100), (151, 200)], 'G3': [(1, 50), ( + 76, 125)], 'G4': [(26, 75), (101, 150)]}, + 'S3': {'G1': [(1, 200)], 'G2': [(1, 100)]}} + self.assertDictEqual(obs, exp) + + def test_write_coverage(self): + cov = {'S1': {'G1': [(1, 100), (251, 300)], 'G2': [(101, 150)]}, + 'S2': {'G2': [(26, 100), (151, 200)], 'G3': [(1, 50), ( + 76, 125)], 'G4': [(26, 75), (101, 150)]}, + 'S3': {'G1': [(1, 200)], 'G2': [(1, 100)]}} + outdir = join(self.tmpdir, 'outdir') + makedirs(outdir) + write_coverage(cov, outdir) + with open(join(outdir, 'S1.cov'), 'r') as f: + obs = f.read().splitlines() + exp = ['G1\t1\t100', 'G1\t251\t300', 'G2\t101\t150'] + self.assertListEqual(obs, exp) + with open(join(outdir, 'S2.cov'), 'r') as 
+            obs = f.read().splitlines()
+        exp = ['G2\t26\t100', 'G2\t151\t200', 'G3\t1\t50',
+               'G3\t76\t125', 'G4\t26\t75', 'G4\t101\t150']
+        self.assertListEqual(obs, exp)
+        with open(join(outdir, 'S3.cov'), 'r') as f:
+            obs = f.read().splitlines()
+        exp = ['G1\t1\t200', 'G2\t1\t100']
+        self.assertListEqual(obs, exp)
+        rmtree(outdir)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/woltka/tests/test_ordinal.py b/woltka/tests/test_ordinal.py
index c51dab2..c899b87 100644
--- a/woltka/tests/test_ordinal.py
+++ b/woltka/tests/test_ordinal.py
@@ -152,32 +152,27 @@ def test_ordinal_mapper(self):
             ('r6', 'g2'),
             ('r8', 'g3')]
         self.assertListEqual(list(obs[0]), [x[0] for x in exp])
-        self.assertListEqual([set(o.keys()) for o in list(obs[1])],
-                             [{x[1]} for x in exp])
+        self.assertListEqual(list(obs[1]), [{x[1]} for x in exp])
 
         # specify format
         aln.seek(0)
         obs = list(ordinal_mapper(aln, coords, fmt='b6o'))[0]
         self.assertListEqual(list(obs[0]), [x[0] for x in exp])
-        self.assertListEqual([set(o.keys()) for o in list(obs[1])],
-                             [{x[1]} for x in exp])
+        self.assertListEqual(list(obs[1]), [{x[1]} for x in exp])
 
         # specify chunk size
         aln.seek(0)
         obs = list(ordinal_mapper(aln, coords, n=5))
         self.assertListEqual(list(obs[0][0]), [x[0] for x in exp[:2]])
-        self.assertListEqual([set(o.keys()) for o in list(obs[0][1])],
-                             [{x[1]} for x in exp[:2]])
+        self.assertListEqual(list(obs[0][1]), [{x[1]} for x in exp[:2]])
         self.assertListEqual(list(obs[1][0]), [x[0] for x in exp[2:]])
-        self.assertListEqual([set(o.keys()) for o in list(obs[1][1])],
-                             [{x[1]} for x in exp[2:]])
+        self.assertListEqual(list(obs[1][1]), [{x[1]} for x in exp[2:]])
 
         # add prefix
         aln.seek(0)
         obs = list(ordinal_mapper(aln, coords, prefix=True))[0]
         self.assertListEqual(list(obs[0]), [x[0] for x in exp])
-        self.assertListEqual([set(o.keys()) for o in list(obs[1])],
-                             [{f'n1_{x[1]}'} for x in exp])
+        self.assertListEqual(list(obs[1]), [{f'n1_{x[1]}'} for x in exp])
 
         # specify threshold
         aln.seek(0)
@@ -191,8 +186,7 @@ def test_ordinal_mapper(self):
             ('r8', 'g3'),
             ('r9', 'g3')]
         self.assertListEqual(list(obs[0]), [x[0] for x in exp])
-        self.assertListEqual([set(o.keys()) for o in list(obs[1])],
-                             [{x[1]} for x in exp])
+        self.assertListEqual(list(obs[1]), [{x[1]} for x in exp])
 
     def test_ordinal_parser(self):
diff --git a/woltka/workflow.py b/woltka/workflow.py
index ea2e2ab..9a7934a 100644
--- a/woltka/workflow.py
+++ b/woltka/workflow.py
@@ -28,7 +28,7 @@
 from .file import (
     openzip, readzip, path2stem, stem2rank, read_ids, id2file_from_dir,
     id2file_from_map, read_map_uniq, read_map_1st, write_readmap)
-from .align import plain_mapper
+from .align import plain_mapper, range_mapper
 from .classify import (
     assign_none, assign_free, assign_rank, counter, counter_size,
     counter_strat, counter_size_strat)
@@ -38,7 +38,8 @@
 from .ordinal import (
     ordinal_mapper, read_gene_coords, whether_prefix, calc_gene_lens)
 from .table import prep_table, write_table
-from .cover import SortedRangeList
+from .coverage import (
+    parse_ranges, merge_ranges, calc_coverage, write_coverage)
 
 
 def workflow(input_fp: str,
@@ -120,7 +121,8 @@ def workflow(input_fp: str,
                                map_rank, zippers)
 
     # build mapping module
-    mapper, chunk = build_mapper(coords_fp, overlap, chunk, zippers)
+    mapper, chunk = build_mapper(
+        coords_fp, outcov_dir, overlap, chunk, zippers)
 
     # parse length map
     sizes = parse_sizes(sizes, mapper, zippers)
@@ -264,8 +266,8 @@ def classify(mapper: object,
               'sizes': sizes, 'unasgd': unasgd, 'rank2dir': rank2dir,
               'outzip': outzip if outzip != 'none' else None}
 
-    # coverage map
-    covers = defaultdict(SortedRangeList) if outcov_dir else None
+    # (optional) subject coverage data
+    covers = {} if outcov_dir else None
 
     # current sample Id
     csample = False
@@ -289,35 +291,16 @@ def classify(mapper: object,
                 files[fp] if files else None: (qryque, subque)}
 
             # (optional) calculate subject coverage
-            if covers is not None:
-                for sample, rmap in rmaps.items():
-                    for subjects in rmap[1]:
-                        for subject, ranges in subjects.items():
-                            for start, end in ranges:
-                                if start and end:
-                                    covers[sample, subject].add_range(
-                                        start, end)
-
-            # discard read start / end
-            for sample in rmaps:
-
-                # (optional) strip indices and generate tuples
-                if not trimsub:
-                    rmaps[sample] = (
-                        rmaps[sample][0],
-                        deque(tuple(sorted(x.keys()))
-                              for x in rmaps[sample][1])
-                    )
-                else:
-                    rmaps[sample] = (
-                        rmaps[sample][0],
-                        deque(
-                            tuple(sorted([y.rsplit(trimsub, 1)[0]
-                                          for y in x.keys()]))
-                            for x in rmaps[sample][1])
-                    )
+            if outcov_dir:
+                parse_ranges(rmaps, covers)
 
             # assign reads at each rank
-            for sample, rmap in rmaps.items():
+            for sample, (qryque, subque) in rmaps.items():
+
+                # (optional) strip suffixes from subject Ids
+                subque = deque(map(tuple, map(sorted, strip_suffix(
+                    subque, trimsub) if trimsub else subque)))
+
                 # (optional) read strata of current sample into cache
                 if stratmap and sample != csample:
                     kwargs['strata'] = read_strata(
@@ -326,7 +309,8 @@ def classify(mapper: object,
 
                 # call assignment workflow for each rank
                 for rank in ranks:
-                    assign_readmap(*rmap, data, rank, sample, **kwargs)
+                    assign_readmap(
+                        qryque, subque, data, rank, sample, **kwargs)
 
                 # show progress
                 istep = nqry // 1000000 - nstep
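The single strip-suffix line above serves both mappers because `sorted()` iterates a dict by its keys: subject sets from `plain_mapper` and subject-to-ranges dicts from `range_mapper` both collapse to sorted tuples of subject Ids. A minimal sketch with made-up subject Ids:

```python
from collections import deque

# plain_mapper yields sets; range_mapper yields dicts keyed by subject
subque_sets = deque([{'G2', 'G1'}])
subque_dicts = deque([{'G2': [(1, 20)], 'G1': [(5, 30)]}])

# sorted() iterates dict keys, so both collapse to subject Id tuples
print(deque(map(tuple, map(sorted, subque_sets))))   # deque([('G1', 'G2')])
print(deque(map(tuple, map(sorted, subque_dicts))))  # deque([('G1', 'G2')])
```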
""" - # for sub, ranges in subque: - # yield sub.rsplit(sep, 1)[0], ranges return map(set, map(partial( map, lambda x: x.rsplit(sep, 1)[0]), subque))