From 1e0c5d6a43da6eb38bf6927af1db41474c683e8d Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Tue, 24 Oct 2017 12:43:14 -0700 Subject: [PATCH 1/4] finished mango workflow --- .../workflows/deca_pipeline/call_cnvs.py | 9 +- .../workflows/mango_pipeline/__init__.py | 15 + .../mango_pipeline/run_mango_browser.py | 311 ++++++++++++++++++ .../mango_pipeline/run_mango_notebook.py | 175 ++++++++++ bdgenomics/workflows/tools/functions.py | 11 + bdgenomics/workflows/tools/spark_tools.py | 193 +++++++++++ setup.py | 8 +- 7 files changed, 713 insertions(+), 9 deletions(-) create mode 100644 bdgenomics/workflows/mango_pipeline/__init__.py create mode 100644 bdgenomics/workflows/mango_pipeline/run_mango_browser.py create mode 100644 bdgenomics/workflows/mango_pipeline/run_mango_notebook.py create mode 100644 bdgenomics/workflows/tools/functions.py diff --git a/bdgenomics/workflows/deca_pipeline/call_cnvs.py b/bdgenomics/workflows/deca_pipeline/call_cnvs.py index e2470b6..1713c15 100644 --- a/bdgenomics/workflows/deca_pipeline/call_cnvs.py +++ b/bdgenomics/workflows/deca_pipeline/call_cnvs.py @@ -31,6 +31,8 @@ from toil_lib.urls import download_url_job from bdgenomics.workflows.spark import spawn_spark_cluster + +from bdgenomics.workflows.tools.functions import is_s3 from bdgenomics.workflows.tools.spark_tools import call_deca, \ MasterAddress, \ HDFS_MASTER_PORT, \ @@ -71,12 +73,7 @@ def setup_deca_state(job, job.addFollowOn(call_cnvs) else: - - # all files must have s3 urls - def is_s3(f): - require(f.startswith("s3a"), - "url for file %s did not start with s3a scheme" % f) - + is_s3(targets) is_s3(output) for f in input_files: diff --git a/bdgenomics/workflows/mango_pipeline/__init__.py b/bdgenomics/workflows/mango_pipeline/__init__.py new file mode 100644 index 0000000..c7b5dae --- /dev/null +++ b/bdgenomics/workflows/mango_pipeline/__init__.py @@ -0,0 +1,15 @@ +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/bdgenomics/workflows/mango_pipeline/run_mango_browser.py b/bdgenomics/workflows/mango_pipeline/run_mango_browser.py new file mode 100644 index 0000000..3b95391 --- /dev/null +++ b/bdgenomics/workflows/mango_pipeline/run_mango_browser.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python2.7 +# +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import logging +import multiprocessing +import os +import sys +import textwrap +from subprocess import check_call, check_output + +from toil.job import Job + +from toil_lib import require +from toil_lib.files import generate_file, move_files +from toil_lib.urls import download_url_job + + +from bdgenomics.workflows.spark import spawn_spark_cluster +from bdgenomics.workflows.tools.functions import is_s3 +from bdgenomics.workflows.tools.spark_tools import call_mango_browser, \ + MasterAddress, \ + HDFS_MASTER_PORT, \ + SPARK_MASTER_PORT + +_log = logging.getLogger(__name__) + +def setup_mango_state(job, + reference, + genes, + reads, + variants, + features, + show_genotypes, + host, + port, + memory, + run_local, run_mac, num_nodes, + aws_access_key_id, aws_secret_access_key): + + if run_local: + # import reference + file_name = os.path.basename(reference) + file_id = job.wrapJobFn(download_url_job, reference) + job.addChild(file_id) + + loaded_reference = (file_name, file_id.rv()) + + + loaded_reads = [] + + if reads is not None: + for f in reads.split(','): + file_name = os.path.basename(f) + file_id = job.wrapJobFn(download_url_job, f) + job.addChild(file_id) + + loaded_reads.append((file_name, file_id.rv())) + + # if file is bam, index is required + if file_name.endswith('bam'): + index_name = file_name + ".bai" + index_id = job.wrapJobFn(download_url_job, f + ".bai") + job.addChild(index_id) + + loaded_variants = [] + if variants is not None: + for f in variants.split(','): + file_name = os.path.basename(f) + file_id = job.wrapJobFn(download_url_job, f) + job.addChild(file_id) + + loaded_variants.append((file_name, file_id.rv())) + + loaded_features = [] + if features is not None: + for f in features.split(','): + file_name = os.path.basename(f) + file_id = job.wrapJobFn(download_url_job, f) + job.addChild(file_id) + + loaded_features.append((file_name, file_id.rv())) + + run_mango = job.wrapJobFn(run_mango_browser, + loaded_reference, + genes, + loaded_reads, + loaded_variants, + loaded_features, + show_genotypes, + host, + port, + memory, + run_local, + run_mac, + None, + aws_access_key_id, aws_secret_access_key) + job.addFollowOn(run_mango) + + else: + + is_s3(reference) + if reads is not None: + for f in reads.split(','): + is_s3(f) + if f.endswith('bam'): + is_s3(f + '.bai') + + if variants is not None: + for f in variants.split(','): + is_s3(f) + + if features is not None: + for f in features.split(','): + is_s3(f) + + # launch the spark cluster + master_ip = spawn_spark_cluster(job, + int(num_nodes) - 1, + cores=multiprocessing.cpu_count(), + memory=memory) + + run_mango = job.wrapJobFn(run_mango_browser, + reference, + genes, # usually just url + reads, + variants, + features, + show_genotypes, + host, + port, + memory, + False, + False, + master_ip, + aws_access_key_id, aws_secret_access_key) + job.addChild(run_mango) + + +def run_mango_browser(job, + reference, + genes, + reads, + variants, + features, + show_genotypes, + host, + port, + memory, + run_local, + run_mac, + master_ip, + aws_access_key_id, + aws_secret_access_key): + + if run_local: + + # 
holds mango arguments + arguments = [] + + # get work dir + work_dir = job.fileStore.getLocalTempDir() + + # load reference + job.fileStore.readGlobalFile(reference[1], os.path.join(work_dir, reference[0])) + arguments.append('/data/%s' % reference[0]) + + # load genes + if genes: + arguments.extend(['-genes', genes[0]]) + + # format reads, variants and features + + # load reads + formatted_reads = [] + for (f, f_id) in reads: + formatted_reads.append('/data/%s' % f) + job.fileStore.readGlobalFile(f_id, os.path.join(work_dir, f)) + + if formatted_reads: + arguments.extend(['-reads', ','.join(formatted_reads)]) + + # load variants + formatted_variants = [] + for (f, f_id) in variants: + formatted_variants.append('/data/%s' % f) + job.fileStore.readGlobalFile(f_id, os.path.join(work_dir, f)) + + if formatted_variants: + arguments.extend(['-variants', ','.join(formatted_variants)]) + + # load features + formatted_features = [] + for (f, f_id) in features: + formatted_features.append('/data/%s' % f) + job.fileStore.readGlobalFile(bam_id, os.path.join(work_dir, f)) + + if formatted_features: + arguments.extend(['-features', ','.join(formatted_features)]) + + call_mango_browser(job, master_ip=None, + arguments=arguments, + memory=memory, + run_local=True, + run_mac=run_mac, + work_dir=work_dir, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + + else: + + # TODO what about file formatting? + + # TODO parameters + + call_mango_browser(job, master_ip=master_ip, + arguments=[reference, + '-genes', genes, + '-reads', ' '.join(reads), + '-variants', ' '.join(variants), + '-features', ' '.join(features)], + host=host, + port=port, + memory=memory, + run_local=False, + run_mac=False, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + +def main(): + + parser = argparse.ArgumentParser() + parser.add_argument('--reference', help='Path to a file containing the S3 URL or local paths to the reference .2bit, fasta, or adam file.', + required=True) + parser.add_argument('--genes', help='URL to genes.') + parser.add_argument('--reads', help='Comma separated (,) list of paths to files containing the S3 URL or local paths to input bam or adam files.') + parser.add_argument('--variants', help='Comma separated (,) list of paths to files containing the S3 URL or local paths to input vcf or adam files.') + parser.add_argument('--features', help='Comma separated (,) list of paths to files containing the S3 URL or local paths to input bed, narrowpeak or adam files.') + parser.add_argument('--show_genotypes', help='If set, shows genotypes from variant files.',default=False) + parser.add_argument('--run-local', default=False, action='store_true', + help='if specified, runs locally. exclusive of --num-nodes') + parser.add_argument('--host', default='localhost', action='store_true', + help='host to forward web UI to. Default is localhost.') + parser.add_argument('--port', default=8080, action='store_true', + help='pot to forward web UI to. Default is 8080.') + parser.add_argument('--run-mac', default=False, action='store_true', + help='if specified, runs on mac.') + parser.add_argument('--num-nodes', default=None, + help='the number of nodes to use for the spark cluster.' 
+ 'exclusive of --run-local') + parser.add_argument('--memory', required=True, default=None, + help='Amount of memory (in gb) to allocate for mango') + parser.add_argument('--aws_access_key', required=False, default=None, + help='AWS access key for authenticating with S3') + parser.add_argument('--aws_secret_key', required=False, default=None, + help='AWS secret key for authenticating with S3') + + Job.Runner.addToilOptions(parser) + args = parser.parse_args() + cwd = os.getcwd() + + require(not (args.run_local and args.num_nodes), + 'Only one of --run-local and --num-nodes can be provided.') + require((not args.aws_access_key and not args.aws_secret_key) or + (args.aws_access_key and args.aws_secret_key), + 'If AWS access key is provided, AWS secret key must also be provided') + + if not args.run_local: + require(args.num_nodes, + 'neither --run-local or --num-nodes was specified.') + require(int(args.num_nodes) > 1, + 'num_nodes allocates one Spark/HDFS master and n-1 workers, and ' + 'thus must be greater than 1. %s was passed.' % args.num_nodes) + + + _log.info("startToil") + + Job.Runner.startToil(Job.wrapJobFn(setup_mango_state, + args.reference, + args.genes, + args.reads, + args.variants, + args.features, + args.show_genotypes, + args.host, + args.port, + args.memory, + args.run_local, + args.run_mac, + args.num_nodes, + args.aws_access_key, + args.aws_secret_key), args) + + +if __name__ == "__main__": + main() diff --git a/bdgenomics/workflows/mango_pipeline/run_mango_notebook.py b/bdgenomics/workflows/mango_pipeline/run_mango_notebook.py new file mode 100644 index 0000000..146aaa5 --- /dev/null +++ b/bdgenomics/workflows/mango_pipeline/run_mango_notebook.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python2.7 +# +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import logging +import multiprocessing +import os +import sys +import textwrap +from subprocess import check_call, check_output + +from toil.job import Job + +from toil_lib import require +from toil_lib.files import generate_file, move_files +from toil_lib.urls import download_url_job + + +from bdgenomics.workflows.spark import spawn_spark_cluster +from bdgenomics.workflows.tools.functions import is_s3 +from bdgenomics.workflows.tools.spark_tools import call_mango_notebook, \ + MasterAddress, \ + HDFS_MASTER_PORT, \ + SPARK_MASTER_PORT + +_log = logging.getLogger(__name__) + +def setup_mango_state(job, + host, + port, + memory, + run_local, run_mac, num_nodes, + aws_access_key_id, aws_secret_access_key): + + if run_local: + + + run_mango = job.wrapJobFn(run_mango_notebook, + host, + port, + memory, + run_local, + run_mac, + None, + aws_access_key_id, aws_secret_access_key) + job.addFollowOn(run_mango) + + else: + + # launch the spark cluster + master_ip = spawn_spark_cluster(job, + int(num_nodes) - 1, + cores=multiprocessing.cpu_count(), + memory=memory) + + run_mango = job.wrapJobFn(run_mango_notebook, + host, + port, + memory, + False, + False, + master_ip, + aws_access_key_id, aws_secret_access_key) + job.addChild(run_mango) + + +def run_mango_notebook(job, + host, + port, + memory, + run_local, + run_mac, + master_ip, + aws_access_key_id, + aws_secret_access_key): + + # get work dir + work_dir = job.fileStore.getLocalTempDir() + + arguments = [] + arguments.append('--allow-root') # required for npm in docker + + if run_local: + + # TODO: NOT SURE IF WE NEED THIS WHEN NET-HOST IS SET + arguments.append('--ip=0.0.0.0') + + call_mango_notebook(job, master_ip=None, arguments=arguments, + memory=memory, + run_local=True, + run_mac=run_mac, + work_dir=work_dir, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + + else: + + call_mango_notebook(job, master_ip=master_ip, arguments=arguments, + host=host, + port=port, + memory=memory, + run_local=False, + run_mac=False, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key) + +def main(): + + parser = argparse.ArgumentParser() + parser.add_argument('--run-local', default=False, action='store_true', + help='if specified, runs locally. exclusive of --num-nodes') + parser.add_argument('--host', default='localhost', action='store_true', + help='host to forward web UI to. Default is localhost.') + parser.add_argument('--port', default=10000, action='store_true', + help='pot to forward web UI to. Default is 10000.') + parser.add_argument('--run-mac', default=False, action='store_true', + help='if specified, runs on mac.') + parser.add_argument('--num-nodes', default=None, + help='the number of nodes to use for the spark cluster.' 
+ 'exclusive of --run-local') + parser.add_argument('--memory', required=True, default=None, + help='Amount of memory (in gb) to allocate for mango') + parser.add_argument('--aws_access_key', required=False, default=None, + help='AWS access key for authenticating with S3') + parser.add_argument('--aws_secret_key', required=False, default=None, + help='AWS secret key for authenticating with S3') + + Job.Runner.addToilOptions(parser) + args = parser.parse_args() + cwd = os.getcwd() + + require(not (args.run_local and args.num_nodes), + 'Only one of --run-local and --num-nodes can be provided.') + require((not args.aws_access_key and not args.aws_secret_key) or + (args.aws_access_key and args.aws_secret_key), + 'If AWS access key is provided, AWS secret key must also be provided') + + if not args.run_local: + require(args.num_nodes, + 'neither --run-local or --num-nodes was specified.') + require(int(args.num_nodes) > 1, + 'num_nodes allocates one Spark/HDFS master and n-1 workers, and ' + 'thus must be greater than 1. %s was passed.' % args.num_nodes) + + + _log.info("startToil") + + Job.Runner.startToil(Job.wrapJobFn(setup_mango_state, + args.host, + args.port, + args.memory, + args.run_local, + args.run_mac, + args.num_nodes, + args.aws_access_key, + args.aws_secret_key), args) + + +if __name__ == "__main__": + main() diff --git a/bdgenomics/workflows/tools/functions.py b/bdgenomics/workflows/tools/functions.py new file mode 100644 index 0000000..378035c --- /dev/null +++ b/bdgenomics/workflows/tools/functions.py @@ -0,0 +1,11 @@ +""" +Functions used across pipelines + +@author Alyssa Morrow +""" +from toil_lib import require + +# all files must have s3 urls +def is_s3(f): + require(f.startswith("s3a"), + "url for file %s did not start with s3a scheme" % f) diff --git a/bdgenomics/workflows/tools/spark_tools.py b/bdgenomics/workflows/tools/spark_tools.py index b664461..61aa0e0 100644 --- a/bdgenomics/workflows/tools/spark_tools.py +++ b/bdgenomics/workflows/tools/spark_tools.py @@ -273,3 +273,196 @@ def call_deca(job, master_ip, arguments, arguments, override_parameters)) + +def call_mango_browser(job, master_ip, arguments, + host='127.0.0.1', + port=8080, + memory=None, + override_parameters=None, + work_dir=None, + run_local=False, + run_mac=False, + aws_access_key_id=None, + aws_secret_access_key=None): + """ + Invokes the Mango browsercontainer. Find mango at https://github.com/bigdatagenomics/mango. + + :param toil.Job.job job: The Toil Job calling this function + :param masterIP: The Spark leader IP address. + :param arguments: Arguments to pass to Mango. + :param memory: Gigabytes of memory to provision for Spark driver/worker. + :param override_parameters: Parameters passed by the user, that override our defaults. + :param run_local: If true, runs Spark with the --master local[*] setting, which uses + all cores on the local machine. The master_ip will be disregarded. 
+ + :type masterIP: MasterAddress + :type arguments: list of string + :type memory: int or None + :type override_parameters: list of string or None + :type native_adam_path: string or None + :type run_local: boolean + """ + + if run_local: + master = ["--master", "local[*]"] + else: + ''' + hostname = check_output(["hostname", "-i"])[:-1] + master = ["--conf", "spark.driver.host=%s" % hostname] + ''' + master = [] + pass + + default_params = (master + [ + # set max result size to unlimited, see #177 + "--conf", "spark.driver.maxResultSize=0", + "--conf", "spark.hadoop.hadoopbam.bam.enable-bai-splitter=true", + # "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", TODO download failing + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) + + docker_parameters = [] + + if not run_mac: + docker_parameters.extend(['--net=host']) # for accessing localhost + else: + # port forwarding because we have not set net host + endpoint = "{}:8080".format(port) + docker_parameters.extend(['-p', endpoint]) + + if aws_access_key_id: + + require(aws_secret_access_key, + 'If AWS access key is passed, secret key must be defined') + + docker_parameters.extend(['-e', 'AWS_ACCESS_KEY_ID=%s' % aws_access_key_id, + '-e', 'AWS_SECRET_ACCESS_KEY=%s' % aws_secret_access_key]) + + default_params.extend( + [ + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) + + for scheme in ['s3', 's3n']: + default_params.extend([ + "--conf", "spark.hadoop.fs.%s.awsAccessKeyId=%s" % (scheme, aws_access_key_id), + "--conf", "spark.hadoop.fs.%s.awsSecretAccessKey=%s" % (scheme, aws_secret_access_key)]) + + default_params.extend([ + "--conf", "spark.hadoop.fs.s3a.access.key=%s" % aws_access_key_id, + "--conf", "spark.hadoop.fs.s3a.secret.key=%s" % aws_secret_access_key, + "--conf", "spark.executorEnv.AWS_ACCESS_KEY_ID=%s" % aws_access_key_id, + "--conf", "spark.executorEnv.AWS_SECRET_ACCESS_KEY=%s" % aws_secret_access_key]) + + if work_dir: + docker_parameters.extend(['-v', '%s:/data' % work_dir]) + + parameters=_make_parameters(master_ip, + default_params, + memory, + arguments, + override_parameters) + + + dockerCall(job=job, + tool="quay.io/ucsc_cgl/mango:latest", + dockerParameters=docker_parameters, + parameters=parameters) + + + +def call_mango_notebook(job, master_ip, arguments, + host='127.0.0.1', + port=8888, + memory=None, + override_parameters=None, + work_dir=None, + run_local=False, + run_mac=False, + aws_access_key_id=None, + aws_secret_access_key=None): + """ + Invokes the Mango browsercontainer. Find mango at https://github.com/bigdatagenomics/mango. + + :param toil.Job.job job: The Toil Job calling this function + :param masterIP: The Spark leader IP address. + :param arguments: Arguments to pass to ADAM. + :param memory: Gigabytes of memory to provision for Spark driver/worker. + :param override_parameters: Parameters passed by the user, that override our defaults. + :param run_local: If true, runs Spark with the --master local[*] setting, which uses + all cores on the local machine. The master_ip will be disregarded. 
+ + :type masterIP: MasterAddress + :type arguments: list of string + :type memory: int or None + :type override_parameters: list of string or None + :type native_adam_path: string or None + :type run_local: boolean + """ + + if run_local: + master = ["--master", "local[*]"] + else: + ''' + hostname = check_output(["hostname", "-i"])[:-1] + master = ["--conf", "spark.driver.host=%s" % hostname] + ''' + master = [] + pass + + default_params = (master + [ + # set max result size to unlimited, see #177 + "--conf", "spark.driver.maxResultSize=0", + "--conf", "spark.hadoop.hadoopbam.bam.enable-bai-splitter=true", + # TODO broken package + # "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) + + docker_parameters = [] + if not run_mac: + docker_parameters.extend(['--net=host']) # for accessing localhost + else: + # port forwarding + endpoint = "{}:8888".format(port) + docker_parameters.extend(['-p', endpoint]) + + # reconfigure entrypoint for notebook + docker_parameters.extend(['--entrypoint=/home/mango/bin/mango-notebook']) + + if aws_access_key_id: + + require(aws_secret_access_key, + 'If AWS access key is passed, secret key must be defined') + + docker_parameters.extend(['-e', 'AWS_ACCESS_KEY_ID=%s' % aws_access_key_id, + '-e', 'AWS_SECRET_ACCESS_KEY=%s' % aws_secret_access_key]) + + default_params.extend( + [ + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) + + for scheme in ['s3', 's3n']: + default_params.extend([ + "--conf", "spark.hadoop.fs.%s.awsAccessKeyId=%s" % (scheme, aws_access_key_id), + "--conf", "spark.hadoop.fs.%s.awsSecretAccessKey=%s" % (scheme, aws_secret_access_key)]) + + default_params.extend([ + "--conf", "spark.hadoop.fs.s3a.access.key=%s" % aws_access_key_id, + "--conf", "spark.hadoop.fs.s3a.secret.key=%s" % aws_secret_access_key, + "--conf", "spark.executorEnv.AWS_ACCESS_KEY_ID=%s" % aws_access_key_id, + "--conf", "spark.executorEnv.AWS_SECRET_ACCESS_KEY=%s" % aws_secret_access_key]) + + if work_dir: + docker_parameters.extend(['-v', '%s:/data' % work_dir]) + + parameters=_make_parameters(master_ip, + default_params, + memory, + arguments, + override_parameters) + + print(parameters) + + + dockerCall(job=job, + tool="quay.io/ucsc_cgl/mango:latest", + dockerParameters=docker_parameters, + parameters=parameters) diff --git a/setup.py b/setup.py index 07f177c..363f19f 100644 --- a/setup.py +++ b/setup.py @@ -5,9 +5,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -83,7 +83,7 @@ def importVersion(): new = version_template.expand_() print(new, sys.stderr) - + try: with open('bdgenomics/workflows/version.py') as f: old = f.read() @@ -125,6 +125,8 @@ def importVersion(): 'bdg-deca = bdgenomics.workflows.deca_pipeline.call_cnvs:main', 'bdg-cannoli-bwa = bdgenomics.workflows.cannoli_pipeline.bwa_alignment:main', 'bdg-gatk3-benchmark = bdgenomics.workflows.benchmarking.gatk3_pipeline.preprocessing:main', + 'bdg-mango-browser = bdgenomics.workflows.mango_pipeline.run_mango_browser:main', + 'bdg-mango-notebook = bdgenomics.workflows.mango_pipeline.run_mango_notebook:main', 'bdg-mkdups-benchmark = bdgenomics.workflows.benchmarking.single_node.mkdups:main', 'bdg-sort-benchmark = bdgenomics.workflows.benchmarking.single_node.sort:main', 'bdg-ri-benchmark = bdgenomics.workflows.benchmarking.single_node.realign_indels:main', From d75d5321a66b563728c4c6f124107b12a019457e Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Thu, 26 Oct 2017 11:27:38 -0700 Subject: [PATCH 2/4] updated arguments --- .../mango_pipeline/run_mango_browser.py | 23 +++++++++++++------ bdgenomics/workflows/tools/spark_tools.py | 16 ++++++------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/bdgenomics/workflows/mango_pipeline/run_mango_browser.py b/bdgenomics/workflows/mango_pipeline/run_mango_browser.py index 3b95391..7a0e4d3 100644 --- a/bdgenomics/workflows/mango_pipeline/run_mango_browser.py +++ b/bdgenomics/workflows/mango_pipeline/run_mango_browser.py @@ -115,9 +115,11 @@ def setup_mango_state(job, else: is_s3(reference) + if reads is not None: for f in reads.split(','): is_s3(f) + # browser requires bam files to be indexed if f.endswith('bam'): is_s3(f + '.bai') @@ -224,16 +226,23 @@ def run_mango_browser(job, else: - # TODO what about file formatting? + # holds mango arguments + arguments = [reference] + + if genes: + arguments.extend(['-genes', genes]) - # TODO parameters + if reads: + arguments.extend(['-reads', ','.join(reads)]) + + if variants: + arguments.extend(['-variants', ','.join(variants)]) + + if features: + arguments.extend(['-features', ','.join(features)]) call_mango_browser(job, master_ip=master_ip, - arguments=[reference, - '-genes', genes, - '-reads', ' '.join(reads), - '-variants', ' '.join(variants), - '-features', ' '.join(features)], + arguments=arguments, host=host, port=port, memory=memory, diff --git a/bdgenomics/workflows/tools/spark_tools.py b/bdgenomics/workflows/tools/spark_tools.py index 61aa0e0..870264f 100644 --- a/bdgenomics/workflows/tools/spark_tools.py +++ b/bdgenomics/workflows/tools/spark_tools.py @@ -59,7 +59,7 @@ def _make_parameters(master_ip, default_parameters, memory, arguments, override_ :param memory: The memory to allocate to each Spark driver and executor. :param arguments: Arguments to pass to the submitted job. :param override_parameters: Parameters passed by the user, that override our defaults. 
- + :type masterIP: MasterAddress :type default_parameters: list of string :type arguments: list of string @@ -72,7 +72,7 @@ def _make_parameters(master_ip, default_parameters, memory, arguments, override_ require((override_parameters is not None or memory is not None) and (override_parameters is None or memory is None), "Either the memory setting must be defined or you must provide Spark configuration parameters.") - + # if the user hasn't provided overrides, set our defaults parameters = [] if memory is not None: @@ -94,8 +94,8 @@ def _make_parameters(master_ip, default_parameters, memory, arguments, override_ # now add the tool arguments and return parameters.extend(arguments) - return parameters - + return parameters + def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=None): """ @@ -242,7 +242,7 @@ def call_deca(job, master_ip, arguments, require(aws_secret_access_key, 'If AWS access key is passed, secret key must be defined') - + docker_parameters.extend(['-e', 'AWS_ACCESS_KEY_ID=%s' % aws_access_key_id, '-e', 'AWS_SECRET_ACCESS_KEY=%s' % aws_secret_access_key]) @@ -263,7 +263,7 @@ def call_deca(job, master_ip, arguments, if work_dir: docker_parameters.extend(['-v', '%s:/data' % work_dir]) - + dockerCall(job=job, tool="quay.io/ucsc_cgl/deca:0.1.0--7d13833a1220001481c4de0489e893c93ee3310f", dockerParameters=docker_parameters, @@ -317,7 +317,7 @@ def call_mango_browser(job, master_ip, arguments, # set max result size to unlimited, see #177 "--conf", "spark.driver.maxResultSize=0", "--conf", "spark.hadoop.hadoopbam.bam.enable-bai-splitter=true", - # "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", TODO download failing + "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) docker_parameters = [] @@ -325,7 +325,7 @@ def call_mango_browser(job, master_ip, arguments, if not run_mac: docker_parameters.extend(['--net=host']) # for accessing localhost else: - # port forwarding because we have not set net host + # port forwarding because we have not set --net=host endpoint = "{}:8080".format(port) docker_parameters.extend(['-p', endpoint]) From c501ec27ada9a7b8610f11d5f89c8305aff84f52 Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Fri, 27 Oct 2017 12:51:24 -0700 Subject: [PATCH 3/4] minor fixes --- .../workflows/deca_pipeline/call_cnvs.py | 10 +++--- .../{mango_pipeline => mango}/__init__.py | 0 .../run_mango_browser.py | 11 ++++--- .../run_mango_notebook.py | 20 +++++++++--- bdgenomics/workflows/tools/functions.py | 31 ++++++++++++++----- bdgenomics/workflows/tools/spark_tools.py | 25 +++++++-------- setup.py | 4 +-- 7 files changed, 64 insertions(+), 37 deletions(-) rename bdgenomics/workflows/{mango_pipeline => mango}/__init__.py (100%) rename bdgenomics/workflows/{mango_pipeline => mango}/run_mango_browser.py (98%) rename bdgenomics/workflows/{mango_pipeline => mango}/run_mango_notebook.py (93%) diff --git a/bdgenomics/workflows/deca_pipeline/call_cnvs.py b/bdgenomics/workflows/deca_pipeline/call_cnvs.py index 1713c15..fbc2254 100644 --- a/bdgenomics/workflows/deca_pipeline/call_cnvs.py +++ b/bdgenomics/workflows/deca_pipeline/call_cnvs.py @@ -32,7 +32,7 @@ from bdgenomics.workflows.spark import spawn_spark_cluster -from bdgenomics.workflows.tools.functions import is_s3 +from bdgenomics.workflows.tools.functions import is_s3a from bdgenomics.workflows.tools.spark_tools import call_deca, \ MasterAddress, 
\ HDFS_MASTER_PORT, \ @@ -74,11 +74,11 @@ def setup_deca_state(job, else: - is_s3(targets) - is_s3(output) + is_s3a(targets) + is_s3a(output) for f in input_files: - is_s3(f) - + is_s3a(f) + # launch the spark cluster master_ip = spawn_spark_cluster(job, int(num_nodes) - 1, diff --git a/bdgenomics/workflows/mango_pipeline/__init__.py b/bdgenomics/workflows/mango/__init__.py similarity index 100% rename from bdgenomics/workflows/mango_pipeline/__init__.py rename to bdgenomics/workflows/mango/__init__.py diff --git a/bdgenomics/workflows/mango_pipeline/run_mango_browser.py b/bdgenomics/workflows/mango/run_mango_browser.py similarity index 98% rename from bdgenomics/workflows/mango_pipeline/run_mango_browser.py rename to bdgenomics/workflows/mango/run_mango_browser.py index 7a0e4d3..dd8adfd 100644 --- a/bdgenomics/workflows/mango_pipeline/run_mango_browser.py +++ b/bdgenomics/workflows/mango/run_mango_browser.py @@ -114,22 +114,23 @@ def setup_mango_state(job, else: - is_s3(reference) + is_s3a(reference) + if reads is not None: for f in reads.split(','): - is_s3(f) + is_s3a(f) # browser requires bam files to be indexed if f.endswith('bam'): - is_s3(f + '.bai') + is_s3a(f + '.bai') if variants is not None: for f in variants.split(','): - is_s3(f) + is_s3a(f) if features is not None: for f in features.split(','): - is_s3(f) + is_s3a(f) # launch the spark cluster master_ip = spawn_spark_cluster(job, diff --git a/bdgenomics/workflows/mango_pipeline/run_mango_notebook.py b/bdgenomics/workflows/mango/run_mango_notebook.py similarity index 93% rename from bdgenomics/workflows/mango_pipeline/run_mango_notebook.py rename to bdgenomics/workflows/mango/run_mango_notebook.py index 146aaa5..0800411 100644 --- a/bdgenomics/workflows/mango_pipeline/run_mango_notebook.py +++ b/bdgenomics/workflows/mango/run_mango_notebook.py @@ -32,7 +32,7 @@ from bdgenomics.workflows.spark import spawn_spark_cluster -from bdgenomics.workflows.tools.functions import is_s3 +from bdgenomics.workflows.tools.functions import is_s3a from bdgenomics.workflows.tools.spark_tools import call_mango_notebook, \ MasterAddress, \ HDFS_MASTER_PORT, \ @@ -99,6 +99,7 @@ def run_mango_notebook(job, # TODO: NOT SURE IF WE NEED THIS WHEN NET-HOST IS SET arguments.append('--ip=0.0.0.0') + arguments.append('--NotebookApp.token=') call_mango_notebook(job, master_ip=None, arguments=arguments, memory=memory, @@ -158,9 +159,7 @@ def main(): 'thus must be greater than 1. %s was passed.' % args.num_nodes) - _log.info("startToil") - - Job.Runner.startToil(Job.wrapJobFn(setup_mango_state, + job = Job.wrapJobFn(setup_mango_state, args.host, args.port, args.memory, @@ -168,7 +167,18 @@ def main(): args.run_mac, args.num_nodes, args.aws_access_key, - args.aws_secret_key), args) + args.aws_secret_key) + + # Notebook is always forced shutdown with keyboard interrupt. + # Always clean up after this process. + args.clean = "always" + + try: + Job.Runner.startToil(job, args) + + except KeyboardInterrupt: + _log.info("Shut down notebook job.") + if __name__ == "__main__": diff --git a/bdgenomics/workflows/tools/functions.py b/bdgenomics/workflows/tools/functions.py index 378035c..8596081 100644 --- a/bdgenomics/workflows/tools/functions.py +++ b/bdgenomics/workflows/tools/functions.py @@ -1,11 +1,28 @@ -""" -Functions used across pipelines +#!/usr/bin/env python2.7 +# +# Licensed to Big Data Genomics (BDG) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The BDG licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -@author Alyssa Morrow -""" from toil_lib import require -# all files must have s3 urls -def is_s3(f): +def is_s3a(f): + """ + Checks if a file has s3a URL. + + :param str f: URL + """ require(f.startswith("s3a"), - "url for file %s did not start with s3a scheme" % f) + "url for file %s did not start with s3a scheme" % f) diff --git a/bdgenomics/workflows/tools/spark_tools.py b/bdgenomics/workflows/tools/spark_tools.py index 870264f..73c7743 100644 --- a/bdgenomics/workflows/tools/spark_tools.py +++ b/bdgenomics/workflows/tools/spark_tools.py @@ -10,7 +10,6 @@ from subprocess import check_call, check_output from toil.lib.docker import dockerCall - from toil_lib import require SPARK_MASTER_PORT = "7077" @@ -285,7 +284,7 @@ def call_mango_browser(job, master_ip, arguments, aws_access_key_id=None, aws_secret_access_key=None): """ - Invokes the Mango browsercontainer. Find mango at https://github.com/bigdatagenomics/mango. + Invokes the Mango browser container. Find mango at https://github.com/bigdatagenomics/mango. :param toil.Job.job job: The Toil Job calling this function :param masterIP: The Spark leader IP address. @@ -361,12 +360,17 @@ def call_mango_browser(job, master_ip, arguments, arguments, override_parameters) + job.fileStore.logToMaster("Starting the merge sort") + job.fileStore.logToMaster(__name__) - dockerCall(job=job, - tool="quay.io/ucsc_cgl/mango:latest", - dockerParameters=docker_parameters, - parameters=parameters) + try: + dockerCall(job=job, + tool="quay.io/ucsc_cgl/mango:latest", + dockerParameters=docker_parameters, + parameters=parameters) + except: + job.fileStore.logToMaster("docker exited") def call_mango_notebook(job, master_ip, arguments, @@ -380,7 +384,7 @@ def call_mango_notebook(job, master_ip, arguments, aws_access_key_id=None, aws_secret_access_key=None): """ - Invokes the Mango browsercontainer. Find mango at https://github.com/bigdatagenomics/mango. + Invokes the Mango browser container. Find mango at https://github.com/bigdatagenomics/mango. :param toil.Job.job job: The Toil Job calling this function :param masterIP: The Spark leader IP address. 
@@ -412,8 +416,7 @@ def call_mango_notebook(job, master_ip, arguments, # set max result size to unlimited, see #177 "--conf", "spark.driver.maxResultSize=0", "--conf", "spark.hadoop.hadoopbam.bam.enable-bai-splitter=true", - # TODO broken package - # "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", + "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) docker_parameters = [] @@ -458,10 +461,6 @@ def call_mango_notebook(job, master_ip, arguments, memory, arguments, override_parameters) - - print(parameters) - - dockerCall(job=job, tool="quay.io/ucsc_cgl/mango:latest", dockerParameters=docker_parameters, diff --git a/setup.py b/setup.py index 363f19f..477657e 100644 --- a/setup.py +++ b/setup.py @@ -125,8 +125,8 @@ def importVersion(): 'bdg-deca = bdgenomics.workflows.deca_pipeline.call_cnvs:main', 'bdg-cannoli-bwa = bdgenomics.workflows.cannoli_pipeline.bwa_alignment:main', 'bdg-gatk3-benchmark = bdgenomics.workflows.benchmarking.gatk3_pipeline.preprocessing:main', - 'bdg-mango-browser = bdgenomics.workflows.mango_pipeline.run_mango_browser:main', - 'bdg-mango-notebook = bdgenomics.workflows.mango_pipeline.run_mango_notebook:main', + 'bdg-mango-browser = bdgenomics.workflows.mango.run_mango_browser:main', + 'bdg-mango-notebook = bdgenomics.workflows.mango.run_mango_notebook:main', 'bdg-mkdups-benchmark = bdgenomics.workflows.benchmarking.single_node.mkdups:main', 'bdg-sort-benchmark = bdgenomics.workflows.benchmarking.single_node.sort:main', 'bdg-ri-benchmark = bdgenomics.workflows.benchmarking.single_node.realign_indels:main', From ce8400633a29f4bec3609535566fe99898e2bfdb Mon Sep 17 00:00:00 2001 From: Alyssa Morrow Date: Tue, 7 Nov 2017 11:05:07 -0800 Subject: [PATCH 4/4] cleanup --- bdgenomics/workflows/mango/run_mango_browser.py | 6 ++---- bdgenomics/workflows/tools/spark_tools.py | 7 +------ 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/bdgenomics/workflows/mango/run_mango_browser.py b/bdgenomics/workflows/mango/run_mango_browser.py index dd8adfd..55fa457 100644 --- a/bdgenomics/workflows/mango/run_mango_browser.py +++ b/bdgenomics/workflows/mango/run_mango_browser.py @@ -30,9 +30,9 @@ from toil_lib.files import generate_file, move_files from toil_lib.urls import download_url_job - +from bdgenomics.workflows.adam_pipeline from bdgenomics.workflows.spark import spawn_spark_cluster -from bdgenomics.workflows.tools.functions import is_s3 +from bdgenomics.workflows.tools.functions import is_s3a from bdgenomics.workflows.tools.spark_tools import call_mango_browser, \ MasterAddress, \ HDFS_MASTER_PORT, \ @@ -61,7 +61,6 @@ def setup_mango_state(job, loaded_reference = (file_name, file_id.rv()) - loaded_reads = [] if reads is not None: @@ -116,7 +115,6 @@ def setup_mango_state(job, is_s3a(reference) - if reads is not None: for f in reads.split(','): is_s3a(f) diff --git a/bdgenomics/workflows/tools/spark_tools.py b/bdgenomics/workflows/tools/spark_tools.py index 73c7743..2c1725e 100644 --- a/bdgenomics/workflows/tools/spark_tools.py +++ b/bdgenomics/workflows/tools/spark_tools.py @@ -313,7 +313,6 @@ def call_mango_browser(job, master_ip, arguments, pass default_params = (master + [ - # set max result size to unlimited, see #177 "--conf", "spark.driver.maxResultSize=0", "--conf", "spark.hadoop.hadoopbam.bam.enable-bai-splitter=true", "--packages", "com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.4", 
@@ -336,10 +335,6 @@ def call_mango_browser(job, master_ip, arguments, docker_parameters.extend(['-e', 'AWS_ACCESS_KEY_ID=%s' % aws_access_key_id, '-e', 'AWS_SECRET_ACCESS_KEY=%s' % aws_secret_access_key]) - default_params.extend( - [ - "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"]) - for scheme in ['s3', 's3n']: default_params.extend([ "--conf", "spark.hadoop.fs.%s.awsAccessKeyId=%s" % (scheme, aws_access_key_id), @@ -428,7 +423,7 @@ def call_mango_notebook(job, master_ip, arguments, docker_parameters.extend(['-p', endpoint]) # reconfigure entrypoint for notebook - docker_parameters.extend(['--entrypoint=/home/mango/bin/mango-notebook']) + docker_parameters.extend(['--entrypoint=/opt/cgl-docker-lib/mango/bin/mango-notebook']) if aws_access_key_id:
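
The workflows added in these patches all follow the same Toil wiring pattern: a setup job (setup_mango_state) registers one download child per input URL, then attaches the job that launches the dockerized tool (run_mango_browser / run_mango_notebook) as a follow-on so it only runs once every download has finished, receiving the downloaded file IDs through promises (.rv()). What follows is a minimal, self-contained sketch of that pattern, not part of the patches above; the URLs are hypothetical and download_job / run_browser_job are simplified stand-ins for toil_lib's download_url_job and the run_mango_browser job defined in the patch.

#!/usr/bin/env python2.7
# Sketch of the child/follow-on wiring used by setup_mango_state in local mode.
# Hypothetical URLs; download_job and run_browser_job are simplified stand-ins.
import os
from toil.job import Job


def download_job(job, url):
    # stand-in for toil_lib.urls.download_url_job; the real job returns a
    # file-store ID, here we just log the URL and return its basename
    job.fileStore.logToMaster("downloading %s" % url)
    return os.path.basename(url)


def run_browser_job(job, reference, reads):
    # stand-in for run_mango_browser; by the time this runs, the promises
    # passed in below have been resolved to the child jobs' return values
    job.fileStore.logToMaster("mango %s -reads %s" % (reference, ",".join(reads)))


def setup_job(job, reference, reads):
    # one child job per input URL
    ref_child = job.wrapJobFn(download_job, reference)
    job.addChild(ref_child)

    read_promises = []
    for url in reads.split(","):
        child = job.wrapJobFn(download_job, url)
        job.addChild(child)
        read_promises.append(child.rv())

    # follow-on: runs only after all child downloads have completed
    job.addFollowOn(job.wrapJobFn(run_browser_job, ref_child.rv(), read_promises))


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./jobstore")
    Job.Runner.startToil(Job.wrapJobFn(setup_job,
                                       "http://example.com/hg19.2bit",
                                       "http://example.com/a.bam,http://example.com/b.bam"),
                         options)

As in the patch, child.rv() is a promise that Toil resolves to the child's return value before the follow-on job runs; this is what lets setup_mango_state hand (file_name, file_id.rv()) tuples to run_mango_browser before any download has actually executed.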