-
Notifications
You must be signed in to change notification settings - Fork 4
/
filter_alts.py
137 lines (115 loc) · 4.06 KB
/
filter_alts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import hgsc_vcf
import os, os.path, sys
import json, re, logging
import threading
from Queue import Queue
logger = logging.getLogger('wheeljack.filter_alts')
logger.addHandler(logging.NullHandler())
logger.setLevel(logging.DEBUG)
def safe_div(a, b):
try:
a = float(a)
b = float(b)
return a / b
except:
return 0.0
def safe_float(a):
try:
return float(a)
except:
return 0.0
##
# Do the lambdas select this allele in this sample
#
# The allele in this sample must pass all of the selelction lambdas
def _sample_select_filter(sample, lambdas, i, ref_i):
for f in lambdas:
try:
if not eval(f):
return False
except:
logger.error("Error in processing sample: %s", sample)
logger.error("Info: i: %s, ref_i: %s", i, ref_i)
raise
return True
def _samples_filter(samples, lambdas, i, ref_i):
for s in samples.values():
if _sample_select_filter(s, lambdas, i, ref_i):
return True
return False
def selection_function(record, lambdas):
gt = hgsc_vcf.split_gt(record)
ref_i = hgsc_vcf.ref_index(record, gt)
samples = record['SAMPLES']
for i, g in enumerate(gt):
if i == ref_i:
continue
if _samples_filter(samples, lambdas, i, ref_i):
yield i
def build_hgsc_vcf_select_function(lambdas):
def _custom_select_function(record):
for i in selection_function(record, lambdas):
yield i
return _custom_select_function
def process_vcf(reader, writer, config, simplify = True):
_process_select_function = build_hgsc_vcf_select_function(config['samplefilter'])
writer_q = Queue()
processor_q = Queue()
def _writer_function():
i = 0
while True:
out_record = writer_q.get()
try:
writer.write_record(out_record)
i += 1
if not i % 10000:
logger.info("Processed %s lines", i)
except Exception as inst:
logger.exception("Exception in record writing: %s", str(inst))
finally:
writer_q.task_done()
writer_thread = threading.Thread(target=_writer_function)
writer_thread.daemon = True
writer_thread.start()
def _processor_function():
while True:
in_record = processor_q.get()
try:
for b_record in hgsc_vcf.select_allele(in_record, _process_select_function, simplify):
writer_q.put(b_record)
except Exception as inst:
logger.exception("Exception in record processing: %s", str(inst))
finally:
processor_q.task_done()
for i in xrange(1):
processor_thread = threading.Thread(target=_processor_function)
processor_thread.daemon = True
processor_thread.start()
for record in reader:
if len(record['ALT']) > 0 and record['ALT'][0] != '.':
processor_q.put(record)
processor_q.join()
writer_q.join()
def main(args):
reader = hgsc_vcf.Reader(args.INFILE)
header = reader.header
writer = hgsc_vcf.Writer(args.OUTFILE, header)
writer.header.add_header('##COMMAND=<ID=filter-alts,ARGS="%s">' % re.escape(' '.join(sys.argv)))
writer.write_header()
config = json.load(args.CONFIG)
process_vcf(reader, writer, config)
logger.info("Done")
if __name__ == '__main__':
mainlogger = logging.getLogger()
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
mainlogger.addHandler(ch)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('CONFIG', type = argparse.FileType('r'), help = 'config file')
parser.add_argument('INFILE', type = argparse.FileType('r'), help = 'input VCF file')
parser.add_argument('OUTFILE', type = argparse.FileType('w'), help = 'output VCF file')
args = parser.parse_args()
main(args)