diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SparkCommand.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SparkCommand.scala
index 0cd03ff944..55dedab38e 100644
--- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SparkCommand.scala
+++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/SparkCommand.scala
@@ -20,7 +20,6 @@ import org.kohsuke.args4j.{ Option => Args4jOption }
 import java.util
 import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
-import org.bdgenomics.adam.serialization.ADAMKryoProperties
 import org.bdgenomics.adam.rdd.ADAMContext
 
 trait SparkArgs extends Args4jBase {
@@ -44,15 +43,31 @@ trait SparkArgs extends Args4jBase {
 }
 
 trait SparkCommand extends ADAMCommand {
 
+  /**
+   * Commandline format is -spark_env foo=1 -spark_env bar=2
+   * @param envVariables The variables found on the commandline
+   * @return An array of (key, value) pairs
+   */
+  def parseEnvVariables(envVariables: util.ArrayList[String]): Array[(String, String)] = {
+    envVariables.foldLeft(Array[(String, String)]()) {
+      (a, kv) =>
+        val kvSplit = kv.split("=")
+        if (kvSplit.size != 2) {
+          throw new IllegalArgumentException("Env variables should be key=value syntax, e.g. -spark_env foo=bar")
+        }
+        a :+ (kvSplit(0), kvSplit(1))
+    }
+  }
+
   def createSparkContext(args: SparkArgs): SparkContext = {
     ADAMContext.createSparkContext(
-      companion.commandName,
-      args.spark_master,
-      args.spark_home,
-      args.spark_jars,
-      args.spark_env_vars,
-      args.spark_add_stats_listener,
-      args.spark_kryo_buffer_size)
+      name = companion.commandName,
+      master = args.spark_master,
+      sparkHome = args.spark_home,
+      sparkJars = args.spark_jars,
+      sparkEnvVars = parseEnvVariables(args.spark_env_vars),
+      sparkAddStatsListener = args.spark_add_stats_listener,
+      sparkKryoBufferSize = args.spark_kryo_buffer_size)
   }
 }
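The new parseEnvVariables helper turns the repeated `-spark_env key=value` options into (key, value) pairs before they reach ADAMContext.createSparkContext. A minimal standalone sketch of that parsing behaviour is below; the object name, the use of a plain Seq rather than the java.util.ArrayList that args4j supplies, and the main method are illustrative only, not part of the patch.

```scala
// Standalone sketch of the key=value parsing that parseEnvVariables performs.
// The real helper receives a java.util.ArrayList[String] from args4j; a Seq is used here for brevity.
object EnvVarParsingSketch {
  def parse(envVariables: Seq[String]): Array[(String, String)] =
    envVariables.foldLeft(Array[(String, String)]()) { (acc, kv) =>
      val kvSplit = kv.split("=")
      if (kvSplit.size != 2) {
        throw new IllegalArgumentException("Env variables should be key=value syntax, e.g. -spark_env foo=bar")
      }
      acc :+ (kvSplit(0) -> kvSplit(1))
    }

  def main(args: Array[String]): Unit = {
    // "-spark_env foo=1 -spark_env bar=2" arrives as Seq("foo=1", "bar=2")
    println(parse(Seq("foo=1", "bar=2")).mkString(", ")) // prints (foo,1), (bar,2)
  }
}
```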
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
index aee1afd4bb..666c090b83 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -15,19 +15,6 @@
  */
 package org.bdgenomics.adam.rdd
 
-import fi.tkk.ics.hadoop.bam.{ SAMRecordWritable, AnySAMInputFormat, VariantContextWritable, VCFInputFormat }
-import fi.tkk.ics.hadoop.bam.util.{ SAMHeaderReader, VCFHeaderReader, WrapSeekable }
-import java.util.regex.Pattern
-import net.sf.samtools.SAMFileHeader
-import org.apache.hadoop.fs.FileSystem
-import org.apache.avro.Schema
-import org.apache.avro.specific.SpecificRecord
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{ LongWritable, Text }
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{ Logging, SparkContext }
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler.StatsReportListener
 import org.bdgenomics.adam.avro.{
   ADAMPileup,
   ADAMRecord,
@@ -42,7 +29,18 @@ import org.bdgenomics.adam.projections.{
   ADAMNucleotideContigFragmentField
 }
 import org.bdgenomics.adam.rich.RichADAMRecord
-import org.bdgenomics.adam.serialization.ADAMKryoProperties
+import fi.tkk.ics.hadoop.bam.{ SAMRecordWritable, AnySAMInputFormat }
+import fi.tkk.ics.hadoop.bam.util.SAMHeaderReader
+import java.util.regex.Pattern
+import net.sf.samtools.SAMFileHeader
+import org.apache.hadoop.fs.FileSystem
+import org.apache.avro.Schema
+import org.apache.avro.specific.SpecificRecord
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{ Text, LongWritable }
+import org.apache.spark.{ SparkConf, Logging, SparkContext }
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.StatsReportListener
 import parquet.avro.{ AvroParquetInputFormat, AvroReadSupport }
 import parquet.filter.UnboundRecordFilter
 import parquet.hadoop.ParquetInputFormat
@@ -115,28 +113,35 @@ object ADAMContext {
    */
   def createSparkContext(name: String,
                          master: String,
-                         sparkHome: String,
-                         sparkJars: Seq[String],
-                         sparkEnvVars: Seq[String],
+                         sparkHome: String = null,
+                         sparkJars: Seq[String] = Nil,
+                         sparkEnvVars: Seq[(String, String)] = Nil,
                          sparkAddStatsListener: Boolean = false,
-                         sparkKryoBufferSize: Int = 4): SparkContext = {
-    ADAMKryoProperties.setupContextProperties(sparkKryoBufferSize)
-    val appName = "adam: " + name
-    val environment: Map[String, String] = if (sparkEnvVars.isEmpty) {
-      Map()
-    } else {
-      sparkEnvVars.map {
-        kv =>
-          val kvSplit = kv.split("=")
-          if (kvSplit.size != 2) {
-            throw new IllegalArgumentException("Env variables should be key=value syntax, e.g. -spark_env foo=bar")
-          }
-          (kvSplit(0), kvSplit(1))
-      }.toMap
+                         sparkKryoBufferSize: Int = 4,
+                         loadSystemValues: Boolean = true,
+                         sparkDriverPort: Option[Int] = None): SparkContext = {
+    // TODO: Add enough spark arguments so that we don't need to load the system values (e.g. SPARK_MEM)
+    val config: SparkConf = new SparkConf(loadSystemValues).setAppName("adam: " + name).setMaster(master)
+    if (sparkHome != null)
+      config.setSparkHome(sparkHome)
+    if (sparkJars != Nil)
+      config.setJars(sparkJars)
+    if (sparkEnvVars != Nil)
+      config.setExecutorEnv(sparkEnvVars)
+
+    // Optionally set the spark driver port
+    sparkDriverPort match {
+      case Some(port) => config.set("spark.driver.port", port.toString)
+      case None       =>
     }
-    val jars: Seq[String] = if (sparkJars.isEmpty) Nil else sparkJars
-    val sc = new SparkContext(master, appName, sparkHome, jars, environment)
+    // Setup the Kryo settings
+    config.setAll(Array(("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+      ("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator"),
+      ("spark.kryoserializer.buffer.mb", sparkKryoBufferSize.toString),
+      ("spark.kryo.referenceTracking", "false")))
+
+    val sc = new SparkContext(config)
 
     if (sparkAddStatsListener) {
       sc.addSparkListener(new StatsReportListener)
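The substantive change above is that createSparkContext now builds a SparkConf and passes it to the SparkContext constructor instead of mutating JVM-wide system properties, so the Kryo serializer, registrator, buffer size, and reference-tracking settings travel with the context that uses them. A sketch of the equivalent standalone configuration follows; the app name, master, and the trivial counting job are illustrative, and it assumes Spark plus the ADAM classes (for ADAMKryoRegistrator) are on the classpath.

```scala
// Sketch of the SparkConf-based setup that replaces the old System.setProperty calls.
// Assumes Spark and ADAM (for ADAMKryoRegistrator) are on the classpath; names and the job are illustrative.
import org.apache.spark.{ SparkConf, SparkContext }

object SparkConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false) // loadDefaults = false, mirroring loadSystemValues = false in the tests
      .setAppName("adam: example")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
      .set("spark.kryoserializer.buffer.mb", "4")
      .set("spark.kryo.referenceTracking", "false")

    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).count()) // prints 10
    } finally {
      sc.stop()
    }
  }
}
```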
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoProperties.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoProperties.scala
deleted file mode 100644
index 76f01e140c..0000000000
--- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoProperties.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2013. Regents of the University of California
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.bdgenomics.adam.serialization
-
-object ADAMKryoProperties {
-
-  /**
-   * Sets up serialization properties for ADAM.
-   *
-   * @param kryoBufferSize Buffer size in MB to allocate for object serialization. Default is 4MB.
-   */
-  def setupContextProperties(kryoBufferSize: Int = 4) = {
-    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    System.setProperty("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
-    System.setProperty("spark.kryoserializer.buffer.mb", kryoBufferSize.toString)
-    System.setProperty("spark.kryo.referenceTracking", "false")
-  }
-}
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/predicates/GenotypePredicatesSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/predicates/GenotypePredicatesSuite.scala
index ee060fe484..26268121a7 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/predicates/GenotypePredicatesSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/predicates/GenotypePredicatesSuite.scala
@@ -17,7 +17,6 @@
 package org.bdgenomics.adam.predicates
 
 import org.bdgenomics.adam.util.{ ParquetLogger, SparkFunSuite }
-import org.scalatest.BeforeAndAfter
 import java.util.logging.Level
 import java.io.File
 import org.bdgenomics.adam.avro.{
@@ -31,10 +30,9 @@ import org.bdgenomics.adam.rdd.ADAMContext._
 import com.google.common.io.Files
 import org.apache.commons.io.FileUtils
 
-class GenotypePredicatesSuite extends SparkFunSuite with BeforeAndAfter {
-  var genotypesParquetFile: File = null
+class GenotypePredicatesSuite extends SparkFunSuite {
 
-  sparkBefore("genotypepredicatessuite_before") {
+  sparkTest("Return only PASSing records") {
     ParquetLogger.hadoopLoggerLevel(Level.SEVERE)
 
     val v0 = ADAMVariant.newBuilder
@@ -56,21 +54,14 @@ class GenotypePredicatesSuite extends SparkFunSuite with BeforeAndAfter {
         .setVariant(v0)
         .setVariantCallingAnnotations(failFilterAnnotation).build()))
 
-    genotypesParquetFile = new File(Files.createTempDir(), "genotypes")
-    if (genotypesParquetFile.exists())
-      FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)
-
+    val genotypesParquetFile = new File(Files.createTempDir(), "genotypes")
     genotypes.adamSave(genotypesParquetFile.getAbsolutePath)
-  }
 
-  after {
-    FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)
-  }
-
-  sparkTest("Return only PASSing records") {
     val gts1: RDD[ADAMGenotype] = sc.adamLoad(
       genotypesParquetFile.getAbsolutePath,
       predicate = Some(classOf[GenotypeVarFilterPASSPredicate]))
 
     assert(gts1.count === 1)
+
+    FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)
   }
 }
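The reworked suite no longer shares a mutable field through a BeforeAndAfter hook; each sparkTest creates its own temporary directory, writes to it, reads it back, and deletes it. A small sketch of that temp-directory lifecycle using the same Guava and commons-io calls; the directory name and the try/finally wrapper are illustrative (the test itself performs the cleanup inline at the end of the test body).

```scala
// Sketch of the self-contained temp-directory pattern used by the reworked test:
// Guava's Files.createTempDir() for scratch space, commons-io FileUtils for cleanup.
import java.io.File
import com.google.common.io.Files
import org.apache.commons.io.FileUtils

object TempDirSketch {
  def main(args: Array[String]): Unit = {
    val outputDir = new File(Files.createTempDir(), "genotypes")
    try {
      // The real test saves a genotype RDD here and reads it back through a predicate.
      println("scratch space: " + outputDir.getAbsolutePath)
    } finally {
      // Delete the parent so the generated temp directory itself is removed as well.
      FileUtils.deleteDirectory(outputDir.getParentFile)
    }
  }
}
```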
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/util/SparkFunSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/util/SparkFunSuite.scala
index 0ecd91e59a..c4191589c7 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/util/SparkFunSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/util/SparkFunSuite.scala
@@ -19,46 +19,36 @@ import org.scalatest.{ BeforeAndAfter, FunSuite }
 import org.apache.spark.SparkContext
 import java.net.ServerSocket
 import org.apache.log4j.Level
-import org.bdgenomics.adam.serialization.ADAMKryoProperties
+import org.bdgenomics.adam.rdd.ADAMContext
 
 object SparkTest extends org.scalatest.Tag("org.bdgenomics.adam.util.SparkFunSuite")
 
 trait SparkFunSuite extends FunSuite with BeforeAndAfter {
 
-  val sparkPortProperty = "spark.driver.port"
-
   var sc: SparkContext = _
   var maybeLevels: Option[Map[String, Level]] = None
 
-  def createSpark(sparkName: String, silenceSpark: Boolean = true): SparkContext = {
-    // Use the same context properties as ADAM commands
-    ADAMKryoProperties.setupContextProperties()
+  def setupSparkContext(sparkName: String, silenceSpark: Boolean = true) {
     // Silence the Spark logs if requested
     maybeLevels = if (silenceSpark) Some(SparkLogUtil.silenceSpark()) else None
     synchronized {
       // Find an unused port
       val s = new ServerSocket(0)
-      System.setProperty(sparkPortProperty, s.getLocalPort.toString)
-      // Allow Spark to take the port we just discovered
+      val driverPort = Some(s.getLocalPort)
       s.close()
-
-      // Create a spark context
-      new SparkContext("local[4]", sparkName)
+      sc = ADAMContext.createSparkContext(
+        name = "adam: " + sparkName,
+        master = "local[4]",
+        loadSystemValues = false,
+        sparkDriverPort = driverPort)
     }
   }
 
-  def destroySpark() {
+  def teardownSparkContext() {
     // Stop the context
     sc.stop()
     sc = null
 
-    // See notes at:
-    // http://blog.quantifind.com/posts/spark-unit-test/
-    // That post calls for clearing 'spark.master.port', but this thread
-    // https://groups.google.com/forum/#!topic/spark-users/MeVzgoJXm8I
-    // suggests that the property was renamed 'spark.driver.port'
-    System.clearProperty(sparkPortProperty)
-
     maybeLevels match {
       case None =>
       case Some(levels) =>
@@ -70,36 +60,36 @@ trait SparkFunSuite extends FunSuite with BeforeAndAfter {
 
   def sparkBefore(beforeName: String, silenceSpark: Boolean = true)(body: => Unit) {
     before {
-      sc = createSpark(beforeName, silenceSpark)
+      setupSparkContext(beforeName, silenceSpark)
       try {
         // Run the before block
         body
       } finally {
-        destroySpark()
+        teardownSparkContext()
      }
    }
  }
 
   def sparkAfter(beforeName: String, silenceSpark: Boolean = true)(body: => Unit) {
     after {
-      sc = createSpark(beforeName, silenceSpark)
+      setupSparkContext(beforeName, silenceSpark)
       try {
         // Run the after block
         body
       } finally {
-        destroySpark()
+        teardownSparkContext()
       }
     }
   }
 
   def sparkTest(name: String, silenceSpark: Boolean = true)(body: => Unit) {
     test(name, SparkTest) {
-      sc = createSpark(name, silenceSpark)
+      setupSparkContext(name, silenceSpark)
       try {
         // Run the test
         body
       } finally {
-        destroySpark()
+        teardownSparkContext()
       }
     }
   }
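The test harness now asks the operating system for a free ephemeral port and passes it to createSparkContext as spark.driver.port, instead of setting and later clearing a global system property. A sketch of just that port-discovery trick; the object name and the println are illustrative.

```scala
// Sketch of the ephemeral-port discovery used by setupSparkContext: bind a ServerSocket to
// port 0 so the OS picks a free port, record it, release it, and hand it to Spark as spark.driver.port.
import java.net.ServerSocket

object DriverPortSketch {
  def main(args: Array[String]): Unit = {
    val s = new ServerSocket(0)     // port 0 asks the OS for any free port
    val driverPort = s.getLocalPort // the port that was actually assigned
    s.close()                       // release it so Spark can bind to it

    println("spark.driver.port would be set to " + driverPort)
  }
}
```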