-
Notifications
You must be signed in to change notification settings - Fork 51
/
build_error_models.py
101 lines (90 loc) · 3.87 KB
/
build_error_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/python
### code to extract specific error calculations from a GemErr error model
### http://www.biomedcentral.com/1471-2164/13/74
import gzip
import cPickle
import argparse
import numpy as np
# parse file paths:
parser = argparse.ArgumentParser(description='get file paths from user')
parser.add_argument('model_path', type=str, help='directory containing GemSIM sequencing models')
parser.add_argument('outfolder', type=str, help='directory for output')
parser.add_argument('--prefix', type=str, help='prefix (custom error models only)')
parser.add_argument('--paired', action='store_true', help='is the error model paired-end? (custom error models only)')
args = parser.parse_args()
model_path = args.model_path
outfolder = args.outfolder
prefix = args.prefix
paired = args.paired
def get_matrices(gzipFile, paired):
f = gzip.open(gzipFile)
if paired:
readLen = cPickle.load(f)
mx1 = cPickle.load(f)
mx2 = cPickle.load(f)
f.close()
return mx1, mx2
else:
readLen = cPickle.load(f)
mx = cPickle.load(f)
f.close()
return mx
def get_model(m):
"""m is a 7-D matrix generated by GemErr.py"""
for dim in [2,3,4,5]:
m = np.add.reduce(m, 2)
# we will not address preceding / following bases
inds={'A':0, 'T':1, 'G':2, 'C':3, 'N':4}
ret = np.zeros((101,5,5))
for pos in xrange(len(ret)):
for refBase in ['A', 'T', 'G', 'C', 'N']:
for readBase in ['A', 'T', 'G', 'C', 'N']:
refi = inds[refBase]
readi = inds[readBase]
if m[pos][refi][5] == 0:
# there were no reference bases with this nucleotide at this position
# so we will just sequence this base as is and not worry about errors.
ret[pos][refi][readi] = 1 if refi==readi else 0
elif refi == readi:
num_errors = sum(m[pos][refi][j] for j in range(5) if j != refi)
num_correct = m[pos][refi][5] - num_errors
correct_prob = 1 - (float(num_errors) / num_correct)
ret[pos][refi][readi] = correct_prob
else:
# nucleotide-specific error probability
transition_prob = float(m[pos][refi][readi]) / m[pos][refi][5]
ret[pos][refi][readi] = transition_prob
return ret
def save_model(array, outf):
"""write text file to disk containing this error model specification
to be read into R later"""
inds={'A':0, 'T':1, 'G':2, 'C':3, 'N':4}
with open(outf, 'w') as f:
f.write('errmodel\treadA\treadT\treadG\treadC\treadN\tpos\n')
for pos in xrange(len(array)):
for refBase in ['A', 'T', 'G', 'C', 'N']:
f.write('ref'+refBase+'\t')
for readBase in ['A', 'T', 'G', 'C', 'N']:
f.write(str(array[pos][inds[refBase]][inds[readBase]])+'\t')
if readBase == 'N':
f.write(str(int(pos))+'\n')
def release_models(modelpath, modelname, outfolder, paired):
if paired:
mx1, mx2 = get_matrices(modelpath+'/'+modelname+'_p.gzip', True)
mate1 = get_model(mx1)
mate2 = get_model(mx2)
save_model(mate1, outfolder+'/'+modelname+'_mate1')
save_model(mate2, outfolder+'/'+modelname+'_mate2')
else:
mx = get_matrices(modelpath+'/'+modelname+'_s.gzip', False)
model = get_model(mx)
save_model(model, outfolder+'/'+modelname+'_single')
### create error model files for release
if prefix is not None:
# custom error model:
release_models(model_path, prefix, outfolder, paired)
else:
for model in ['ill100v4', 'ill100v5', 'r454ti']:
release_models(model_path, model, outfolder, False)
if model != 'r454ti':
release_models(model_path, model, outfolder, True)