@@ -21,7 +21,7 @@ import scala.collection.mutable

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.PrivateMethodTester

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
@@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock
*/
class ExecutorAllocationManagerSuite
extends SparkFunSuite
with LocalSparkContext
with BeforeAndAfter {
with LocalSparkContext {

import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._

private val contexts = new mutable.ListBuffer[SparkContext]()

before {
override def beforeEach(): Unit = {
super.beforeEach()
contexts.clear()
}

after {
contexts.foreach(_.stop())
override def afterEach(): Unit = {
try {
contexts.foreach(_.stop())
} finally {
super.afterEach()
}
}

private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite
assert(totalRunningTasks(manager) === 0)
}

test("cancel pending executors when no longer needed") {
testRetry("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
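The conversion above drops ScalaTest's `BeforeAndAfter` in favor of overriding `beforeEach`/`afterEach`: the `testRetry`/`retry` helpers added to `SparkFunSuite` later in this diff reset state by invoking `afterEach()` and then `beforeEach()` between attempts, which only works when setup and teardown live in those overridable hooks. Below is a minimal standalone sketch of that lifecycle pattern; the suite name and the tracked resources are illustrative, not part of the patch.

import scala.collection.mutable

import org.scalatest.{BeforeAndAfterEach, FunSuite}

// Hypothetical suite showing the pattern adopted above: setup runs after super.beforeEach(),
// and suite-specific cleanup is wrapped in try/finally so super.afterEach() always runs,
// even if releasing a resource throws.
class LifecyclePatternExampleSuite extends FunSuite with BeforeAndAfterEach {

  private val resources = new mutable.ListBuffer[AutoCloseable]()

  override def beforeEach(): Unit = {
    super.beforeEach()               // let parent traits set up first
    resources.clear()
  }

  override def afterEach(): Unit = {
    try {
      resources.foreach(_.close())   // suite-specific cleanup
    } finally {
      super.afterEach()              // parent teardown always runs
    }
  }

  test("resources start empty for every test") {
    assert(resources.isEmpty)
  }
}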
@@ -17,7 +17,10 @@

package org.apache.spark

import scala.concurrent.duration._

import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually._

import org.apache.spark.storage.StorageLevel

@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
test("getRDDStorageInfo only reports on RDDs that actually persist data") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(sc.getRDDStorageInfo.size === 0)
assert(sc.getRDDStorageInfo.length === 0)
rdd.collect()
sc.listenerBus.waitUntilEmpty(10000)
assert(sc.getRDDStorageInfo.size === 1)
eventually(timeout(10.seconds), interval(100.milliseconds)) {
assert(sc.getRDDStorageInfo.length === 1)
}
assert(sc.getRDDStorageInfo.head.isCached)
assert(sc.getRDDStorageInfo.head.memSize > 0)
assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
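The rewrite above keeps `listenerBus.waitUntilEmpty` but moves the storage-info assertion into ScalaTest's `Eventually`, which re-runs the block until it passes or the timeout expires instead of asserting once at a fixed point in time. A minimal sketch of the same pattern, with illustrative timeout values and a hypothetical condition:

import scala.concurrent.duration._

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._

// Re-evaluate the block every 100 ms until it stops throwing, or fail after 10 seconds.
// The FiniteDuration values are converted to ScalaTest Spans implicitly.
def waitUntilTrue(isReady: () => Boolean): Unit = {
  eventually(timeout(10.seconds), interval(100.milliseconds)) {
    assert(isReady(), "condition was not met before the timeout")
  }
}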
core/src/test/scala/org/apache/spark/SparkFunSuite.scala (45 additions, 1 deletion)
@@ -20,7 +20,9 @@ package org.apache.spark
// scalastyle:off
import java.io.File

import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
import scala.annotation.tailrec

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}

import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorContext
@@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext
abstract class SparkFunSuite
extends FunSuite
with BeforeAndAfterAll
with BeforeAndAfterEach
with ThreadAudit
with Logging {
// scalastyle:on
@@ -87,6 +90,47 @@ abstract class SparkFunSuite
getTestResourceFile(file).getCanonicalPath
}

/**
* Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
* set up and tear down resources.
*/
def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
test(s) {
retry(n) {
body
}
}
}

/**
* Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
* set up and tear down resources.
*/
def retry[T](n: Int)(body: => T): T = {
if (this.isInstanceOf[BeforeAndAfter]) {
throw new UnsupportedOperationException(
s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
s"Please use ${classOf[BeforeAndAfterEach]} instead.")
}
retry0(n, n)(body)
}

@tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
try body
catch { case e: Throwable =>
if (n > 0) {
logWarning(e.getMessage, e)
logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
// Reset state before re-attempting so that tests which use patterns like
// LocalSparkContext to clean up state can work correctly when retried.
afterEach()
beforeEach()
retry0(n-1, n0)(body)
}
else throw e
}
}

/**
* Log the suite name and the test name before and after each test.
*
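The new `testRetry` helper wraps a regular `test` in `retry`, and `retry0` tears down and re-creates per-test state between attempts by calling `afterEach()` followed by `beforeEach()`; that is also why `retry` refuses to run under `BeforeAndAfter`, whose `before`/`after` callbacks are invisible to it. A usage sketch (the suite, resource, and retry count are illustrative, not from the patch):

class FlakyLookupSuite extends SparkFunSuite {

  // Stand-in for a per-test resource such as a SparkContext managed via LocalSparkContext.
  private var client: String = _

  override def beforeEach(): Unit = {
    super.beforeEach()
    client = "connected"            // acquired fresh for every attempt
  }

  override def afterEach(): Unit = {
    try {
      client = null                 // released after every attempt
    } finally {
      super.afterEach()
    }
  }

  // Allows up to three retries after the first failed attempt; afterEach()/beforeEach()
  // run between attempts, so the body always sees a freshly initialized client.
  testRetry("lookup eventually succeeds", n = 3) {
    assert(client == "connected")
  }
}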
@@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._

class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext {

test("basic status API usage") {
testRetry("basic status API usage") {
sc = new SparkContext("local", "test", new SparkConf(false))
val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
val jobId: Int = eventually(timeout(10 seconds)) {
@@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._

@@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}

class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

private var testDir: File = null

before {
override def beforeEach(): Unit = {
super.beforeEach()
testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
}

after {
Utils.deleteRecursively(testDir)
override def afterEach(): Unit = {
try {
Utils.deleteRecursively(testDir)
} finally {
super.afterEach()
}
}

/** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -487,15 +491,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)

eventually(timeout(1 second), interval(10 millis)) {
eventually(timeout(3.second), interval(10.milliseconds)) {
provider.getConfig().keys should not contain ("HDFS State")
}
} finally {
provider.stop()
}
}

test("provider reports error after FS leaves safe mode") {
testRetry("provider reports error after FS leaves safe mode") {
testDir.delete()
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)

eventually(timeout(1 second), interval(10 millis)) {
eventually(timeout(3.second), interval(10.milliseconds)) {
verify(errorHandler).uncaughtException(any(), any())
}
} finally {
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo

/**
* Unit tests for SparkListener that require a local cluster.
*/
class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
with BeforeAndAfter with BeforeAndAfterAll {
class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

before {
override def beforeEach(): Unit = {
super.beforeEach()
sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
}

test("SparkListener sends executor added message") {
testRetry("SparkListener sends executor added message") {
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

project/SparkBuild.scala (92 additions, 3 deletions)
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
else x.settings(Seq[Setting[_]](): _*)
} ++ Seq[Project](OldDeps.project)
}

if (!sys.env.contains("SERIAL_SBT_TESTS")) {
allProjects.foreach(enable(SparkParallelTestGrouping.settings))
}
}

object SparkParallelTestGrouping {
// Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
// collections of suites) in their own forked JVMs, allowing us to gain parallelism within an
// SBT project. Here, we take a whitelisting approach where the default behavior is to run all
// tests sequentially in a single JVM, requiring us to manually opt in to the extra parallelism.
//
// There are a few reasons why such a whitelist approach is good:
//
// 1. Launching one JVM per suite adds significant overhead for short-running suites. In
// addition to JVM startup time and JIT warmup, it appears that initialization of Derby
// metastores can be very slow so creating a fresh warehouse per suite is inefficient.
//
// 2. When parallelizing within a project we need to give each forked JVM a different tmpdir
// so that the metastore warehouses do not collide. Unfortunately, it seems that there are
// some tests which have an overly tight dependency on the default tmpdir, so those fragile
// tests need to continue running in the default configuration (or need to be rewritten).
// Fixing that problem would be a huge amount of work for limited payoff in most cases
// because most test suites are short-running.
//

private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
"org.apache.spark.DistributedSuite",
"org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
"org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
"org.apache.spark.sql.catalyst.expressions.CastSuite",
"org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
"org.apache.spark.sql.hive.HiveExternalCatalogSuite",
"org.apache.spark.sql.hive.StatisticsSuite",
"org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
"org.apache.spark.sql.hive.client.VersionsSuite",
"org.apache.spark.sql.hive.client.HiveClientVersions",
"org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
"org.apache.spark.ml.classification.LogisticRegressionSuite",
"org.apache.spark.ml.classification.LinearSVCSuite",
"org.apache.spark.sql.SQLQueryTestSuite"
)

private val DEFAULT_TEST_GROUP = "default_test_group"

private def testNameToTestGroup(name: String): String = name match {
case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
case _ => DEFAULT_TEST_GROUP
}

lazy val settings = Seq(
testGrouping in Test := {
val tests: Seq[TestDefinition] = (definedTests in Test).value
val defaultForkOptions = ForkOptions(
bootJars = Nil,
javaHome = javaHome.value,
connectInput = connectInput.value,
outputStrategy = outputStrategy.value,
runJVMOptions = (javaOptions in Test).value,
workingDirectory = Some(baseDirectory.value),
envVars = (envVars in Test).value
)
tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
val forkOptions = {
if (groupName == DEFAULT_TEST_GROUP) {
defaultForkOptions
} else {
defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
}
}
new Tests.Group(
name = groupName,
tests = groupTests,
runPolicy = Tests.SubProcess(forkOptions))
}
}.toSeq
)
}

object Core {
@@ -844,8 +922,14 @@ object TestSettings {
testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
// Enable Junit testing.
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
// Only allow one test at a time, even across projects, since they run in the same JVM
parallelExecution in Test := false,
// `parallelExecution in Test` controls whether test suites belonging to the same SBT project
// can run in parallel with one another. It does NOT control whether tests execute in parallel
// within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
// within the same suite can run in parallel (a ScalaTest runner option that is passed to the
// underlying runner but is not an SBT-level configuration). This needs to be `true` in order
// for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
// The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
// Make sure the test temp directory exists.
resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
var dir = new File(testTempDir)
@@ -866,7 +950,12 @@
}
Seq.empty[File]
}).value,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
concurrentRestrictions in Global := {
// The number of concurrent test groups is empirically chosen based on experience
// with Jenkins flakiness.
if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
}
)

}
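Taken together, `SparkParallelTestGrouping` and the `TestSettings` changes work like this: every whitelisted suite name becomes its own test group and therefore its own forked JVM with a dedicated `java.io.tmpdir`, all other suites share one default group, `parallelExecution in Test` lets SBT schedule those groups concurrently, and `Tags.limit(Tags.ForkedTestGroup, 4)` caps how many forked groups run at once (unless `SERIAL_SBT_TESTS` restores the old serial behavior). Below is a standalone sketch of the grouping and tmpdir rules in plain Scala; the object and the directory values are illustrative, mirroring but not replacing the build code.

object GroupingRuleSketch {
  private val dedicatedJvmSuites = Set(
    "org.apache.spark.DistributedSuite",
    "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite")

  private val DEFAULT_TEST_GROUP = "default_test_group"

  // Whitelisted suites map to a group named after the suite; everything else shares one group.
  def groupFor(suiteName: String): String =
    if (dedicatedJvmSuites.contains(suiteName)) suiteName else DEFAULT_TEST_GROUP

  // Non-default groups get their own tmpdir so per-group metastore warehouses don't collide.
  def extraJvmOptions(groupName: String, baseDirectory: String): Seq[String] =
    if (groupName == DEFAULT_TEST_GROUP) Nil
    else Seq(s"-Djava.io.tmpdir=$baseDirectory/target/tmp/$groupName")

  def main(args: Array[String]): Unit = {
    println(groupFor("org.apache.spark.DistributedSuite"))   // its own group (and forked JVM)
    println(groupFor("org.apache.spark.SomeOrdinarySuite"))  // default_test_group
    println(extraJvmOptions("org.apache.spark.DistributedSuite", "/workspace/core"))
  }
}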
@@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite

val handle = launcher.startApplication()
try {
eventually(timeout(2 minutes), interval(1 second)) {
eventually(timeout(3.minutes), interval(1.second)) {
assert(handle.getState().isFinal())
}
} finally {
@@ -202,15 +202,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
.startApplication()

try {
eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(3.minutes), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.RUNNING)
}

handle.getAppId() should not be (null)
handle.getAppId() should startWith ("application_")
handle.stop()

eventually(timeout(30 seconds), interval(100 millis)) {
eventually(timeout(3.minutes), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {
@@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
val df = session.sql(sql)
val schema = df.schema
val notIncludedMsg = "[not included in comparison]"
val clsName = this.getClass.getCanonicalName
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
.replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
.replaceAll(
s"Location.*/sql/core/spark-warehouse/$clsName/",
s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
.replaceAll("Created By.*", s"Created By $notIncludedMsg")
.replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
.replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
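The extra `replaceAll` above masks a per-suite `spark-warehouse/<class name>/` path component in explain output (presumably introduced so that concurrently running suites get separate warehouse directories), keeping the golden result files independent of which suite or JVM produced them. An illustration with made-up paths; the helper object and values are hypothetical.

object LocationNormalizationSketch {
  def main(args: Array[String]): Unit = {
    val notIncludedMsg = "[not included in comparison]"
    val clsName = "org.apache.spark.sql.SQLQueryTestSuite"
    val raw = s"Location file:/home/jenkins/workspace/sql/core/spark-warehouse/$clsName/t1"

    // Strip everything from "Location" up to and including the per-suite warehouse directory.
    val normalized = raw.replaceAll(
      s"Location.*/sql/core/spark-warehouse/$clsName/",
      s"Location ${notIncludedMsg}sql/core/spark-warehouse/")

    // Prints: Location [not included in comparison]sql/core/spark-warehouse/t1
    println(normalized)
  }
}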