-
Notifications
You must be signed in to change notification settings - Fork 6
/
seq_table.py
executable file
·136 lines (106 loc) · 4.62 KB
/
seq_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
'''
Create a sequence file using the original and dereplicated fastas.
Load all the dereplicated sequences and sequence IDs into memory. Then read through the
original fasta. If a sequence is in the dereplicated fasta, then keep track of the
abundance of the sequence in that sample.
'''
import sys, argparse, re
from Bio import SeqIO
import util, util_index
class SeqTableWriter:
def __init__(self, fasta, derep, output, samples=None, min_counts=0, assert_same_seqs=False, run=True):
self.fasta = fasta
self.derep = derep
self.output = output
self.samples = samples
self.min_counts = min_counts
self.assert_same_seqs = assert_same_seqs
if run:
self.run()
def run(self):
self.derep_names = self.fasta_to_dict(self.derep)
self.table, self.abund = self.fasta_to_abund(self.fasta, self.derep_names, self.assert_same_seqs)
if self.samples is None:
self.samples = self.table_to_samples(self.table)
self.write_table(self.table, self.abund, self.samples, self.min_counts, self.output)
@staticmethod
def fasta_to_dict(fasta):
'''
Create a hash {sequence => ID}
fasta : fasta fh or fn
lines in the fasta
returns : dict
{sequence => ID}
'''
names = {str(record.seq): util_index.parse_seq_sid(record.id) for record in SeqIO.parse(fasta, 'fasta')}
return names
@staticmethod
def fasta_to_abund(fasta, names, assert_same=False):
'''
Count the abundance of each sequence in each sample.
fasta : fasta fh or fn
lines in the big fasta file
names : dict
{seq => name}
assert_same : bool
if true, make sure each seq in the fasta is in names
returns : dict of dicts
{name => {samples => counts}, ...}
'''
table = {}
abund = {}
for record in SeqIO.parse(fasta, 'fasta'):
sample = util_index.sid_to_sample(record.id)
seq = str(record.seq)
if seq in names:
name = names[seq]
else:
if assert_same:
raise RuntimeError("sequence %s found in fasta but not dereplicated fasta" %(seq))
if name in abund:
abund[name] += 1
else:
abund[name] = 1
if name in table:
if sample in table[name]:
table[name][sample] += 1
else:
table[name][sample] = 1
else:
table[name] = {sample: 1}
return table, abund
@staticmethod
def table_to_samples(table):
'''get sorted list of sample names'''
samples = []
for seq in table:
for sample in table[seq]:
if sample not in samples:
samples.append(sample)
samples = sorted(samples)
return samples
@staticmethod
def write_table(table, abund, samples, min_counts, output):
'''fasta lines to seq table lines'''
# get the seq ids in abundance order
all_seq_ids = sorted(table.keys(), key=lambda i: abund[i], reverse=True)
# throw out sequences below minimum
seq_ids = [i for i in all_seq_ids if abund[i] >= min_counts]
# write the header
output.write("sequence_id\t" + "\t".join(samples) + "\n")
for i in seq_ids:
counts = [table[i].get(sample, 0) for sample in samples]
output.write(i + "\t" + "\t".join([str(x) for x in counts]) + "\n")
if __name__ == '__main__':
# parse command line arguments
parser = argparse.ArgumentParser(description='Make a sequence table (i.e., 100% identity OTUs)', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('fasta', help='full fasta file')
parser.add_argument('derep', help='dereplicated fasta file')
parser.add_argument('-s', '--samples', default=None, help='samples list (samples in first field; default: sorted names from fasta)')
parser.add_argument('-m', '--min_counts', type=int, default=0, help='minimum times a sequence is included, otherwise it gets thrown out')
parser.add_argument('-a', '--assert_same_seqs', action='store_true', help='assert that every seq in fasta is in dereplicated fasta?')
parser.add_argument('-o', '--output', default=sys.stdout, type=argparse.FileType('w'), help='output file')
args = parser.parse_args()
opts = vars(args)
SeqTableWriter(**opts)