-
Notifications
You must be signed in to change notification settings - Fork 1
/
homopolymers.py
executable file
·124 lines (111 loc) · 3.53 KB
/
homopolymers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python
# returns positions of homopolymers for each contig
import sys
import os
import argparse
import threading
import time
from collections import namedtuple
def progress(fp, fs, fin, err):
progress = ["-", "\\", "|", "/"]
prog = 0
if (fs <= 0):
return
while not fin.isSet() and not err.isSet():
if sys.stderr.isatty():
sys.stderr.write("\r{:3.0f}% {:s}\b"
.format(fp.tell()/float(fs)*100,
progress[prog]))
sys.stderr.flush()
prog = (prog + 1)%4
time.sleep(0.1)
else:
sys.stderr.write("{:3.0f}% "
.format(fp.tell()/float(fs)*100))
time.sleep(5)
else:
if sys.stderr.isatty() and fin.isSet():
sys.stderr.write("\r100% *")
sys.stderr.write("\n")
return
Homopolymer = namedtuple("Homopolymer", "seqname base length beg end")
# ----- command line parsing -----
parser = argparse.ArgumentParser(
description="Finds positions of homopolyomers.")
parser.add_argument("file", type=str,
help="Fasta file.")
parser.add_argument("min_length", type=int,
help="Minimum length of homopolymer.")
parser.add_argument("-s", "--case_sensitive",
dest="case", action="store_true",
help="Case sensitive bases.")
parser.add_argument("-b", "--bases", type=str,
help="Comma separated list of bases to consider.")
parser.set_defaults(case=False)
args = parser.parse_args()
# ----- end command line parsing -----
fasta_size = os.path.getsize(args.file)
fasta = open(args.file)
sys.stderr.write("Reading {:s}...\n".format(args.file))
fin = threading.Event()
err = threading.Event()
pthread = threading.Thread(name = "progress",
target = progress,
args = (fasta, fasta_size, fin, err))
homopolymers = []
name = ""
pos = 0
run = 0
last_base = ""
try:
pthread.start()
for line in fasta:
if line[0] == '>':
if run >= args.min_length:
homopolymers.append(
Homopolymer(name, last_base,
run, pos-run+1, pos))
name = line[1:-1]
pos = 0
run = 0
last_base = ""
continue
for base in line[:-1]:
pos += 1
if not args.case: base = base.upper()
if last_base == "":
last_base = base
run = 1
elif base == last_base:
run += 1
else:
if run >= args.min_length:
homopolymers.append(
Homopolymer(name, last_base,
run, pos-run, pos-1))
run = 1
last_base = base
if run >= args.min_length:
homopolymers.append(
Homopolymer(name, last_base,
run, pos-run+1, pos))
fin.set()
pthread.join()
except Exception as e:
sys.stderr.write("\nError: {:s}".format(e))
err.set()
except:
err.set()
pthread.join()
fasta.close()
if args.bases is not None:
if args.case:
bases = set(args.bases.split(","))
else:
bases = set(args.bases.upper().split(","))
else:
bases = None
for homo in homopolymers:
if bases is None or homo.base in bases:
sys.stdout.write("{:s}\t{:s}\t{:d}\t{:d}\t{:d}\n"
.format(*homo))