From 7cc21a9ee10cd198997fba265a5f6ad9ad00e9e3 Mon Sep 17 00:00:00 2001 From: Jacob Pfeil Date: Mon, 18 Jul 2016 19:08:23 -0700 Subject: [PATCH] Local changes --- src/toil_scripts/gatk_germline/germline.py | 36 +++++++++-------- src/toil_scripts/tools/preprocessing.py | 45 +++++++++------------- 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/src/toil_scripts/gatk_germline/germline.py b/src/toil_scripts/gatk_germline/germline.py index 7f3ce745..b1890d15 100644 --- a/src/toil_scripts/gatk_germline/germline.py +++ b/src/toil_scripts/gatk_germline/germline.py @@ -36,26 +36,33 @@ def gatk_germline_pipeline(job, uuid, url, config, bai_url=None): if config ['xmx'] is None: config['xmx'] = '{}G'.format(os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024 ** 3)) - download_bam = job.wrapJobFn(download_url_job, url, name='toil.bam', s3_key_path=config['ssec']) + if config['run_bwa']: + pass + else: + get_bam = job.wrapJobFn(download_url_job, url, name='toil.bam', s3_key_path=config['ssec']) if bai_url: - bam_index = job.wrapJobFn(download_url_job, bai_url, name='toil.bam.bai', s3_key_path=config['ssec']) + get_bai = job.wrapJobFn(download_url_job, bai_url, name='toil.bam.bai', s3_key_path=config['ssec']) else: - bam_index = job.wrapJobFn(samtools_index, download_bam.rv(), config) + get_bai = job.wrapJobFn(samtools_index, get_bam.rv(), config) + job.addChild(get_bam) + get_bam.addChild(get_bai) + if config['preprocess']: - preprocess = job.wrapJobFn(run_gatk_preprocessing, cores, download_bam.rv(), bam_index.rv(), + job.fileStore.logToMaster('Preprocessing sample: {}'.format(uuid)) + preprocess = job.wrapJobFn(run_gatk_preprocessing, cores, get_bam.rv(), get_bai.rv(), config['genome.fa'], config['genome.dict'], config['genome.fa.fai'], config['phase.vcf'], config['mills.vcf'], config['dbsnp.vcf'], config['output_dir'], - mem=config['xmx'], - mock=config['mock_mode']).encapsulate() + mem=config['xmx'], mock=config['mock_mode']).encapsulate() haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, preprocess.rv(0), preprocess.rv(1), config) - + get_bai.addChild(preprocess) else: - haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, download_bam.rv(), bam_index.rv(), config) + haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, get_bam.rv(), get_bai.rv(), config) genotype_gvcf = job.wrapJobFn(gatk_genotype_gvcf, haplotype_caller.rv(), config) + snp_recal = job.wrapJobFn(gatk_variant_recalibrator_snp, genotype_gvcf.rv(), config) apply_snp_recal = job.wrapJobFn(gatk_apply_variant_recalibration_snp, genotype_gvcf.rv(), snp_recal.rv(0), snp_recal.rv(1), config) @@ -69,14 +76,8 @@ def gatk_germline_pipeline(job, uuid, url, config, bai_url=None): output_raw = job.wrapJobFn(upload_or_move_job, raw_name, genotype_gvcf.rv(), config['output_dir']) output_vqsr = job.wrapJobFn(upload_or_move_job, vqsr_name, apply_indel_recal.rv(), config['output_dir']) - # Create DAG - job.addChild(download_bam) - download_bam.addChild(bam_index) - if config['preprocess']: - job.fileStore.logToMaster('Preprocessing sample: {}'.format(uuid)) - bam_index.addChild(preprocess) # Wait for bam index or preprocessing - bam_index.addFollowOn(haplotype_caller) + get_bai.addFollowOn(haplotype_caller) haplotype_caller.addChild(genotype_gvcf) @@ -472,6 +473,8 @@ def generate_config(): hapmap: # Required: URL (1000G_omni.5.b37.vcf) omni: + # Run BWA on fastqs + run_bwa: # GATK Preprocessing preprocess: # Approximate input file size. Should be given as %d[TGMK], e.g., @@ -605,7 +608,7 @@ def main(): config = {x.replace('-', '_'): y for x, y in yaml.load(open(options.config).read()).iteritems()} config_fields = set(config) required_fields = {'genome', 'mills', 'dbsnp', 'phase', 'hapmap', 'omni', 'output_dir', - 'preprocess', 'mock_mode', 'unsafe_mode', 'xmx', 'suffix'} + 'run_bwa', 'preprocess', 'mock_mode', 'unsafe_mode', 'xmx', 'suffix'} require(config_fields > required_fields, 'Missing config parameters:\n{}'.format(', '.join(required_fields - config_fields))) @@ -613,7 +616,6 @@ def main(): config['output_dir'] = options.output_dir if options.output_dir \ else os.path.join(os.getcwd(), 'output') - if config['suffix'] is None: config['suffix'] = options.suffix if options.suffix else '' diff --git a/src/toil_scripts/tools/preprocessing.py b/src/toil_scripts/tools/preprocessing.py index bb33139f..69be95b4 100644 --- a/src/toil_scripts/tools/preprocessing.py +++ b/src/toil_scripts/tools/preprocessing.py @@ -168,9 +168,9 @@ def picard_sort_sam(job, bam_id, xmx='8G', mock=False): return bam_id, bai_id -def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False): +def picard_mark_duplicates(job, bam_id, bai_id, xmx='10G', mock=False): """ - Uses picardtools MarkDuplicates + Runs picardtools MarkDuplicates. Assumes the bam file is coordinate sorted. :param job: :param bam_id: @@ -188,8 +188,6 @@ def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False): job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'sample.sorted.bam')) job.fileStore.readGlobalFile(bai_id, os.path.join(work_dir, 'sample.sorted.bai')) - cores = multiprocessing.cpu_count() / 2 - # Call: picardtools command = ['MarkDuplicates', 'INPUT=sample.sorted.bam', @@ -198,7 +196,7 @@ def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False): 'ASSUME_SORTED=true', 'CREATE_INDEX=true'] docker_call(work_dir=work_dir, parameters=command, - env={'_JAVA_OPTIONS':'-Djava.io.tmpdir=/data/ -Xmx10G'}, + env={'_JAVA_OPTIONS':'-Djava.io.tmpdir=/data/ -Xmx{}'.format(xmx)}, tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e', outputs=outputs, mock=mock) @@ -238,8 +236,7 @@ def run_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, db def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp, - output_dir, - mem='10G', unsafe=False, mock=False): + output_dir, mem='10G', unsafe=False, mock=False): """ Pre-processing steps for running the GATK Germline pipeline @@ -257,32 +254,28 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill :return: """ rm_secondary = job.wrapJobFn(samtools_view, bam, flag='0x800', mock=mock) - output_rm_secondary = job.wrapJobFn(upload_or_move_job, 'rm_secondary.bam', rm_secondary.rv(), output_dir) picard_sort = job.wrapJobFn(picard_sort_sam, rm_secondary.rv(), xmx=mem, mock=mock) - output_picard_sort = job.wrapJobFn(upload_or_move_job, 'picard_sort.bam', picard_sort.rv(0), output_dir) mdups = job.wrapJobFn(picard_mark_duplicates, picard_sort.rv(0), picard_sort.rv(1), xmx=mem, mock=mock) - output_mdups = job.wrapJobFn(upload_or_move_job, 'mdups.bam', mdups.rv(0), output_dir) - rtc = job.wrapJobFn(run_realigner_target_creator, cores, mdups.rv(0), mdups.rv(1), ref, - ref_dict, fai, phase, mills, mem, unsafe=unsafe, mock=mock) - ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase, - mills, mem, unsafe=unsafe, mock=mock) - br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai, - dbsnp, mem, unsafe=unsafe, mock=mock) - pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai, - mem, unsafe=unsafe, mock=mock) + realigner_target = job.wrapJobFn(run_realigner_target_creator, cores, mdups.rv(0), mdups.rv(1), + ref, ref_dict, fai, phase, mills, mem, unsafe=unsafe, + mock=mock) + indel_realign = job.wrapJobFn(run_indel_realignment, realigner_target.rv(), bam, bai, ref, + ref_dict, fai, phase, mills, mem, unsafe=unsafe, mock=mock) + base_recal = job.wrapJobFn(run_base_recalibration, cores, indel_realign.rv(0), + indel_realign.rv(1), ref, ref_dict, fai, dbsnp, mem, unsafe=unsafe, + mock=mock) + print_reads = job.wrapJobFn(run_print_reads, cores, base_recal.rv(), indel_realign.rv(0), + indel_realign.rv(1), ref, ref_dict, fai, mem, unsafe=unsafe, mock=mock) # Wiring job.addChild(rm_secondary) rm_secondary.addChild(picard_sort) - rm_secondary.addChild(output_rm_secondary) picard_sort.addChild(mdups) - picard_sort.addChild(output_picard_sort) - mdups.addChild(rtc) - mdups.addChild(output_mdups) - rtc.addChild(ir) - ir.addChild(br) - br.addChild(pr) - return pr.rv(0), pr.rv(1) + mdups.addChild(realigner_target) + realigner_target.addChild(indel_realign) + indel_realign.addChild(base_recal) + base_recal.addChild(print_reads) + return print_reads.rv(0), print_reads.rv(1) def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem,