-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrf_telomere_finder.py
executable file
·178 lines (162 loc) · 6.53 KB
/
trf_telomere_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
'''
Parse trf output and find potential telomeric repeats.
Require trf and samtools.
'''
import sys
import os
import re
import argparse
import subprocess
def rc_seq(seq):
# handle upper case only
complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A'}
return "".join(complement.get(base, base) for base in reversed(seq))
def SlidingWindowREObj(seq):
assert isinstance(seq, str)
seq = seq.upper()
l = len(seq)
seq = seq * 2
sw = []
for i in range(l):
sw.append(seq[i:i+l] + '$')
sw.append(rc_seq(seq[i:i+l]) + '$')
return re.compile('(?:{})'.format('|'.join(sw)))
def parse_faidx(handle):
ctg_len = {}
for line in handle:
line = line.strip().split()
ctg_len[line[0]] = int(line[1])
return ctg_len
'''
cols 1+2: Indices of the repeat relative to the start of the sequence
col 3: Period size of the repeat
col 4: Number of copies aligned with the consensus pattern
col 5: Size of consensus pattern (may differ slightly from the period size)
col 6: Percent of matches between adjacent copies overall
col 7: Percent of indels between adjacent copies overall
col 8: Alignment score
cols 9-12: Percent composition for each of the four nucleotides
col 13: Entropy measure based on percent composition
col 14: Consensus sequence
col 15: Repeat sequence
'''
def parse_trf_output(handle, ctg_len,
min_copy_number=5, max_copy_number=10000,
min_repeat_len=4, max_repeat_len=10,
search_window=5000,
repeat_seq=None, silent=False):
FoundInSTART = set()
FoundInEND = set()
FoundInMID = set()
if repeat_seq:
repeat_seq = SlidingWindowREObj(repeat_seq)
for line in handle:
if line.startswith('@'):
ctg = line[1:].strip().split()[0]
else:
line = line.strip()
# skip blank line
if len(line) == 0:
continue
line = line.split()
# filter copy number and repeat consensus size
if float(line[3]) < min_copy_number or \
float(line[3]) > max_copy_number or \
int(line[2]) < min_repeat_len or \
int(line[2]) > max_repeat_len:
continue
if repeat_seq and not repeat_seq.match(line[13]):
continue
line[0] = int(line[0])
line[1] = int(line[1])
# cols: ctg ctg_len label pos_start pos_end length repeat_size repeat_copies repeat_seq
# START
if line[1] < search_window:
FoundInSTART.add(ctg)
if not silent:
print('{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}'.format(
ctg, ctg_len[ctg], 'START',
line[0], line[1], line[1]-line[0]+1, line[2], line[3], line[13]))
# END
elif ctg_len[ctg] - line[0] < search_window:
FoundInEND.add(ctg)
if not silent:
print('{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}'.format(
ctg, ctg_len[ctg], 'END',
line[0], line[1], line[1]-line[0]+1, line[2], line[3], line[13]))
# MID
elif repeat_seq:
FoundInMID.add(ctg)
if not silent:
print('{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}'.format(
ctg, ctg_len[ctg], 'MID',
line[0], line[1], line[1]-line[0]+1, line[2], line[3], line[13]))
return FoundInSTART, FoundInEND, FoundInMID
def parse_args():
parser = argparse.ArgumentParser(description='Find telomeric sequences.')
parser.add_argument('fasta',
help='fasta')
parser.add_argument('-r', type=str, metavar='repeat', default=None,
help='user provided sequence to match telomeric sequences')
parser.add_argument('--window', type=int, default=5000, metavar='',
help='size of terminal window (%(default)s)')
parser.add_argument('--min_copy_number', type=int, default=5, metavar='',
help='minimal copy numbers (%(default)s)')
parser.add_argument('--min_repeat_len', type=int, default=5, metavar='',
help='minimal repeat length (%(default)s)')
parser.add_argument('--max_repeat_len', type=int, default=10, metavar='',
help='maximal repeat length (%(default)s)')
parser.add_argument('--redo', action='store_true',
help='force redo')
return parser.parse_args()
def main():
args = parse_args()
in_fa = args.fasta
repeat_seq = args.r
search_window = args.window
min_copy_number = args.min_copy_number
min_repeat_len = args.min_repeat_len
max_repeat_len = args.max_repeat_len
trf_opt = [2, 7, 7, 80, 10, 50, 500] # default
if repeat_seq:
match_mode = True
else:
match_mode = False
# run samtools faidx
fa_idx = in_fa + '.fai'
if args.redo or not os.path.isfile(fa_idx):
print('Building index...', file=sys.stderr)
cmd = ['samtools', 'faidx', in_fa]
subprocess.run(cmd, check=True)
# run trf
trf_opt = [str(x) for x in trf_opt]
trf_out = '{}.{}.dat'.format(in_fa, '.'.join(trf_opt))
if args.redo or not os.path.isfile(trf_out):
print('Running trf...', file=sys.stderr)
cmd = ['trf', in_fa] + trf_opt + ['-h', '-ngs']
with open(trf_out, 'w') as f:
subprocess.run(cmd, stdout=f, stderr=subprocess.DEVNULL, check=True)
# parse fasta index
with open(fa_idx) as f:
ctg_len = parse_faidx(f)
# parse trf output
with open(trf_out) as f:
FoundInSTART, FoundInEND, FoundInMID = parse_trf_output(
f, ctg_len,
min_copy_number=min_copy_number,
min_repeat_len=min_repeat_len, max_repeat_len=max_repeat_len,
search_window=search_window,
repeat_seq=repeat_seq)
if match_mode:
FoundInBoth = FoundInSTART.intersection(FoundInEND)
FoundInSingle = FoundInSTART.union(FoundInEND).difference(FoundInBoth)
print()
print('Found in both ends : {}\n'
'Found in single end: {}\n'
'Found in interval : {}'.format(len(FoundInBoth), len(FoundInSingle), len(FoundInMID)))
print('B: ' + ','.join(sorted(FoundInBoth)))
print('S: ' + ','.join(sorted(FoundInSingle)))
print('I: ' + ','.join(sorted(FoundInMID)))
if __name__ == '__main__':
main()