merge_sample.py
from multiprocessing import Pool
from os.path import basename
import subprocess
import os
import argparse
import re
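
# For each instance block in the bam_list file, rewrite the read groups of every
# component library bam with Picard AddOrReplaceReadGroups, then merge them into a
# single coordinate-sorted per-instance bam with Picard MergeSamFiles. Instances
# are processed in parallel through a multiprocessing Pool.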


def merge_bam(instance_id, library_ids, experiments, bam_paths, reference, picard_jar):
    """Rewrite read groups for each component library bam, then merge them into a
    single coordinate-sorted bam named <instance_id>.<reference>.bam."""
    instance_id_filename = "%s.%s.bam" % (instance_id, reference)
    # make a directory for this instance ID; per-instance logs and intermediate bams live here
    os.makedirs(instance_id)
    with open(instance_id + '/stdout_merge', 'w') as stdout_merge, \
            open(instance_id + '/stderr_merge', 'w') as stderr_merge:
        # write instance ID into read groups, one Picard call per component library
        bams_with_altered_read_groups = []
        for library_id, experiment, bam in zip(library_ids, experiments, bam_paths):
            read_group_id = library_id + '.' + experiment
            bam_with_altered_read_groups = instance_id + '/' + read_group_id + '.' + basename(bam)
            subprocess.run(["java", "-Xmx2700m", "-jar", picard_jar, "AddOrReplaceReadGroups",
                            "I=%s" % (bam,),
                            "O=%s" % (bam_with_altered_read_groups,),
                            "RGID=%s" % (read_group_id,),
                            "RGLB=%s" % (library_id,),
                            "RGPL=illumina",
                            "RGPU=%s" % (library_id,),
                            "RGSM=%s" % instance_id],
                           check=True, stdout=stdout_merge, stderr=stderr_merge)
            bams_with_altered_read_groups.append(bam_with_altered_read_groups)
        # merge the per-library bams with rewritten read groups
        merge_file_list = 'I=' + ' I='.join(bams_with_altered_read_groups)
        # output is captured per instance in the instance subdirectory
        instance_stdout_merge = instance_id + '/stdout_merge'
        instance_stderr_merge = instance_id + '/stderr_merge'
        command = "java -Xmx2500m -jar %s MergeSamFiles %s O=%s SORT_ORDER=coordinate >> %s 2>> %s" % (
            picard_jar, merge_file_list, instance_id_filename, instance_stdout_merge, instance_stderr_merge)
        #print('combine bam lists ' + command)
        subprocess.check_output(command, shell=True)
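
# For illustration only (instance, library, and file names here are hypothetical):
# with instance "I001", library "L1", experiment "exp1" and input "L1.bam",
# merge_bam would run commands along these lines:
#   java -Xmx2700m -jar picard.jar AddOrReplaceReadGroups I=L1.bam \
#       O=I001/L1.exp1.L1.bam RGID=L1.exp1 RGLB=L1 RGPL=illumina RGPU=L1 RGSM=I001
#   java -Xmx2500m -jar picard.jar MergeSamFiles I=I001/L1.exp1.L1.bam \
#       O=I001.hg19.bam SORT_ORDER=coordinate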


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Merge component library bams for a sample. Rewrite read groups on a per-library basis.")
    parser.add_argument('-n', "--num_threads", help="size of thread pool", type=int, default=1)
    parser.add_argument('-r', "--reference", help="reference genome label used in the output bam filename", default='hg19')
    parser.add_argument('-a', "--append", help="string appended to instance ids", default='')
    parser.add_argument("picard_jar", help="jar file for the Broad Institute Picard toolset, used to merge bams and deduplicate")
    parser.add_argument("bam_list", help="file listing component libraries with experiments and bam paths, one block per instance")
    args = parser.parse_args()

    pool = Pool(processes=args.num_threads)
    results = []
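
    # Expected bam_list layout, inferred from the parsing loop below (identifiers
    # and paths here are hypothetical examples, not part of the original script):
    #   I001
    #   L1<TAB>exp1<TAB>/path/to/L1.exp1.bam
    #   L2<TAB>exp1<TAB>/path/to/L2.exp1.bam
    #   <blank line>
    #   I002
    #   ...
    # Each block starts with an instance ID beginning with 'I', followed by one
    # tab-separated line per component library (library_id, experiment, bam path),
    # and ends with a blank line.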
    with open(args.bam_list) as f:
        for instance_line in f:
            instance_id = instance_line.strip()
            if len(instance_id) == 0:
                # skip extra blank separator lines between instance blocks
                continue
            augmented_instance_id = instance_id + args.append
            if instance_id[0] != 'I':
                raise ValueError('{} is invalid individual'.format(instance_id))
            # collect the component libraries for this instance until a blank line
            blank = False
            library_ids = []
            experiments = []
            bam_paths = []
            while not blank:
                line = f.readline()
                if len(line.strip()) == 0:
                    blank = True
                fields = re.split(r'\t|\n', line)
                if len(fields) >= 3:
                    bam_path = fields[2]
                    if len(bam_path) > 4:  # ignore entries without a usable bam path
                        library_ids.append(fields[0])
                        experiments.append(fields[1])
                        bam_paths.append(fields[2])
            results.append(pool.apply_async(merge_bam, args=(
                augmented_instance_id, library_ids, experiments, bam_paths, args.reference, args.picard_jar)))
    pool.close()
    pool.join()
    # propagate any exceptions raised in the worker processes
    for result in results:
        result.get()
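
# Example invocation (jar path and list filename are hypothetical):
#   python3 merge_sample.py -n 4 -r hg19 /path/to/picard.jar bam_list.txt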