forked from swiftbiosciences/16S-SNAPP-py3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lineage_parser.py
executable file
·61 lines (55 loc) · 2.09 KB
/
lineage_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python
## Swift Biosciences 16S snapp workflow
## Author Benli Chai & Sukhinder Sandhu 20200502
## to parse the classifier results into a hash of counts keyed by lineage name
## Ported to Python 3 on 20210106
import os
def get_lineages(tax_filename, CONF):
Hash = {}
lines = open(tax_filename.strip(), 'r').readlines()
for row in lines:
cols = row.strip().split('\t')
ID = cols[0]
ID = ID.split(';')[0]
levels = cols[2:]
i = 0
lineage = []
while i < len(levels):
level = levels[i:i+3]
name, rank, conf = level
name = rank[0] + '__' + name.replace('"', '')
if float(conf) < CONF:
break
else:
lineage.append(name.replace('"', ''))
i += 3
lineage = ";".join(lineage).strip()
if lineage == '':#rare cases the sequence can't be classified to domain level
lineage = 'Unclassified'
Hash[ID] = lineage
return Hash
def get_best_tax(ref_tax, read_taxs):#choose the better between reftax and readtax
taxs = {}
for tax in read_taxs:
level = tax.count(';')
if not level in taxs:
taxs[level] = []
taxs[level].append(tax)
resolution = taxs.keys()
#resolution.sort() #changed to next line for Python 3
resolution = sorted(resolution)
resolution.reverse()
top_lineage= taxs[resolution[0]][0]
if ref_tax.count(';') >= resolution[0]:
top_lineage = ref_tax
return top_lineage #the most resolved lineage
def add_lineages(wd, sample_id, tax_dict, refset):#refset pass through to add lineage information
ref_tax_file = os.path.join(wd, sample_id + '.cls')
ref_tax_hash = get_lineages(ref_tax_file, 0.7)
for ref_id in refset.keys():
read_ids = list(refset[ref_id].getReadIDs())
ref_lineage = ref_tax_hash[ref_id]
read_lineage_list = list(set([tax_dict[ID][0] for ID in read_ids]))
best_lineage = get_best_tax(ref_lineage, read_lineage_list)
refset[ref_id].addAssign(best_lineage)
return refset