Use SparkConf object to configure SparkContext #223

Merged (1 commit, Apr 23, 2014)
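
The change, in short: instead of passing the application name, master, Spark home, jars, and environment positionally to the SparkContext constructor and configuring Kryo through the ADAMKryoProperties helper, configuration is now collected in a single SparkConf object. A minimal sketch of the two styles using plain Spark APIs (illustrative only, not code from this pull request; the app name is made up):

import org.apache.spark.{ SparkConf, SparkContext }

// Old style (what the CLI used before this change): positional constructor arguments, e.g.
//   new SparkContext("local[4]", "adam: example", sparkHome, jars, environment)

// New style: build a SparkConf and hand it to the SparkContext constructor.
val conf = new SparkConf()
  .setAppName("adam: example")
  .setMaster("local[4]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")
val sc = new SparkContext(conf)
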
adam-cli/src/main/scala/org/bdgenomics/adam/cli/SparkCommand.scala (31 changes: 23 additions, 8 deletions)
@@ -20,7 +20,6 @@ import org.kohsuke.args4j.{ Option => Args4jOption }
import java.util
import scala.collection.JavaConversions._
import org.apache.spark.SparkContext
import org.bdgenomics.adam.serialization.ADAMKryoProperties
import org.bdgenomics.adam.rdd.ADAMContext

trait SparkArgs extends Args4jBase {
@@ -44,15 +43,31 @@ trait SparkArgs extends Args4jBase {

trait SparkCommand extends ADAMCommand {

/**
* Command-line format is -spark_env foo=1 -spark_env bar=2
* @param envVariables The variables found on the command line
* @return An array of (key, value) pairs parsed from the key=value strings
*/
def parseEnvVariables(envVariables: util.ArrayList[String]): Array[(String, String)] = {
envVariables.foldLeft(Array[(String, String)]()) {
(a, kv) =>
val kvSplit = kv.split("=")
if (kvSplit.size != 2) {
throw new IllegalArgumentException("Env variables should be key=value syntax, e.g. -spark_env foo=bar")
}
a :+ (kvSplit(0), kvSplit(1))
}
}

def createSparkContext(args: SparkArgs): SparkContext = {
ADAMContext.createSparkContext(
companion.commandName,
args.spark_master,
args.spark_home,
args.spark_jars,
args.spark_env_vars,
args.spark_add_stats_listener,
args.spark_kryo_buffer_size)
name = companion.commandName,
master = args.spark_master,
sparkHome = args.spark_home,
sparkJars = args.spark_jars,
sparkEnvVars = parseEnvVariables(args.spark_env_vars),
sparkAddStatsListener = args.spark_add_stats_listener,
sparkKryoBufferSize = args.spark_kryo_buffer_size)
}

}
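
For context, parseEnvVariables turns repeated -spark_env key=value flags into (key, value) pairs, which createSparkContext then hands to SparkConf.setExecutorEnv. A small usage sketch (hypothetical values, assuming a command class that mixes in SparkCommand):

import java.util

val envArgs = new util.ArrayList[String]()
envArgs.add("SPARK_MEM=4g")
envArgs.add("SPARK_LOCAL_DIRS=/tmp/spark")

// With the implementation above this returns
//   Array(("SPARK_MEM", "4g"), ("SPARK_LOCAL_DIRS", "/tmp/spark")),
// while an entry without '=' (e.g. "SPARK_MEM") throws IllegalArgumentException.
val pairs: Array[(String, String)] = parseEnvVariables(envArgs)
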
adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala (71 changes: 38 additions, 33 deletions)
@@ -15,19 +15,6 @@
*/
package org.bdgenomics.adam.rdd

import fi.tkk.ics.hadoop.bam.{ SAMRecordWritable, AnySAMInputFormat, VariantContextWritable, VCFInputFormat }
import fi.tkk.ics.hadoop.bam.util.{ SAMHeaderReader, VCFHeaderReader, WrapSeekable }
import java.util.regex.Pattern
import net.sf.samtools.SAMFileHeader
import org.apache.hadoop.fs.FileSystem
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{ Logging, SparkContext }
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.StatsReportListener
import org.bdgenomics.adam.avro.{
ADAMPileup,
ADAMRecord,
@@ -42,7 +29,18 @@ import org.bdgenomics.adam.projections.{
ADAMNucleotideContigFragmentField
}
import org.bdgenomics.adam.rich.RichADAMRecord
import org.bdgenomics.adam.serialization.ADAMKryoProperties
import fi.tkk.ics.hadoop.bam.{ SAMRecordWritable, AnySAMInputFormat }
import fi.tkk.ics.hadoop.bam.util.SAMHeaderReader
import java.util.regex.Pattern
import net.sf.samtools.SAMFileHeader
import org.apache.hadoop.fs.FileSystem
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ Text, LongWritable }
import org.apache.spark.{ SparkConf, Logging, SparkContext }
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.StatsReportListener
import parquet.avro.{ AvroParquetInputFormat, AvroReadSupport }
import parquet.filter.UnboundRecordFilter
import parquet.hadoop.ParquetInputFormat
@@ -115,28 +113,35 @@ object ADAMContext {
*/
def createSparkContext(name: String,
master: String,
sparkHome: String,
sparkJars: Seq[String],
sparkEnvVars: Seq[String],
sparkHome: String = null,
sparkJars: Seq[String] = Nil,
sparkEnvVars: Seq[(String, String)] = Nil,
sparkAddStatsListener: Boolean = false,
sparkKryoBufferSize: Int = 4): SparkContext = {
ADAMKryoProperties.setupContextProperties(sparkKryoBufferSize)
val appName = "adam: " + name
val environment: Map[String, String] = if (sparkEnvVars.isEmpty) {
Map()
} else {
sparkEnvVars.map {
kv =>
val kvSplit = kv.split("=")
if (kvSplit.size != 2) {
throw new IllegalArgumentException("Env variables should be key=value syntax, e.g. -spark_env foo=bar")
}
(kvSplit(0), kvSplit(1))
}.toMap
sparkKryoBufferSize: Int = 4,
loadSystemValues: Boolean = true,
sparkDriverPort: Option[Int] = None): SparkContext = {
// TODO: Add enough spark arguments so that we don't need to load the system values (e.g. SPARK_MEM)
val config: SparkConf = new SparkConf(loadSystemValues).setAppName("adam: " + name).setMaster(master)
if (sparkHome != null)
config.setSparkHome(sparkHome)
if (sparkJars != Nil)
config.setJars(sparkJars)
if (sparkEnvVars != Nil)
config.setExecutorEnv(sparkEnvVars)

// Optionally set the spark driver port
sparkDriverPort match {
case Some(port) => config.set("spark.driver.port", port.toString)
case None =>
}

val jars: Seq[String] = if (sparkJars.isEmpty) Nil else sparkJars
val sc = new SparkContext(master, appName, sparkHome, jars, environment)
// Setup the Kryo settings
config.setAll(Array(("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator"),
("spark.kryoserializer.buffer.mb", sparkKryoBufferSize.toString),
("spark.kryo.referenceTracking", "false")))

val sc = new SparkContext(config)

if (sparkAddStatsListener) {
sc.addSparkListener(new StatsReportListener)
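
With the new defaults (sparkHome = null, sparkJars = Nil, sparkEnvVars = Nil, loadSystemValues = true, sparkDriverPort = None), callers can create a context from just a name and a master, and the Kryo serializer plus ADAMKryoRegistrator are wired in automatically. An illustrative call, not part of this diff (the name is hypothetical):

import org.bdgenomics.adam.rdd.ADAMContext

val sc = ADAMContext.createSparkContext(
  name = "example",
  master = "local[4]",
  sparkAddStatsListener = true) // also attaches a StatsReportListener
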

This file was deleted.

GenotypePredicatesSuite.scala
@@ -17,7 +17,6 @@
package org.bdgenomics.adam.predicates

import org.bdgenomics.adam.util.{ ParquetLogger, SparkFunSuite }
import org.scalatest.BeforeAndAfter
import java.util.logging.Level
import java.io.File
import org.bdgenomics.adam.avro.{
@@ -31,10 +30,9 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import com.google.common.io.Files
import org.apache.commons.io.FileUtils

class GenotypePredicatesSuite extends SparkFunSuite with BeforeAndAfter {
var genotypesParquetFile: File = null
class GenotypePredicatesSuite extends SparkFunSuite {

sparkBefore("genotypepredicatessuite_before") {
sparkTest("Return only PASSing records") {
ParquetLogger.hadoopLoggerLevel(Level.SEVERE)

val v0 = ADAMVariant.newBuilder
@@ -56,21 +54,14 @@ class GenotypePredicatesSuite extends SparkFunSuite with BeforeAndAfter {
.setVariant(v0)
.setVariantCallingAnnotations(failFilterAnnotation).build()))

genotypesParquetFile = new File(Files.createTempDir(), "genotypes")
if (genotypesParquetFile.exists())
FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)

val genotypesParquetFile = new File(Files.createTempDir(), "genotypes")
genotypes.adamSave(genotypesParquetFile.getAbsolutePath)
}

after {
FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)
}

sparkTest("Return only PASSing records") {
val gts1: RDD[ADAMGenotype] = sc.adamLoad(
genotypesParquetFile.getAbsolutePath,
predicate = Some(classOf[GenotypeVarFilterPASSPredicate]))
assert(gts1.count === 1)

FileUtils.deleteDirectory(genotypesParquetFile.getParentFile)
}
}
SparkFunSuite.scala
@@ -19,46 +19,36 @@ import org.scalatest.{ BeforeAndAfter, FunSuite }
import org.apache.spark.SparkContext
import java.net.ServerSocket
import org.apache.log4j.Level
import org.bdgenomics.adam.serialization.ADAMKryoProperties
import org.bdgenomics.adam.rdd.ADAMContext

object SparkTest extends org.scalatest.Tag("org.bdgenomics.adam.util.SparkFunSuite")

trait SparkFunSuite extends FunSuite with BeforeAndAfter {

val sparkPortProperty = "spark.driver.port"

var sc: SparkContext = _
var maybeLevels: Option[Map[String, Level]] = None

def createSpark(sparkName: String, silenceSpark: Boolean = true): SparkContext = {
// Use the same context properties as ADAM commands
ADAMKryoProperties.setupContextProperties()
def setupSparkContext(sparkName: String, silenceSpark: Boolean = true) {
// Silence the Spark logs if requested
maybeLevels = if (silenceSpark) Some(SparkLogUtil.silenceSpark()) else None
synchronized {
// Find an unused port
val s = new ServerSocket(0)
System.setProperty(sparkPortProperty, s.getLocalPort.toString)
// Allow Spark to take the port we just discovered
val driverPort = Some(s.getLocalPort)
s.close()

// Create a spark context
new SparkContext("local[4]", sparkName)
sc = ADAMContext.createSparkContext(
name = "adam: " + sparkName,
master = "local[4]",
loadSystemValues = false,
sparkDriverPort = driverPort)
}
}

def destroySpark() {
def teardownSparkContext() {
// Stop the context
sc.stop()
sc = null

// See notes at:
// http://blog.quantifind.com/posts/spark-unit-test/
// That post calls for clearing 'spark.master.port', but this thread
// https://groups.google.com/forum/#!topic/spark-users/MeVzgoJXm8I
// suggests that the property was renamed 'spark.driver.port'
System.clearProperty(sparkPortProperty)

maybeLevels match {
case None =>
case Some(levels) =>
@@ -70,36 +60,36 @@ trait SparkFunSuite extends FunSuite with BeforeAndAfter {

def sparkBefore(beforeName: String, silenceSpark: Boolean = true)(body: => Unit) {
before {
sc = createSpark(beforeName, silenceSpark)
setupSparkContext(beforeName, silenceSpark)
try {
// Run the before block
body
} finally {
destroySpark()
teardownSparkContext()
}
}
}

def sparkAfter(beforeName: String, silenceSpark: Boolean = true)(body: => Unit) {
after {
sc = createSpark(beforeName, silenceSpark)
setupSparkContext(beforeName, silenceSpark)
try {
// Run the after block
body
} finally {
destroySpark()
teardownSparkContext()
}
}
}

def sparkTest(name: String, silenceSpark: Boolean = true)(body: => Unit) {
test(name, SparkTest) {
sc = createSpark(name, silenceSpark)
setupSparkContext(name, silenceSpark)
try {
// Run the test
body
} finally {
destroySpark()
teardownSparkContext()
}
}
}
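
After this refactor, a test suite only mixes in SparkFunSuite and wraps its body in sparkTest; setupSparkContext builds sc through ADAMContext.createSparkContext on a free driver port, and teardownSparkContext stops it afterwards. A minimal hypothetical suite (illustrative, not part of this diff):

import org.bdgenomics.adam.util.SparkFunSuite

class ExampleSuite extends SparkFunSuite {
  sparkTest("counts a small RDD") {
    // sc is provided by SparkFunSuite, already configured with the ADAM Kryo registrator
    val rdd = sc.parallelize(Seq(1, 2, 3))
    assert(rdd.count === 3)
  }
}
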