-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvar2vcf.py
executable file
·156 lines (129 loc) · 4.82 KB
/
var2vcf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/data2/external_data/Abyzov_Alexej_m124423/apps/pyenv/versions/3.5.1/bin/python
import argparse
import os
import subprocess
import sys
from tempfile import NamedTemporaryFile
config = os.path.dirname(os.path.realpath(__file__)) + "/job.config"
with open(config) as f:
for line in f:
if line[:5] == "JAVA=":
exec(line)
elif line[:11] == "PICARD_JAR=":
exec(line)
def generic_parse(line):
values = line.strip().split()
chrom, pos, ref, alt = values[:4]
return "{}\t{}\t.\t{}\t{}\t.\t.\t.".format(chrom, pos, ref, alt)
def varscan_parse(line, header):
values = line.strip().split()
chrom, pos, ref, alt = values[:4]
info = ';'.join(["%s=%s" % (key, val) for key, val in zip(header[4:],values[4:])])
if(alt.startswith('+')):
alt = alt.replace('+', ref)
elif(alt.startswith('-')):
alt = alt.replace('-', ref)
ref, alt = alt, ref
return "{}\t{}\t.\t{}\t{}\t.\t.\t{}".format(chrom, pos, ref, alt, info)
def vcf_header_generic():
vcf_header = "##fileformat=VCFv4.2" + "\n" + \
"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"
return vcf_header
def vcf_header_varscan(header):
info_header = "\n".join(["##INFO=<ID=%s,Number=1,Type=String,Description=\"VarScan output\">" % x for x in header[4:]])
vcf_header = "##fileformat=VCFv4.2" + "\n" + \
info_header + "\n" + \
"#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO"
return vcf_header
def main():
##
## Build and parse options
##
parser = argparse.ArgumentParser(description='VCF converter for VarScan output file')
parser.add_argument('infile' , nargs='?',
type=argparse.FileType('rt'), default=sys.stdin,
help='''
Input variant file.
It should include CHROM, POS, REF, and ALT as the first 4 columns for generic format.
If this is omitted, default is STDIN.
''')
parser.add_argument('outfile', nargs='?',
type=argparse.FileType('wt'), default=sys.stdout,
help='''
Output vcf file.
If this is ommited, default is STDOUT.
''')
parser.add_argument('-d', '--seq-dict', required=True,
dest='seq_dict', action='store',
help='Sequence dictionary needed to sort vcf')
parser.add_argument('-f', '--format', required=True,
dest='format', choices=['generic', 'varscan'],
help='''
Input file format.
No header is assumed for generic format.
''')
class CondAction(argparse._StoreAction):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
x = kwargs.pop('to_be_required', [])
super(CondAction, self).__init__(option_strings, dest, **kwargs)
self.make_required = x
def __call__(self, parser, namespace, values, option_string=None):
for x in self.make_required:
x.required = True
try:
return super(CondAction, self).__call__(
parser, namespace, values, option_string)
except NotImplementedError:
pass
x = parser.add_argument('-s', '--sample-name',
dest='sample_name',
help='It\'s needed when you assign default genotype.')
parser.add_argument('-g', '--genotype',
dest='gt', action=CondAction, to_be_required=[x],
help='Assign default genotype for all snps.')
try:
args = parser.parse_args()
except IOError as msg:
parser.error(str(msg))
##
## Process options
##
if(args.format == 'generic'):
line_parse = generic_parse
vcf_header = vcf_header_generic()
elif(args.format == 'varscan'):
header = args.infile.readline().strip().split()
line_parse = lambda x: varscan_parse(x, header)
vcf_header = vcf_header_varscan(header)
if(args.gt == None):
sample_gt = ""
else:
sample_gt = "\tGT\t" + args.gt
vcf_header += "\tFORMAT\t" + args.sample_name
##
## Make Vcf
##
with NamedTemporaryFile('wt', delete=False) as f_tmp:
print(vcf_header, file=f_tmp)
for line in args.infile:
print(line_parse(line) + sample_gt, file=f_tmp)
args.infile.close()
##
## Sort Vcf
##
try:
subprocess.check_call([
JAVA, "-Xmx4g", "-jar" , PICARD_JAR,
"SortVcf", "CREATE_INDEX=false",
"I=" + f_tmp.name,
"O=" + f_tmp.name + ".vcf",
"SD=" + args.seq_dict])
finally:
os.remove(f_tmp.name)
with open(f_tmp.name + ".vcf", 'rt') as f:
for line in f:
print(line.strip(), file=args.outfile)
os.remove(f_tmp.name + ".vcf")
args.outfile.close()
if __name__ == '__main__':
main()