Remove credentials from ADAM pipeline #116

Merged (3 commits, Feb 25, 2016)
39 changes: 17 additions & 22 deletions src/toil_scripts/adam_gatk_pipeline/align_and_call.py
@@ -113,6 +113,7 @@

# import from python system libraries
import argparse
import copy
import multiprocessing
import os

@@ -143,10 +144,6 @@ def build_parser():
help = 'S3 Bucket URI')
parser.add_argument('-3r', '--bucket_region', default = "us-west-2",
help = 'Region of the S3 bucket. Defaults to us-west-2.')
parser.add_argument('-y', '--aws_access_key', required = True,
help = 'Amazon web services access key')
parser.add_argument('-S', '--aws_secret_key', required = True,
help = 'Amazon web services secret key')

# add file size argument
parser.add_argument('-se', '--file_size', default = '100G',
@@ -206,16 +203,22 @@ def sample_loop(job, bucket_region, s3_bucket, uuid_list, bwa_inputs, adam_input

for uuid in uuid_list:

uuid_bwa_inputs = copy.deepcopy(bwa_inputs)
uuid_adam_inputs = copy.deepcopy(adam_inputs)
uuid_gatk_preprocess_inputs = copy.deepcopy(gatk_preprocess_inputs)
uuid_gatk_adam_call_inputs = copy.deepcopy(gatk_adam_call_inputs)
uuid_gatk_gatk_call_inputs = copy.deepcopy(gatk_gatk_call_inputs)

## set uuid inputs
bwa_inputs['lb'] = uuid
bwa_inputs['uuid'] = uuid
adam_inputs['outDir'] = "s3://%s/analysis/%s" % (s3_bucket, uuid)
adam_inputs['bamName'] = "s3://%s/alignment/%s.bam" % (s3_bucket, uuid)
gatk_preprocess_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)
gatk_adam_call_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)
gatk_gatk_call_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)

job.addChildJobFn(static_dag, bucket_region, s3_bucket, uuid, bwa_inputs, adam_inputs, gatk_preprocess_inputs, gatk_adam_call_inputs, gatk_gatk_call_inputs, pipeline_to_run )
uuid_bwa_inputs['lb'] = uuid
uuid_bwa_inputs['uuid'] = uuid
uuid_adam_inputs['outDir'] = "s3://%s/analysis/%s" % (s3_bucket, uuid)
uuid_adam_inputs['bamName'] = "s3://%s/alignment/%s.bam" % (s3_bucket, uuid)
uuid_gatk_preprocess_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)
uuid_gatk_adam_call_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)
uuid_gatk_gatk_call_inputs['s3_dir'] = "%s/analysis/%s" % (s3_bucket, uuid)

job.addChildJobFn(static_dag, bucket_region, s3_bucket, uuid, uuid_bwa_inputs, uuid_adam_inputs, uuid_gatk_preprocess_inputs, uuid_gatk_adam_call_inputs, uuid_gatk_gatk_call_inputs, pipeline_to_run )
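The per-UUID deep copies above are the substantive fix in sample_loop: previously every child job received references to the same mutable dicts, so each iteration overwrote the settings of the samples queued before it. A minimal sketch of that failure mode and the fix, using hypothetical sample names:

import copy

shared_inputs = {'uuid': None, 'lb': None}

# Buggy pattern: the same dict is mutated and re-queued, so every job
# ends up seeing the last sample's values.
queued = []
for uuid in ['sample-a', 'sample-b']:
    shared_inputs['uuid'] = uuid
    queued.append(shared_inputs)
assert all(q['uuid'] == 'sample-b' for q in queued)

# Fixed pattern (as in the diff): copy per sample before mutating.
queued = []
for uuid in ['sample-a', 'sample-b']:
    per_sample = copy.deepcopy(shared_inputs)
    per_sample['uuid'] = uuid
    queued.append(per_sample)
assert [q['uuid'] for q in queued] == ['sample-a', 'sample-b']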



@@ -339,17 +342,13 @@ def static_dag(job, bucket_region, s3_bucket, uuid, bwa_inputs, adam_inputs, gat
's3_dir': "%s/alignment" % args.s3_bucket,
'cpu_count': None,
'file_size': args.file_size,
'use_bwakit': args.use_bwakit,
'aws_access_key': args.aws_access_key,
'aws_secret_key': args.aws_secret_key}
'use_bwakit': args.use_bwakit}

if args.num_nodes <= 1:
raise ValueError("--num_nodes allocates one Spark/HDFS master and n-1 workers, and thus must be greater than 1. %d was passed." % args.num_nodes)

adam_inputs = {'numWorkers': args.num_nodes - 1,
'knownSNPs': args.dbsnp.replace("https://s3-us-west-2.amazonaws.com/", "s3://"),
'accessKey': args.aws_access_key,
'secretKey': args.aws_secret_key,
'driverMemory': args.driver_memory,
'executorMemory': args.executor_memory,
'sudo': args.sudo,
Expand All @@ -376,8 +375,6 @@ def static_dag(job, bucket_region, s3_bucket, uuid, bwa_inputs, adam_inputs, gat
'cpu_count': str(multiprocessing.cpu_count()),
'ssec': None,
'file_size': args.file_size,
'aws_access_key': args.aws_access_key,
'aws_secret_key': args.aws_secret_key,
'suffix': '.adam',
'sudo': args.sudo}

Expand All @@ -392,8 +389,6 @@ def static_dag(job, bucket_region, s3_bucket, uuid, bwa_inputs, adam_inputs, gat
'cpu_count': str(multiprocessing.cpu_count()),
'ssec': None,
'file_size': args.file_size,
'aws_access_key': args.aws_access_key,
'aws_secret_key': args.aws_secret_key,
'suffix': '.gatk',
'sudo': args.sudo}
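With the --aws_access_key/--aws_secret_key flags and the accessKey/secretKey entries removed, the pipeline presumably relies on the standard AWS credential chain (environment variables, ~/.aws/credentials, or an EC2 instance profile) for S3 access. A minimal sketch of that implicit lookup, not part of this PR; the bucket name is hypothetical and boto3 stands in for whichever SDK the tools actually use:

import boto3

# No keys are passed explicitly; boto3 resolves credentials from
# AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY, ~/.aws/credentials, or the
# instance profile, in that order.
s3 = boto3.client('s3', region_name='us-west-2')
# s3.head_bucket(Bucket='my-bucket')  # would succeed once credentials resolve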

2 changes: 0 additions & 2 deletions src/toil_scripts/adam_gatk_pipeline/call_vs_grch38.sh
@@ -14,8 +14,6 @@ python -m toil_scripts.adam_gatk_pipeline.align_and_call \
--uuid_manifest my_manifest_file \
--s3_bucket ${BUCKET} \
--bucket_region ${REGION} \
--aws_access_key ${AWS_ACCESS_KEY_ID} \
--aws_secret_key ${AWS_SECRET_ACCESS_KEY} \
--ref https://s3-us-west-2.amazonaws.com/cgl-pipeline-inputs/variant_grch38_reordered/GRCh38_full_analysis_set_plus_decoy_hla.reordered.fa \
--amb https://s3-us-west-2.amazonaws.com/cgl-pipeline-inputs/variant_grch38_reordered/GRCh38_full_analysis_set_plus_decoy_hla.reordered.fa.amb \
--ann https://s3-us-west-2.amazonaws.com/cgl-pipeline-inputs/variant_grch38_reordered/GRCh38_full_analysis_set_plus_decoy_hla.reordered.fa.ann \
13 changes: 2 additions & 11 deletions src/toil_scripts/adam_pipeline/spark_toil_script.py
@@ -72,9 +72,7 @@ def call_conductor(masterIP, inputs, src, dst):
docker_call(no_rm = True,
work_dir = os.getcwd(),
tool = "quay.io/ucsc_cgl/conductor",
docker_parameters = ["--net=host",
"-e", "AWS_ACCESS_KEY="+inputs['accessKey'],
"-e", "AWS_SECRET_KEY="+inputs['secretKey']],
docker_parameters = ["--net=host"],
tool_parameters = ["--master", "spark://"+masterIP+":"+SPARK_MASTER_PORT,
"--conf", "spark.driver.memory=%sg" % inputs["driverMemory"],
"--conf", "spark.executor.memory=%sg" % inputs["executorMemory"],
@@ -84,7 +82,6 @@

def call_adam(masterIP, inputs, arguments):

params = []
default_params = ["--master", ("spark://%s:%s" % (masterIP, SPARK_MASTER_PORT)),
"--conf", ("spark.driver.memory=%sg" % inputs["driverMemory"]),
"--conf", ("spark.executor.memory=%sg" % inputs["executorMemory"]),
Expand All @@ -95,7 +92,7 @@ def call_adam(masterIP, inputs, arguments):
work_dir = os.getcwd(),
tool = "quay.io/ucsc_cgl/adam:cd6ef41",
docker_parameters = ["--net=host"],
tool_parameters = params,
tool_parameters = default_params + arguments,
sudo = inputs['sudo'])
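The change to tool_parameters also fixes a dropped-arguments bug: call_adam previously built default_params but passed an always-empty params list to the container, so ADAM never received the Spark configuration or its own command. A small sketch of the intended composition; 7077 stands in for SPARK_MASTER_PORT and the values are hypothetical:

def build_adam_args(master_ip, inputs, arguments):
    # Spark defaults first, then the ADAM command itself, mirroring the fix.
    default_params = ["--master", "spark://%s:%s" % (master_ip, "7077"),
                      "--conf", "spark.driver.memory=%sg" % inputs["driverMemory"],
                      "--conf", "spark.executor.memory=%sg" % inputs["executorMemory"]]
    return default_params + arguments

args = build_adam_args("10.0.0.1",
                       {"driverMemory": "30", "executorMemory": "30"},
                       ["transform", "sample.bam", "sample.adam"])
assert args[-3:] == ["transform", "sample.bam", "sample.adam"]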


@@ -430,10 +427,6 @@ def build_parser():
help = 's3 directory url in which to place output files')
parser.add_argument('-k', '--known_SNPs', required = True,
help = 'The full s3 url of a VCF file of known snps')
parser.add_argument('-a', '--aws_access_key', required = True,
help = 'Amazon web services access key')
parser.add_argument('-s', '--aws_secret_key', required = True,
help = 'Amazon web services secret key')
parser.add_argument('-d', '--driver_memory', required = True,
help = 'Amount of memory to allocate for Spark Driver.')
parser.add_argument('-q', '--executor_memory', required = True,
@@ -457,8 +450,6 @@ def main(args):
'outDir': options.output_directory,
'bamName': options.input_file_name,
'knownSNPs': options.known_SNPs,
'accessKey': options.aws_access_key,
'secretKey': options.aws_secret_key,
'driverMemory': options.driver_memory,
'executorMemory': options.executor_memory,
'sudo': options.sudo,
4 changes: 1 addition & 3 deletions src/toil_scripts/batch_alignment/bwa_alignment.py
@@ -607,9 +607,7 @@ def main():
'uuid': None,
'cpu_count': None,
'file_size': args.file_size,
'use_bwakit': args.use_bwakit,
'aws_access_key': None,
'aws_secret_key': None}
'use_bwakit': args.use_bwakit}

# Launch Pipeline
Job.Runner.startToil(Job.wrapJobFn(download_shared_files, inputs), args)
71 changes: 61 additions & 10 deletions src/toil_scripts/gatk_germline/germline.py
@@ -91,7 +91,7 @@ def download_url(job, url, filename):
subprocess.check_call(['curl', '-fs', '--retry', '5', '--create-dir', url, '-o', file_path])
except subprocess.CalledProcessError as cpe:
raise RuntimeError(
'\nNecessary file could not be acquired: %s. Got error "%s". Check input URL' % (url, e))
'\nNecessary file could not be acquired: %s. Got error "%s". Check input URL' % (url, cpe))
except OSError:
raise RuntimeError('Failed to find "curl". Install via "apt-get install curl"')
assert os.path.exists(file_path)
@@ -314,8 +314,8 @@ def index(job, shared_ids, input_args):

def haplotype_caller(job, shared_ids, input_args):
"""
Uses GATK HaplotypeCaller to identify SNPs and Indels.
Calls variant quality score recalibration functions.
Uses GATK HaplotypeCaller to identify SNPs and Indels and writes a gVCF.
Calls per-sample genotyper to genotype gVCF.

:param job: Job instance
:param shared_ids: dictionary of shared file promises
@@ -324,29 +324,82 @@ def haplotype_caller(job, shared_ids, input_args):
work_dir = job.fileStore.getLocalTempDir()
input_files = ['ref.fa', 'ref.fa.fai', 'ref.dict', 'toil.bam', 'toil.bam.bai']
read_from_filestore_hc(job, work_dir, shared_ids, *input_files)
output = 'unified.raw.BOTH.gatk.vcf'
output = '%s.raw.BOTH%s.gvcf' % (input_args['uuid'],
input_args['suffix'])

# Call GATK -- HaplotypeCaller
command = ['-nct', input_args['cpu_count'],
'-R', 'ref.fa',
'-T', 'HaplotypeCaller',
'--genotyping_mode', 'Discovery',
'--output_mode', 'EMIT_VARIANTS_ONLY',
'--emitRefConfidence', 'GVCF',
'-I', 'toil.bam',
'-o', 'unified.raw.BOTH.gatk.vcf',
'-o', output,
'-variant_index_type', 'LINEAR',
'-variant_index_parameter', '128000',
'--annotation', 'QualByDepth',
'--annotation', 'DepthPerSampleHC',
'--annotation', 'FisherStrand',
'--annotation', 'ReadPosRankSumTest']
try:
docker_call(work_dir = work_dir,
tool_parameters = command,
tool = 'quay.io/ucsc_cgl/gatk',
sudo = input_args['sudo'])
except:
sys.stderr.write("Running haplotype caller with %s in %s failed." % (
" ".join(command), work_dir))
raise

# Update fileStore and spawn child job
shared_ids[output] = job.fileStore.writeGlobalFile(os.path.join(work_dir, output))

# upload gvcf
upload_or_move_hc(work_dir, input_args, output)

# call variants prior to vqsr
job.addChildJobFn(genotype_gvcf, shared_ids, input_args)


def genotype_gvcf(job, shared_ids, input_args):
"""
Genotypes the gVCF generated by the HaplotypeCaller.
Calls variant quality score recalibration functions.

:param job: Job instance
:param shared_ids: dictionary of shared file promises
:param input_args: dictionary of input arguments
"""

work_dir = job.fileStore.getLocalTempDir()
input_files = ['%s.raw.BOTH%s.gvcf' % (input_args['uuid'],
input_args['suffix']),
'ref.fa', 'ref.fa.fai', 'ref.dict']
read_from_filestore_hc(job, work_dir, shared_ids, *input_files)
output = 'unified.raw.BOTH.gatk.vcf'

command = ['-nt', input_args['cpu_count'],
'-R', 'ref.fa',
'-T', 'GenotypeGVCFs',
'--variant', '%s.raw.BOTH.gatk.gvcf' % input_args['uuid'],
'--out', output,
'-stand_emit_conf', '10.0',
'-stand_call_conf', '30.0']

try:
docker_call(work_dir = work_dir,
tool_parameters = command,
tool = 'quay.io/ucsc_cgl/gatk',
sudo = input_args['sudo'])
except:
sys.stderr.write("Running haplotype caller with %s in %s failed." % (
sys.stderr.write("Running GenotypeGVCFs with %s in %s failed." % (
" ".join(command), work_dir))
raise

# Update fileStore and spawn child job
shared_ids['unified.raw.BOTH.gatk.vcf'] = job.fileStore.writeGlobalFile(os.path.join(work_dir, output))
shared_ids[output] = job.fileStore.writeGlobalFile(os.path.join(work_dir, output))

# run vqsr
job.addChildJobFn(vqsr_snp, shared_ids, input_args)
job.addChildJobFn(vqsr_indel, shared_ids, input_args)

@@ -523,8 +576,6 @@ def apply_vqsr_indel(job, shared_ids, input_args):
'cpu_count': str(multiprocessing.cpu_count()),
'file_size': args.file_size,
'ssec': None,
'aws_access_key': None,
'aws_secret_key': None,
'sudo': False}

Job.Runner.startToil(Job.wrapJobFn(batch_start, inputs), args)
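Taken together, the germline changes split variant calling into two chained Toil jobs: HaplotypeCaller now emits a per-sample gVCF (uploaded to S3), and the new genotype_gvcf job runs GenotypeGVCFs on it before fanning out to VQSR. A hedged sketch of that job chain with the GATK calls elided; the stub names are illustrative, not the pipeline's own:

from toil.job import Job

def vqsr_snp_stub(job, shared_ids, input_args):
    pass  # stands in for vqsr_snp

def vqsr_indel_stub(job, shared_ids, input_args):
    pass  # stands in for vqsr_indel

def genotype_gvcf_stub(job, shared_ids, input_args):
    # GenotypeGVCFs turns the per-sample gVCF into unified.raw.BOTH.gatk.vcf,
    # then recalibration runs separately for SNPs and indels.
    job.addChildJobFn(vqsr_snp_stub, shared_ids, input_args)
    job.addChildJobFn(vqsr_indel_stub, shared_ids, input_args)

def haplotype_caller_stub(job, shared_ids, input_args):
    # HaplotypeCaller with --emitRefConfidence GVCF writes
    # <uuid>.raw.BOTH<suffix>.gvcf, uploads it, then hands off to genotyping.
    job.addChildJobFn(genotype_gvcf_stub, shared_ids, input_args)

if __name__ == '__main__':
    options = Job.Runner.getDefaultArgumentParser().parse_args(['./jobstore'])
    Job.Runner.startToil(Job.wrapJobFn(haplotype_caller_stub, {}, {}), options)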