Skip to content

Commit

Permalink
Local changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfeil committed Jul 19, 2016
1 parent df28d33 commit 7cc21a9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 43 deletions.
36 changes: 19 additions & 17 deletions src/toil_scripts/gatk_germline/germline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@ def gatk_germline_pipeline(job, uuid, url, config, bai_url=None):
if config ['xmx'] is None:
config['xmx'] = '{}G'.format(os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024 ** 3))

download_bam = job.wrapJobFn(download_url_job, url, name='toil.bam', s3_key_path=config['ssec'])
if config['run_bwa']:
pass
else:
get_bam = job.wrapJobFn(download_url_job, url, name='toil.bam', s3_key_path=config['ssec'])

if bai_url:
bam_index = job.wrapJobFn(download_url_job, bai_url, name='toil.bam.bai', s3_key_path=config['ssec'])
get_bai = job.wrapJobFn(download_url_job, bai_url, name='toil.bam.bai', s3_key_path=config['ssec'])
else:
bam_index = job.wrapJobFn(samtools_index, download_bam.rv(), config)
get_bai = job.wrapJobFn(samtools_index, get_bam.rv(), config)

job.addChild(get_bam)
get_bam.addChild(get_bai)

if config['preprocess']:
preprocess = job.wrapJobFn(run_gatk_preprocessing, cores, download_bam.rv(), bam_index.rv(),
job.fileStore.logToMaster('Preprocessing sample: {}'.format(uuid))
preprocess = job.wrapJobFn(run_gatk_preprocessing, cores, get_bam.rv(), get_bai.rv(),
config['genome.fa'], config['genome.dict'],
config['genome.fa.fai'], config['phase.vcf'],
config['mills.vcf'], config['dbsnp.vcf'], config['output_dir'],
mem=config['xmx'],
mock=config['mock_mode']).encapsulate()
mem=config['xmx'], mock=config['mock_mode']).encapsulate()
haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, preprocess.rv(0), preprocess.rv(1), config)

get_bai.addChild(preprocess)
else:
haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, download_bam.rv(), bam_index.rv(), config)
haplotype_caller = job.wrapJobFn(gatk_haplotype_caller, get_bam.rv(), get_bai.rv(), config)

genotype_gvcf = job.wrapJobFn(gatk_genotype_gvcf, haplotype_caller.rv(), config)

snp_recal = job.wrapJobFn(gatk_variant_recalibrator_snp, genotype_gvcf.rv(), config)
apply_snp_recal = job.wrapJobFn(gatk_apply_variant_recalibration_snp, genotype_gvcf.rv(),
snp_recal.rv(0), snp_recal.rv(1), config)
Expand All @@ -69,14 +76,8 @@ def gatk_germline_pipeline(job, uuid, url, config, bai_url=None):
output_raw = job.wrapJobFn(upload_or_move_job, raw_name, genotype_gvcf.rv(), config['output_dir'])
output_vqsr = job.wrapJobFn(upload_or_move_job, vqsr_name, apply_indel_recal.rv(), config['output_dir'])

# Create DAG
job.addChild(download_bam)
download_bam.addChild(bam_index)
if config['preprocess']:
job.fileStore.logToMaster('Preprocessing sample: {}'.format(uuid))
bam_index.addChild(preprocess)
# Wait for bam index or preprocessing
bam_index.addFollowOn(haplotype_caller)
get_bai.addFollowOn(haplotype_caller)

haplotype_caller.addChild(genotype_gvcf)

Expand Down Expand Up @@ -472,6 +473,8 @@ def generate_config():
hapmap:
# Required: URL (1000G_omni.5.b37.vcf)
omni:
# Run BWA on fastqs
run_bwa:
# GATK Preprocessing
preprocess:
# Approximate input file size. Should be given as %d[TGMK], e.g.,
Expand Down Expand Up @@ -605,15 +608,14 @@ def main():
config = {x.replace('-', '_'): y for x, y in yaml.load(open(options.config).read()).iteritems()}
config_fields = set(config)
required_fields = {'genome', 'mills', 'dbsnp', 'phase', 'hapmap', 'omni', 'output_dir',
'preprocess', 'mock_mode', 'unsafe_mode', 'xmx', 'suffix'}
'run_bwa', 'preprocess', 'mock_mode', 'unsafe_mode', 'xmx', 'suffix'}
require(config_fields > required_fields,
'Missing config parameters:\n{}'.format(', '.join(required_fields - config_fields)))

if config['output_dir'] is None:
config['output_dir'] = options.output_dir if options.output_dir \
else os.path.join(os.getcwd(), 'output')


if config['suffix'] is None:
config['suffix'] = options.suffix if options.suffix else ''

Expand Down
45 changes: 19 additions & 26 deletions src/toil_scripts/tools/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ def picard_sort_sam(job, bam_id, xmx='8G', mock=False):
return bam_id, bai_id


def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False):
def picard_mark_duplicates(job, bam_id, bai_id, xmx='10G', mock=False):
"""
Uses picardtools MarkDuplicates
Runs picardtools MarkDuplicates. Assumes the bam file is coordinate sorted.
:param job:
:param bam_id:
Expand All @@ -188,8 +188,6 @@ def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False):
job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, 'sample.sorted.bam'))
job.fileStore.readGlobalFile(bai_id, os.path.join(work_dir, 'sample.sorted.bai'))

cores = multiprocessing.cpu_count() / 2

# Call: picardtools
command = ['MarkDuplicates',
'INPUT=sample.sorted.bam',
Expand All @@ -198,7 +196,7 @@ def picard_mark_duplicates(job, bam_id, bai_id, xmx='8G', mock=False):
'ASSUME_SORTED=true',
'CREATE_INDEX=true']
docker_call(work_dir=work_dir, parameters=command,
env={'_JAVA_OPTIONS':'-Djava.io.tmpdir=/data/ -Xmx10G'},
env={'_JAVA_OPTIONS':'-Djava.io.tmpdir=/data/ -Xmx{}'.format(xmx)},
tool='quay.io/ucsc_cgl/picardtools:1.95--dd5ac549b95eb3e5d166a5e310417ef13651994e',
outputs=outputs, mock=mock)

Expand Down Expand Up @@ -238,8 +236,7 @@ def run_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, db


def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, dbsnp,
output_dir,
mem='10G', unsafe=False, mock=False):
output_dir, mem='10G', unsafe=False, mock=False):
"""
Pre-processing steps for running the GATK Germline pipeline
Expand All @@ -257,32 +254,28 @@ def run_gatk_preprocessing(job, cores, bam, bai, ref, ref_dict, fai, phase, mill
:return:
"""
rm_secondary = job.wrapJobFn(samtools_view, bam, flag='0x800', mock=mock)
output_rm_secondary = job.wrapJobFn(upload_or_move_job, 'rm_secondary.bam', rm_secondary.rv(), output_dir)
picard_sort = job.wrapJobFn(picard_sort_sam, rm_secondary.rv(), xmx=mem, mock=mock)
output_picard_sort = job.wrapJobFn(upload_or_move_job, 'picard_sort.bam', picard_sort.rv(0), output_dir)
mdups = job.wrapJobFn(picard_mark_duplicates, picard_sort.rv(0), picard_sort.rv(1),
xmx=mem, mock=mock)
output_mdups = job.wrapJobFn(upload_or_move_job, 'mdups.bam', mdups.rv(0), output_dir)
rtc = job.wrapJobFn(run_realigner_target_creator, cores, mdups.rv(0), mdups.rv(1), ref,
ref_dict, fai, phase, mills, mem, unsafe=unsafe, mock=mock)
ir = job.wrapJobFn(run_indel_realignment, rtc.rv(), bam, bai, ref, ref_dict, fai, phase,
mills, mem, unsafe=unsafe, mock=mock)
br = job.wrapJobFn(run_base_recalibration, cores, ir.rv(0), ir.rv(1), ref, ref_dict, fai,
dbsnp, mem, unsafe=unsafe, mock=mock)
pr = job.wrapJobFn(run_print_reads, cores, br.rv(), ir.rv(0), ir.rv(1), ref, ref_dict, fai,
mem, unsafe=unsafe, mock=mock)
realigner_target = job.wrapJobFn(run_realigner_target_creator, cores, mdups.rv(0), mdups.rv(1),
ref, ref_dict, fai, phase, mills, mem, unsafe=unsafe,
mock=mock)
indel_realign = job.wrapJobFn(run_indel_realignment, realigner_target.rv(), bam, bai, ref,
ref_dict, fai, phase, mills, mem, unsafe=unsafe, mock=mock)
base_recal = job.wrapJobFn(run_base_recalibration, cores, indel_realign.rv(0),
indel_realign.rv(1), ref, ref_dict, fai, dbsnp, mem, unsafe=unsafe,
mock=mock)
print_reads = job.wrapJobFn(run_print_reads, cores, base_recal.rv(), indel_realign.rv(0),
indel_realign.rv(1), ref, ref_dict, fai, mem, unsafe=unsafe, mock=mock)
# Wiring
job.addChild(rm_secondary)
rm_secondary.addChild(picard_sort)
rm_secondary.addChild(output_rm_secondary)
picard_sort.addChild(mdups)
picard_sort.addChild(output_picard_sort)
mdups.addChild(rtc)
mdups.addChild(output_mdups)
rtc.addChild(ir)
ir.addChild(br)
br.addChild(pr)
return pr.rv(0), pr.rv(1)
mdups.addChild(realigner_target)
realigner_target.addChild(indel_realign)
indel_realign.addChild(base_recal)
base_recal.addChild(print_reads)
return print_reads.rv(0), print_reads.rv(1)


def run_realigner_target_creator(job, cores, bam, bai, ref, ref_dict, fai, phase, mills, mem,
Expand Down

0 comments on commit 7cc21a9

Please sign in to comment.