- <h4> Executor Summary </h4>
+ <h4> Executor Summary ({allExecutors.length}) </h4>
{executorsTable}
{
if (removedExecutors.nonEmpty) {
- <h4> Removed Executors </h4> ++
+ <h4> Removed Executors ({removedExecutors.length}) </h4> ++
removedExecutorsTable
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 9351c72094e3..bc0bf6a1d970 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -57,8 +57,10 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private def handleKillRequest(request: HttpServletRequest, action: String => Unit): Unit = {
if (parent.killEnabled &&
parent.master.securityMgr.checkModifyPermissions(request.getRemoteUser)) {
- val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
- val id = Option(request.getParameter("id"))
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val killFlag =
+ Option(UIUtils.stripXSS(request.getParameter("terminate"))).getOrElse("false").toBoolean
+ val id = Option(UIUtils.stripXSS(request.getParameter("id")))
if (id.isDefined && killFlag) {
action(id.get)
}
@@ -126,14 +128,14 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
- <h4> Workers </h4>
+ <h4> Workers ({workers.length}) </h4>
{workerTable}
- <h4> Running Applications </h4>
+ <h4> Running Applications ({activeApps.length}) </h4>
{activeAppsTable}
@@ -142,7 +144,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{if (hasDrivers) {
- <h4> Running Drivers </h4>
+ <h4> Running Drivers ({activeDrivers.length}) </h4>
{activeDriversTable}
@@ -152,7 +154,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
- <h4> Completed Applications </h4>
+ <h4> Completed Applications ({completedApps.length}) </h4>
{completedAppsTable}
@@ -162,7 +164,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
if (hasDrivers) {
- <h4> Completed Drivers </h4>
+ <h4> Completed Drivers ({completedDrivers.length}) </h4>
{completedDriversTable}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 00b9d1af373d..1198e3cb05ea 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[deploy] class Worker(
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port
- Utils.checkHost(host, "Expected hostname")
+ Utils.checkHost(host)
assert (port > 0)
// A scheduled executor used to send messages at the specified time.
@@ -99,6 +99,20 @@ private[deploy] class Worker(
private val testing: Boolean = sys.props.contains("spark.testing")
private var master: Option[RpcEndpointRef] = None
+
+ /**
+ * Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
+ * will just use the address received from Master.
+ */
+ private val preferConfiguredMasterAddress =
+ conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
+ /**
+ * The master address to connect to in case of failure. When the connection is broken, the worker
+ * will use this address to reconnect. This is usually just one of `masterRpcAddresses`. However, when
+ * a master is restarted or takes over leadership, it will be an address sent from master, which
+ * may not be in `masterRpcAddresses`.
+ */
+ private var masterAddressToConnect: Option[RpcAddress] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
@@ -196,10 +210,19 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
- private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
+ /**
+ * Change to use the new master.
+ *
+ * @param masterRef the new master ref
+ * @param uiUrl the new master Web UI address
+ * @param masterAddress the new master address which the worker should use to connect in case of
+ * failure
+ */
+ private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String, masterAddress: RpcAddress) {
// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = masterRef.address.toSparkURL
activeMasterWebUiUrl = uiUrl
+ masterAddressToConnect = Some(masterAddress)
master = Some(masterRef)
connected = true
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
@@ -266,7 +289,8 @@ private[deploy] class Worker(
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
- val masterAddress = masterRef.address
+ val masterAddress =
+ if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
@@ -342,15 +366,27 @@ private[deploy] class Worker(
}
private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
- masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl))
+ masterEndpoint.send(RegisterWorker(
+ workerId,
+ host,
+ port,
+ self,
+ cores,
+ memory,
+ workerWebUiUrl,
+ masterEndpoint.address))
}
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
- case RegisteredWorker(masterRef, masterWebUiUrl) =>
- logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+ case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
+ if (preferConfiguredMasterAddress) {
+ logInfo("Successfully registered with master " + masterAddress.toSparkURL)
+ } else {
+ logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+ }
registered = true
- changeMaster(masterRef, masterWebUiUrl)
+ changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
@@ -419,7 +455,7 @@ private[deploy] class Worker(
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
- changeMaster(masterRef, masterWebUiUrl)
+ changeMaster(masterRef, masterWebUiUrl, masterRef.address)
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
@@ -561,7 +597,8 @@ private[deploy] class Worker(
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
- if (master.exists(_.address == remoteAddress)) {
+ if (master.exists(_.address == remoteAddress) ||
+ masterAddressToConnect.exists(_ == remoteAddress)) {
logInfo(s"$remoteAddress Disassociated !")
masterDisconnected()
}
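
A small usage sketch of the new switch, assuming a standalone worker whose configured master address differs from what the master reports about itself (for example behind a NAT or proxy); the setup is invented, not taken from the patch:

    import org.apache.spark.SparkConf

    // Hypothetical worker-side setting (normally supplied through the worker's configuration,
    // e.g. SPARK_WORKER_OPTS="-Dspark.worker.preferConfiguredMasterAddress=true").
    // With the flag on, reconnect attempts in tryRegisterAllMasters() use
    // masterAddressToConnect -- one of the configured masterRpcAddresses -- rather than
    // the address the master reports about itself (masterRef.address).
    val workerConf = new SparkConf()
      .set("spark.worker.preferConfiguredMasterAddress", "true")   // default is false
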
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 777020d4d5c8..bd07d342e04a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -68,12 +68,12 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
- Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
+ Utils.checkHost(value)
host = value
parse(tail)
case ("--host" | "-h") :: value :: tail =>
- Utils.checkHost(value, "Please use hostname " + value)
+ Utils.checkHost(value)
host = value
parse(tail)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 80dc9bf8779d..2f5a5642d3ca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -33,13 +33,16 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
private val supportedLogTypes = Set("stderr", "stdout")
private val defaultBytes = 100 * 1024
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
def renderLog(request: HttpServletRequest): String = {
- val appId = Option(request.getParameter("appId"))
- val executorId = Option(request.getParameter("executorId"))
- val driverId = Option(request.getParameter("driverId"))
- val logType = request.getParameter("logType")
- val offset = Option(request.getParameter("offset")).map(_.toLong)
- val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+ val appId = Option(UIUtils.stripXSS(request.getParameter("appId")))
+ val executorId = Option(UIUtils.stripXSS(request.getParameter("executorId")))
+ val driverId = Option(UIUtils.stripXSS(request.getParameter("driverId")))
+ val logType = UIUtils.stripXSS(request.getParameter("logType"))
+ val offset = Option(UIUtils.stripXSS(request.getParameter("offset"))).map(_.toLong)
+ val byteLength =
+ Option(UIUtils.stripXSS(request.getParameter("byteLength"))).map(_.toInt)
+ .getOrElse(defaultBytes)
val logDir = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
@@ -55,13 +58,16 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
pre + logText
}
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
def render(request: HttpServletRequest): Seq[Node] = {
- val appId = Option(request.getParameter("appId"))
- val executorId = Option(request.getParameter("executorId"))
- val driverId = Option(request.getParameter("driverId"))
- val logType = request.getParameter("logType")
- val offset = Option(request.getParameter("offset")).map(_.toLong)
- val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+ val appId = Option(UIUtils.stripXSS(request.getParameter("appId")))
+ val executorId = Option(UIUtils.stripXSS(request.getParameter("executorId")))
+ val driverId = Option(UIUtils.stripXSS(request.getParameter("driverId")))
+ val logType = UIUtils.stripXSS(request.getParameter("logType"))
+ val offset = Option(UIUtils.stripXSS(request.getParameter("offset"))).map(_.toLong)
+ val byteLength =
+ Option(UIUtils.stripXSS(request.getParameter("byteLength"))).map(_.toInt)
+ .getOrElse(defaultBytes)
val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee107c0..a2f1aa22b006 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
- val port = executorConf.getInt("spark.executor.port", 0)
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
- port,
+ -1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
val env = SparkEnv.createExecutorEnv(
- driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+ driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 51b6c373c4da..5b396687dd11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -71,7 +71,7 @@ private[spark] class Executor(
private val conf = env.conf
// No ip or host:port - just hostname
- Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
+ Utils.checkHost(executorHostname)
// must not have port specified.
assert (0 == Utils.parseHostPort(executorHostname)._2)
@@ -425,6 +425,7 @@ private[spark] class Executor(
}
}
+ setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f7921d56f49..e193ed222e22 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,13 @@ package object config {
"spark.io.compression.codec.")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
+ ConfigBuilder("spark.shuffle.accurateBlockThreshold")
+ .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
+ "record the size accurately if it's above this config. This helps to prevent OOM by " +
+ "avoiding underestimating shuffle block size when fetch shuffle blocks.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(100 * 1024 * 1024)
+
}
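
A usage sketch for the new threshold (the 32 MB value is only an example): lowering it makes more shuffle blocks report exact sizes at the cost of a larger map status.

    import org.apache.spark.SparkConf

    // Record exact (byte-compressed) sizes for shuffle blocks of 32 MB or more instead of
    // the 100 MB default, so the reduce side underestimates fewer large blocks.
    val conf = new SparkConf()
      .set("spark.shuffle.accurateBlockThreshold", (32 * 1024 * 1024).toString)
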
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 28c45d800ed0..6da8865cd10d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -34,6 +34,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
var adminAcls: Option[String] = None
var viewAclsGroups: Option[String] = None
var adminAclsGroups: Option[String] = None
+ var appSparkVersion: Option[String] = None
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
@@ -57,4 +58,10 @@ private[spark] class ApplicationEventListener extends SparkListener {
adminAclsGroups = allProperties.get("spark.admin.acls.groups")
}
}
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerLogStart(sparkVersion) =>
+ appSparkVersion = Some(sparkVersion)
+ case _ =>
+ }
}
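
The same pattern in a user-defined listener, as a sketch; it assumes `SparkListenerLogStart` is visible to user code, which is what the `@DeveloperApi` change in SparkListener.scala below enables. The listener name is hypothetical.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogStart}

    // Hypothetical listener that remembers which Spark version wrote the replayed event log.
    class LogVersionListener extends SparkListener {
      @volatile var appSparkVersion: Option[String] = None

      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case SparkListenerLogStart(sparkVersion) => appSparkVersion = Some(sparkVersion)
        case _ => // ignore everything else
      }
    }
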
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aab177f257a8..875acc37e90f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -58,7 +58,7 @@ import org.apache.spark.util._
* set of map output files, and another to read those files after a barrier). In the end, every
* stage will have only shuffle dependencies on other stages, and may compute multiple operations
* inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
- * various RDDs (MappedRDD, FilteredRDD, etc).
+ * various RDDs.
*
* In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
@@ -618,12 +618,7 @@ class DAGScheduler(
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
- // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
- // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
- // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
- // safe to pass in null here. For more detail, see SPARK-13747.
- val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
- waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
+ ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7dbf87915b2..f48143633224 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -119,7 +119,7 @@ private[spark] class EventLoggingListener(
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
val bstream = new BufferedOutputStream(cstream, outputBufferSize)
- EventLoggingListener.initEventLog(bstream)
+ EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
writer = Some(new PrintWriter(bstream))
logInfo("Logging events to %s".format(logPath))
@@ -283,10 +283,17 @@ private[spark] object EventLoggingListener extends Logging {
*
* @param logStream Raw output stream to the event log file.
*/
- def initEventLog(logStream: OutputStream): Unit = {
+ def initEventLog(
+ logStream: OutputStream,
+ testing: Boolean,
+ loggedEvents: ArrayBuffer[JValue]): Unit = {
val metadata = SparkListenerLogStart(SPARK_VERSION)
- val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
+ val eventJson = JsonProtocol.logStartToJson(metadata)
+ val metadataJson = compact(eventJson) + "\n"
logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
+ if (testing && loggedEvents != null) {
+ loggedEvents += eventJson
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index b2e9a97129f0..048e0d018659 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
import org.roaringbitmap.RoaringBitmap
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
@@ -121,34 +126,41 @@ private[spark] class CompressedMapStatus(
}
/**
- * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks,
* plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
+ * @param hugeBlockSizes sizes of huge blocks by their reduceId.
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
- private[this] var avgSize: Long)
+ private[this] var avgSize: Long,
+ @transient private var hugeBlockSizes: Map[Int, Byte])
extends MapStatus with Externalizable {
// loc could be null when the default constructor is called during deserialization
- require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+ require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")
- protected def this() = this(null, -1, null, -1) // For deserialization only
+ protected def this() = this(null, -1, null, -1, null) // For deserialization only
override def location: BlockManagerId = loc
override def getSizeForBlock(reduceId: Int): Long = {
+ assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
0
} else {
- avgSize
+ hugeBlockSizes.get(reduceId) match {
+ case Some(size) => MapStatus.decompressSize(size)
+ case None => avgSize
+ }
}
}
@@ -156,6 +168,11 @@ private[spark] class HighlyCompressedMapStatus private (
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
+ out.writeInt(hugeBlockSizes.size)
+ hugeBlockSizes.foreach { kv =>
+ out.writeInt(kv._1)
+ out.writeByte(kv._2)
+ }
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -163,6 +180,14 @@ private[spark] class HighlyCompressedMapStatus private (
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
+ val count = in.readInt()
+ val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
+ (0 until count).foreach { _ =>
+ val block = in.readInt()
+ val size = in.readByte()
+ hugeBlockSizesArray += Tuple2(block, size)
+ }
+ hugeBlockSizes = hugeBlockSizesArray.toMap
}
}
@@ -178,11 +203,21 @@ private[spark] object HighlyCompressedMapStatus {
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
+ val threshold = Option(SparkEnv.get)
+ .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+ .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+ val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
while (i < totalNumBlocks) {
- var size = uncompressedSizes(i)
+ val size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
- totalSize += size
+ // Huge blocks are not included in the calculation for average size, so the size
+ // estimate for smaller blocks is more accurate.
+ if (size < threshold) {
+ totalSize += size
+ } else {
+ hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+ }
} else {
emptyBlocks.add(i)
}
@@ -195,6 +230,7 @@ private[spark] object HighlyCompressedMapStatus {
}
emptyBlocks.trim()
emptyBlocks.runOptimize()
- new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+ new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
+ hugeBlockSizesArray.toMap)
}
}
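
A self-contained sketch of the bookkeeping described in the HighlyCompressedMapStatus comment above. This is not the Spark class: Spark additionally byte-compresses the huge sizes on a log scale and tracks empty blocks with a RoaringBitmap.

    import scala.collection.mutable

    // Split per-reducer block sizes into an average for "small" blocks and exact entries
    // for blocks at or above the threshold; lookups fall back to the average.
    def summarizeSizes(sizes: Array[Long], threshold: Long): Int => Long = {
      var totalSmall = 0L
      var numSmall = 0
      val huge = mutable.Map[Int, Long]()
      sizes.zipWithIndex.foreach { case (size, i) =>
        if (size > 0) {
          if (size < threshold) { totalSmall += size; numSmall += 1 }
          else huge(i) = size
        }
      }
      val avgSize = if (numSmall > 0) totalSmall / numSmall else 0L
      reduceId => if (sizes(reduceId) == 0) 0L else huge.getOrElse(reduceId, avgSize)
    }

    // val sizeFor = summarizeSizes(Array(10L, 0L, 500L, 20L), threshold = 100L)
    // sizeFor(0) == 15, sizeFor(1) == 0, sizeFor(2) == 500, sizeFor(3) == 15
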
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index bc2e53071668..59f89a82a1da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -160,9 +160,9 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
/**
* An internal class that describes the metadata of an event log.
- * This event is not meant to be posted to listeners downstream.
*/
-private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 3ff363321e8c..3b0d3b1b150f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -71,7 +71,6 @@ private[spark] trait SparkListenerBus
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
- case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5c337b992c84..7767ef1803a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -115,26 +115,33 @@ private[spark] abstract class Task[T](
case t: Throwable =>
e.addSuppressed(t)
}
+ context.markTaskCompleted(Some(e))
throw e
} finally {
- // Call the task completion callbacks.
- context.markTaskCompleted()
try {
- Utils.tryLogNonFatalError {
- // Release memory used by this thread for unrolling blocks
- SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
- SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
- // Notify any tasks waiting for execution memory to be freed to wake up and try to
- // acquire memory again. This makes impossible the scenario where a task sleeps forever
- // because there are no other tasks left to notify it. Since this is safe to do but may
- // not be strictly necessary, we should revisit whether we can remove this in the future.
- val memoryManager = SparkEnv.get.memoryManager
- memoryManager.synchronized { memoryManager.notifyAll() }
- }
+ // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
+ // one is no-op.
+ context.markTaskCompleted(None)
} finally {
- // Though we unset the ThreadLocal here, the context member variable itself is still queried
- // directly in the TaskRunner to check for FetchFailedExceptions.
- TaskContext.unset()
+ try {
+ Utils.tryLogNonFatalError {
+ // Release memory used by this thread for unrolling blocks
+ SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
+ SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
+ MemoryMode.OFF_HEAP)
+ // Notify any tasks waiting for execution memory to be freed to wake up and try to
+ // acquire memory again. This prevents the scenario where a task sleeps forever
+ // because there are no other tasks left to notify it. Since this is safe to do but may
+ // not be strictly necessary, we should revisit whether we can remove this in the
+ // future.
+ val memoryManager = SparkEnv.get.memoryManager
+ memoryManager.synchronized { memoryManager.notifyAll() }
+ }
+ } finally {
+ // Though we unset the ThreadLocal here, the context member variable itself is still
+ // queried directly in the TaskRunner to check for FetchFailedExceptions.
+ TaskContext.unset()
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index a0239266d875..f039744e7f67 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -90,7 +90,8 @@ private[spark] object ApplicationsListResource {
},
lastUpdated = new Date(internalAttemptInfo.lastUpdated),
sparkUser = internalAttemptInfo.sparkUser,
- completed = internalAttemptInfo.completed
+ completed = internalAttemptInfo.completed,
+ appSparkVersion = internalAttemptInfo.appSparkVersion
)
}
)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 56d8e51732ff..f6203271f3cd 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -38,7 +38,8 @@ class ApplicationAttemptInfo private[spark](
val lastUpdated: Date,
val duration: Long,
val sparkUser: String,
- val completed: Boolean = false) {
+ val completed: Boolean = false,
+ val appSparkVersion: String) {
def getStartTimeEpoch: Long = startTime.getTime
def getEndTimeEpoch: Long = endTime.getTime
def getLastUpdatedEpoch: Long = lastUpdated.getTime
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3219969bcd06..137d24b52515 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,14 +23,12 @@ import java.nio.channels.Channels
import scala.collection.mutable
import scala.collection.mutable.HashMap
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal
-import com.google.common.io.ByteStreams
-
import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
@@ -41,7 +39,6 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalShuffleClient
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
@@ -337,7 +334,7 @@ private[spark] class BlockManager(
val task = asyncReregisterTask
if (task != null) {
try {
- Await.ready(task, Duration.Inf)
+ ThreadUtils.awaitReady(task, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for async. reregistration", t)
@@ -612,12 +609,19 @@ private[spark] class BlockManager(
/**
* Return a list of locations for the given block, prioritizing the local machine since
- * multiple block managers can share the same host.
+ * multiple block managers can share the same host, followed by hosts on the same rack.
*/
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
val locs = Random.shuffle(master.getLocations(blockId))
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
- preferredLocs ++ otherLocs
+ blockManagerId.topologyInfo match {
+ case None => preferredLocs ++ otherLocs
+ case Some(_) =>
+ val (sameRackLocs, differentRackLocs) = otherLocs.partition {
+ loc => blockManagerId.topologyInfo == loc.topologyInfo
+ }
+ preferredLocs ++ sameRackLocs ++ differentRackLocs
+ }
}
/**
@@ -912,7 +916,7 @@ private[spark] class BlockManager(
if (level.replication > 1) {
// Wait for asynchronous replication to finish
try {
- Await.ready(replicationFuture, Duration.Inf)
+ ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
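
A standalone sketch of the ordering now applied in getLocations: same-host locations first, then same-rack, then the rest. `Loc` and the host/rack names are invented stand-ins for BlockManagerId's host and topologyInfo.

    // Invented stand-in for BlockManagerId(host, topologyInfo).
    case class Loc(host: String, rack: Option[String])

    def orderLocations(self: Loc, candidates: Seq[Loc]): Seq[Loc] = {
      val (sameHost, others) = candidates.partition(_.host == self.host)
      self.rack match {
        case None => sameHost ++ others                      // no topology info: previous behaviour
        case Some(_) =>
          val (sameRack, offRack) = others.partition(_.rack == self.rack)
          sameHost ++ sameRack ++ offRack
      }
    }

    // orderLocations(Loc("h1", Some("rackA")),
    //   Seq(Loc("h2", Some("rackB")), Loc("h3", Some("rackA")), Loc("h1", Some("rackA"))))
    // => Seq(Loc("h1", ...), Loc("h3", ...), Loc("h2", ...))
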
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index c37a3604d28f..2c3da0ee85e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -46,7 +46,7 @@ class BlockManagerId private (
def executorId: String = executorId_
if (null != host_) {
- Utils.checkHost(host_, "Expected hostname")
+ Utils.checkHost(host_)
assert (port_ > 0)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index bf4cf79e9faa..f271c56021e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -60,6 +60,8 @@ private[spark] class SparkUI private (
var appId: String = _
+ var appSparkVersion = org.apache.spark.SPARK_VERSION
+
private var streamingJobProgressListener: Option[SparkListener] = None
/** Initialize all components of the server. */
@@ -118,7 +120,8 @@ private[spark] class SparkUI private (
duration = 0,
lastUpdated = new Date(startTime),
sparkUser = getSparkUser,
- completed = false
+ completed = false,
+ appSparkVersion = appSparkVersion
))
))
}
@@ -139,6 +142,7 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
def appName: String = parent.appName
+ def appSparkVersion: String = parent.appSparkVersion
}
private[spark] object SparkUI {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 79b0d81af52b..2610f673d27f 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}
+import org.apache.commons.lang3.StringEscapeUtils
+
import org.apache.spark.internal.Logging
import org.apache.spark.ui.scope.RDDOperationGraph
@@ -34,6 +36,8 @@ private[spark] object UIUtils extends Logging {
val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
val TABLE_CLASS_STRIPED_SORTABLE = TABLE_CLASS_STRIPED + " sortable"
+ private val NEWLINE_AND_SINGLE_QUOTE_REGEX = raw"(?i)(\r\n|\n|\r|%0D%0A|%0A|%0D|'|%27)".r
+
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat =
@@ -228,7 +232,7 @@ private[spark] object UIUtils extends Logging {
@@ -527,4 +531,21 @@ private[spark] object UIUtils extends Logging {
origHref
}
}
+
+ /**
+ * Remove suspicious characters from user input to prevent cross-site scripting (XSS) attacks.
+ *
+ * For more information about XSS testing:
+ * https://www.owasp.org/index.php/XSS_Filter_Evasion_Cheat_Sheet and
+ * https://www.owasp.org/index.php/Testing_for_Reflected_Cross_site_scripting_(OTG-INPVAL-001)
+ */
+ def stripXSS(requestParameter: String): String = {
+ if (requestParameter == null) {
+ null
+ } else {
+ // Remove new lines and single quotes, followed by escaping HTML version 4.0
+ StringEscapeUtils.escapeHtml4(
+ NEWLINE_AND_SINGLE_QUOTE_REGEX.replaceAllIn(requestParameter, ""))
+ }
+ }
}
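
A standalone sketch of what stripXSS does to a request parameter; the example input is made up, and commons-lang3 is the same library the patch imports.

    import org.apache.commons.lang3.StringEscapeUtils

    // Same two steps as above: drop newlines and single quotes (raw or URL-encoded),
    // then escape the remainder as HTML 4.0 entities.
    val NewlineOrQuote = raw"(?i)(\r\n|\n|\r|%0D%0A|%0A|%0D|'|%27)".r

    def stripXssSketch(param: String): String =
      if (param == null) null
      else StringEscapeUtils.escapeHtml4(NewlineOrQuote.replaceAllIn(param, ""))

    // stripXssSketch("<script>alert('pwned')</script>")
    // => "&lt;script&gt;alert(pwned)&lt;/script&gt;"
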
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
index 6ce3f511e89c..7b211ea5199c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -28,8 +28,10 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
private val sc = parent.sc
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
def render(request: HttpServletRequest): Seq[Node] = {
- val executorId = Option(request.getParameter("executorId")).map { executorId =>
+ val executorId =
+ Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
UIUtils.decodeURLParameter(executorId)
}.getOrElse {
throw new IllegalArgumentException(s"Missing executorId parameter")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 18be0870746e..cce7a7611b42 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -220,18 +220,20 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
jobTag: String,
jobs: Seq[JobUIData],
killEnabled: Boolean): Seq[Node] = {
- val allParameters = request.getParameterMap.asScala.toMap
+ // stripXSS is called to remove suspicious characters used in XSS attacks
+ val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
- val parameterJobPage = request.getParameter(jobTag + ".page")
- val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
- val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
- val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
- val parameterJobPrevPageSize = request.getParameter(jobTag + ".prevPageSize")
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterJobPage = UIUtils.stripXSS(request.getParameter(jobTag + ".page"))
+ val parameterJobSortColumn = UIUtils.stripXSS(request.getParameter(jobTag + ".sort"))
+ val parameterJobSortDesc = UIUtils.stripXSS(request.getParameter(jobTag + ".desc"))
+ val parameterJobPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".pageSize"))
+ val parameterJobPrevPageSize = UIUtils.stripXSS(request.getParameter(jobTag + ".prevPageSize"))
val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
@@ -629,7 +631,8 @@ private[ui] class JobPagedTable(
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
- {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+ {UIUtils.makeProgressBar(started = job.numActiveTasks,
+ completed = job.completedIndices.size,
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
reasonToNumKilled = job.reasonToNumKilled, total = job.numTasks - job.numSkippedTasks)}
|
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 3131c4a1eb7d..9fb011a049b7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -187,7 +187,8 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
val listener = parent.jobProgresslistener
listener.synchronized {
- val parameterId = request.getParameter("id")
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterId = UIUtils.stripXSS(request.getParameter("id"))
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val jobId = parameterId.toInt
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8870187f2219..1b10feb36e43 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -329,13 +329,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
- val metrics = TaskMetrics.empty
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
- stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
+ stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -405,7 +404,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
- val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+ val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
taskData.updateTaskInfo(info)
taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
@@ -424,6 +423,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
jobData.numActiveTasks -= 1
taskEnd.reason match {
case Success =>
+ jobData.completedIndices.add((taskEnd.stageId, info.index))
jobData.numCompletedTasks += 1
case kill: TaskKilled =>
jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated(
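
A toy illustration of why the progress bar now uses `completedIndices` (a set of (stageId, taskIndex) pairs) instead of `numCompletedTasks`: when a stage is resubmitted, the same index can complete twice and a plain counter overshoots. The events below are invented.

    // Invented completion events: stage 3 is retried, so task index 0 finishes twice.
    val completions = Seq((3, 0), (3, 1), (3, 0), (4, 0))

    var numCompletedTasks = 0                                               // old progress input
    val completedIndices = scala.collection.mutable.Set.empty[(Int, Int)]   // new progress input

    completions.foreach { case (stageId, index) =>
      numCompletedTasks += 1
      completedIndices += ((stageId, index))
    }

    assert(numCompletedTasks == 4)      // can exceed the job's real task count
    assert(completedIndices.size == 3)  // what makeProgressBar now receives as "completed"
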
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 620c54c2dc0a..cc173381879a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils}
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
@@ -40,7 +40,8 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
- val jobId = Option(request.getParameter("id")).map(_.toInt)
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
jobId.foreach { id =>
if (jobProgresslistener.activeJobs.contains(id)) {
sc.foreach(_.cancelJob(id))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 8ee70d27cc09..b164f32b62e9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -31,7 +31,8 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
- val poolName = Option(request.getParameter("poolname")).map { poolname =>
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname =>
UIUtils.decodeURLParameter(poolname)
}.getOrElse {
throw new IllegalArgumentException(s"Missing poolname parameter")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 19325a2dc916..6b3dadc33331 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -87,17 +87,18 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
- val parameterId = request.getParameter("id")
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterId = UIUtils.stripXSS(request.getParameter("id"))
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
- val parameterAttempt = request.getParameter("attempt")
+ val parameterAttempt = UIUtils.stripXSS(request.getParameter("attempt"))
require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
- val parameterTaskPage = request.getParameter("task.page")
- val parameterTaskSortColumn = request.getParameter("task.sort")
- val parameterTaskSortDesc = request.getParameter("task.desc")
- val parameterTaskPageSize = request.getParameter("task.pageSize")
- val parameterTaskPrevPageSize = request.getParameter("task.prevPageSize")
+ val parameterTaskPage = UIUtils.stripXSS(request.getParameter("task.page"))
+ val parameterTaskSortColumn = UIUtils.stripXSS(request.getParameter("task.sort"))
+ val parameterTaskSortDesc = UIUtils.stripXSS(request.getParameter("task.desc"))
+ val parameterTaskPageSize = UIUtils.stripXSS(request.getParameter("task.pageSize"))
+ val parameterTaskPrevPageSize = UIUtils.stripXSS(request.getParameter("task.prevPageSize"))
val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 256b726fa7ee..a28daf7f9045 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -42,15 +42,17 @@ private[ui] class StageTableBase(
isFairScheduler: Boolean,
killEnabled: Boolean,
isFailedStage: Boolean) {
- val allParameters = request.getParameterMap().asScala.toMap
+ // stripXSS is called to remove suspicious characters used in XSS attacks
+ val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
.map(para => para._1 + "=" + para._2(0))
- val parameterStagePage = request.getParameter(stageTag + ".page")
- val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
- val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
- val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
- val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize")
+ val parameterStagePage = UIUtils.stripXSS(request.getParameter(stageTag + ".page"))
+ val parameterStageSortColumn = UIUtils.stripXSS(request.getParameter(stageTag + ".sort"))
+ val parameterStageSortDesc = UIUtils.stripXSS(request.getParameter(stageTag + ".desc"))
+ val parameterStagePageSize = UIUtils.stripXSS(request.getParameter(stageTag + ".pageSize"))
+ val parameterStagePrevPageSize =
+ UIUtils.stripXSS(request.getParameter(stageTag + ".prevPageSize"))
val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1)
val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
@@ -512,4 +514,3 @@ private[ui] class StageDataSource(
}
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 181465bdf960..799d76962639 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils}
/** Web UI showing progress status of all stages in the given SparkContext. */
private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
@@ -39,7 +39,8 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
def handleKillRequest(request: HttpServletRequest): Unit = {
if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
- val stageId = Option(request.getParameter("id")).map(_.toInt)
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val stageId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt)
stageId.foreach { id =>
if (progressListener.activeStages.contains(id)) {
sc.foreach(_.cancelStage(id, "killed via the Web UI"))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index ac1a74ad8029..048c4ad0146e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -62,6 +62,7 @@ private[spark] object UIData {
var numTasks: Int = 0,
var numActiveTasks: Int = 0,
var numCompletedTasks: Int = 0,
+ var completedIndices: OpenHashSet[(Int, Int)] = new OpenHashSet[(Int, Int)](),
var numSkippedTasks: Int = 0,
var numFailedTasks: Int = 0,
var reasonToNumKilled: Map[String, Int] = Map.empty,
@@ -112,9 +113,9 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
- class TaskUIData private(
- private var _taskInfo: TaskInfo,
- private var _metrics: Option[TaskMetricsUIData]) {
+ class TaskUIData private(private var _taskInfo: TaskInfo) {
+
+ private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
var errorMessage: Option[String] = None
@@ -127,7 +128,7 @@ private[spark] object UIData {
}
def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
- _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+ _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
}
def taskDuration: Option[Long] = {
@@ -140,28 +141,8 @@ private[spark] object UIData {
}
object TaskUIData {
- def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
- new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
- }
-
- private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
- metrics.map { m =>
- TaskMetricsUIData(
- executorDeserializeTime = m.executorDeserializeTime,
- executorDeserializeCpuTime = m.executorDeserializeCpuTime,
- executorRunTime = m.executorRunTime,
- executorCpuTime = m.executorCpuTime,
- resultSize = m.resultSize,
- jvmGCTime = m.jvmGCTime,
- resultSerializationTime = m.resultSerializationTime,
- memoryBytesSpilled = m.memoryBytesSpilled,
- diskBytesSpilled = m.diskBytesSpilled,
- peakExecutionMemory = m.peakExecutionMemory,
- inputMetrics = InputMetricsUIData(m.inputMetrics),
- outputMetrics = OutputMetricsUIData(m.outputMetrics),
- shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
- shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
- }
+ def apply(taskInfo: TaskInfo): TaskUIData = {
+ new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
}
/**
@@ -206,6 +187,28 @@ private[spark] object UIData {
shuffleReadMetrics: ShuffleReadMetricsUIData,
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
+ object TaskMetricsUIData {
+ def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
+ TaskMetricsUIData(
+ executorDeserializeTime = m.executorDeserializeTime,
+ executorDeserializeCpuTime = m.executorDeserializeCpuTime,
+ executorRunTime = m.executorRunTime,
+ executorCpuTime = m.executorCpuTime,
+ resultSize = m.resultSize,
+ jvmGCTime = m.jvmGCTime,
+ resultSerializationTime = m.resultSerializationTime,
+ memoryBytesSpilled = m.memoryBytesSpilled,
+ diskBytesSpilled = m.diskBytesSpilled,
+ peakExecutionMemory = m.peakExecutionMemory,
+ inputMetrics = InputMetricsUIData(m.inputMetrics),
+ outputMetrics = OutputMetricsUIData(m.outputMetrics),
+ shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+ shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+ }
+
+ val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
+ }
+
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
object InputMetricsUIData {
def apply(metrics: InputMetrics): InputMetricsUIData = {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index a1a0c729b924..317e0aa5ea25 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -31,14 +31,15 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val parameterId = request.getParameter("id")
+ // stripXSS is called first to remove suspicious characters used in XSS attacks
+ val parameterId = UIUtils.stripXSS(request.getParameter("id"))
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
- val parameterBlockPage = request.getParameter("block.page")
- val parameterBlockSortColumn = request.getParameter("block.sort")
- val parameterBlockSortDesc = request.getParameter("block.desc")
- val parameterBlockPageSize = request.getParameter("block.pageSize")
- val parameterBlockPrevPageSize = request.getParameter("block.prevPageSize")
+ val parameterBlockPage = UIUtils.stripXSS(request.getParameter("block.page"))
+ val parameterBlockSortColumn = UIUtils.stripXSS(request.getParameter("block.sort"))
+ val parameterBlockSortDesc = UIUtils.stripXSS(request.getParameter("block.desc"))
+ val parameterBlockPageSize = UIUtils.stripXSS(request.getParameter("block.pageSize"))
+ val parameterBlockPrevPageSize = UIUtils.stripXSS(request.getParameter("block.prevPageSize"))
val blockPage = Option(parameterBlockPage).map(_.toInt).getOrElse(1)
val blockSortColumn = Option(parameterBlockSortColumn).getOrElse("Block Name")
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index a65ec75cc5db..1a9a6929541a 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
* Returns the name of this accumulator, can only be called after registration.
*/
final def name: Option[String] = {
+ assertMetadataNotNull()
+
if (atDriverSide) {
- AccumulatorContext.get(id).flatMap(_.metadata.name)
+ metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
} else {
- assertMetadataNotNull()
metadata.name
}
}
@@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
- val isInternalAcc =
- (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
- getClass.getSimpleName == "SQLMetric"
+ val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
if (isInternalAcc) {
// Do not serialize the name of internal accumulator and send it to executor.
copyAcc.metadata = metadata.copy(name = None)
} else {
+ // For non-internal accumulators, we still need to send the name because users may need to
+ // access the accumulator name at executor side, or they may keep the accumulators sent from
+ // executors and access the name when the registered accumulator is already garbage
+ * collected (e.g. SQLMetrics).
copyAcc.metadata = metadata
}
copyAcc
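
A small runnable sketch of a named user accumulator, which is the case the comment above is about: its name now comes from the accumulator's own metadata first and only falls back to the driver-side registry. Names and values here are illustrative.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-name-sketch"))
    val failedRecords = sc.longAccumulator("failedRecords")   // non-internal, so the name is kept

    sc.parallelize(1 to 100).foreach { i =>
      if (i % 10 == 0) failedRecords.add(1)                   // updated inside tasks
    }
    // The name is read from the accumulator's metadata, not only from AccumulatorContext.
    println(s"${failedRecords.name.getOrElse("<unnamed>")} = ${failedRecords.value}")  // failedRecords = 10
    sc.stop()
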
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 46a5cb2cff5a..e5cccf39f945 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -28,7 +28,7 @@ private[spark] object RpcUtils {
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
- Utils.checkHost(driverHost, "Expected hostname")
+ Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 1aa4456ed01b..81aaf79db0c1 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -206,4 +206,25 @@ private[spark] object ThreadUtils {
}
}
// scalastyle:on awaitresult
+
+ // scalastyle:off awaitready
+ /**
+ * Preferred alternative to `Await.ready()`.
+ *
+ * @see [[awaitResult]]
+ */
+ @throws(classOf[SparkException])
+ def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+ // See SPARK-13747.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.ready(atMost)(awaitPermission)
+ } catch {
+ // TimeoutException is thrown in the current thread, so there is no need to wrap the exception.
+ case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+ throw new SparkException("Exception thrown in awaitResult: ", t)
+ }
+ }
+ // scalastyle:on awaitready
}
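
A standalone mirror of the awaitReady pattern (not the private Spark helper itself), showing the two properties the comment describes: `ready` is invoked with a null CanAwait so `scala.concurrent.blocking` is never triggered (SPARK-13747), and timeouts pass through while other failures are rethrown from the waiting thread.

    import java.util.concurrent.TimeoutException

    import scala.concurrent.{CanAwait, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.control.NonFatal

    def awaitReadySketch[T](f: Future[T], atMost: Duration): f.type = {
      try {
        // Passing a null CanAwait skips the blocking() bookkeeping that breaks
        // concurrent SQL executions on a fork-join pool (see SPARK-13747).
        f.ready(atMost)(null.asInstanceOf[CanAwait])
      } catch {
        case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
          throw new RuntimeException("Exception thrown while waiting", t)
      }
    }

    // awaitReadySketch(Future { Thread.sleep(100); 42 }, 1.second).value   // Some(Success(42))
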
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4d37db96dfc3..edfe22979232 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -937,12 +937,13 @@ private[spark] object Utils extends Logging {
customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}
- def checkHost(host: String, message: String = "") {
- assert(host.indexOf(':') == -1, message)
+ def checkHost(host: String) {
+ assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
}
- def checkHostPort(hostPort: String, message: String = "") {
- assert(hostPort.indexOf(':') != -1, message)
+ def checkHostPort(hostPort: String) {
+ assert(hostPort != null && hostPort.indexOf(':') != -1,
+ s"Expected host and port but got $hostPort")
}
// Typically, this will be of order of number of nodes in cluster
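
A sketch of the tightened assertions, assuming calling code inside the org.apache.spark package
(Utils is private[spark]); host names and ports are illustrative.

    import org.apache.spark.util.Utils

    Utils.checkHost("example.com")           // ok: a bare hostname
    Utils.checkHostPort("example.com:7077")  // ok: host plus port
    // Utils.checkHost("example.com:7077")   // would now fail with
    //   "Expected hostname (not IP) but got example.com:7077"
    // Utils.checkHost(null)                 // fails the assertion instead of throwing an NPE
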
diff --git a/core/src/main/scala/org/apache/spark/util/taskListeners.scala b/core/src/main/scala/org/apache/spark/util/taskListeners.scala
index 1be31e88ab68..51feccfb8342 100644
--- a/core/src/main/scala/org/apache/spark/util/taskListeners.scala
+++ b/core/src/main/scala/org/apache/spark/util/taskListeners.scala
@@ -55,14 +55,16 @@ class TaskCompletionListenerException(
extends RuntimeException {
override def getMessage: String = {
- if (errorMessages.size == 1) {
- errorMessages.head
- } else {
- errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
- } +
- previousError.map { e =>
+ val listenerErrorMessage =
+ if (errorMessages.size == 1) {
+ errorMessages.head
+ } else {
+ errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
+ }
+ val previousErrorMessage = previousError.map { e =>
"\n\nPrevious exception in task: " + e.getMessage + "\n" +
e.getStackTrace.mkString("\t", "\n\t", "")
}.getOrElse("")
+ listenerErrorMessage + previousErrorMessage
}
}
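
The refactor above only restructures getMessage so that the listener messages and the previous
task error are concatenated once. A minimal sketch of the resulting message, assuming
Spark-internal scope (the class is private[spark]), mirroring the new test later in this patch:

    import org.apache.spark.util.TaskCompletionListenerException

    val e = new TaskCompletionListenerException(
      Seq("exception in listener"),
      Some(new RuntimeException("exception in task")))
    assert(e.getMessage.contains("exception in listener"))
    assert(e.getMessage.contains("Previous exception in task: exception in task"))
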
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 10902ab5a832..f2c3ec5da889 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
@@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
@@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380950,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380890,
"startTimeEpoch" : 1430917380880,
"lastUpdatedEpoch" : 0
@@ -77,6 +82,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
@@ -88,6 +94,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
@@ -102,6 +109,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1425081766912,
"startTimeEpoch" : 1425081758277,
"lastUpdatedEpoch" : 0
@@ -116,6 +124,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
@@ -130,6 +139,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 10902ab5a832..c925c1dd8a4d 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
@@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
@@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380950,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380890,
"startTimeEpoch" : 1430917380880,
"lastUpdatedEpoch" : 0
@@ -77,6 +82,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
@@ -88,6 +94,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
@@ -102,6 +109,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1425081766912,
"startTimeEpoch" : 1425081758277,
"lastUpdatedEpoch" : 0
@@ -116,6 +124,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
@@ -130,6 +139,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 8820c717f85d..cc0b2b0022bd 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
@@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
@@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
index c3fe4db222ae..fa12413eeb0e 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxDate2_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
index 8281fa75aa0d..a0d4a0d1c455 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxDate_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
@@ -22,6 +23,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981766912,
"startTimeEpoch" : 1422981758277,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
index 1842f1888b78..dfa90010c6ca 100644
--- a/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/maxEndDate_app_list_json_expectation.json
@@ -9,6 +9,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380893,
"endTimeEpoch" : 1430917380950
@@ -20,6 +21,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380880,
"endTimeEpoch" : 1430917380890
@@ -35,6 +37,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426633910242,
"endTimeEpoch" : 1426633945177
@@ -46,6 +49,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426533910242,
"endTimeEpoch" : 1426533945177
@@ -60,6 +64,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1425081758277,
"endTimeEpoch" : 1425081766912
@@ -74,6 +79,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1422981779720,
"endTimeEpoch" : 1422981788731
@@ -88,6 +94,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1422981758277,
"endTimeEpoch" : 1422981766912
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
index 24f9f21ec650..3ebe60e2cd03 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_and_maxEndDate_app_list_json_expectation.json
@@ -9,6 +9,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380893,
"endTimeEpoch" : 1430917380950
@@ -20,6 +21,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380880,
"endTimeEpoch" : 1430917380890
@@ -35,6 +37,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426633910242,
"endTimeEpoch" : 1426633945177
@@ -46,6 +49,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426533910242,
"endTimeEpoch" : 1426533945177
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 1930281f1a3e..5af50abd8533 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479335620587,
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0
@@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"endTimeEpoch" : 1479252138874,
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0
@@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917391398,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380950,
"startTimeEpoch" : 1430917380893,
"lastUpdatedEpoch" : 0
@@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"endTimeEpoch" : 1430917380890,
"startTimeEpoch" : 1430917380880,
"lastUpdatedEpoch" : 0
@@ -77,6 +82,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
@@ -88,6 +94,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
@@ -102,6 +109,7 @@
"duration" : 8635,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1425081766912,
"startTimeEpoch" : 1425081758277,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
index 3745e8a09a98..74a7b40a5927 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_and_maxEndDate_app_list_json_expectation.json
@@ -9,6 +9,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380893,
"endTimeEpoch" : 1430917380950
@@ -20,6 +21,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380880,
"endTimeEpoch" : 1430917380890
@@ -35,6 +37,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426633910242,
"endTimeEpoch" : 1426633945177
@@ -46,6 +49,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1426533910242,
"endTimeEpoch" : 1426533945177
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index 05233db441ed..7f896c74b5be 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 10671,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"startTimeEpoch" : 1479335609916,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1479335620587
@@ -22,6 +23,7 @@
"duration" : 101795,
"sparkUser" : "jose",
"completed" : true,
+ "appSparkVersion" : "2.1.0-SNAPSHOT",
"startTimeEpoch" : 1479252037079,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1479252138874
@@ -36,6 +38,7 @@
"duration" : 10505,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380893,
"endTimeEpoch" : 1430917391398
@@ -51,6 +54,7 @@
"duration" : 57,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380893,
"endTimeEpoch" : 1430917380950
@@ -62,6 +66,7 @@
"duration" : 10,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "1.4.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1430917380880,
"endTimeEpoch" : 1430917380890
diff --git a/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
index e8ed96dc85f0..24ec6a163fc2 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_app_json_expectation.json
@@ -8,6 +8,7 @@
"duration" : 9011,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1422981788731,
"startTimeEpoch" : 1422981779720,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
index 88c601512d79..94b6d6dba76e 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_app_multi_attempt_json_expectation.json
@@ -9,6 +9,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426633945177,
"startTimeEpoch" : 1426633910242,
"lastUpdatedEpoch" : 0
@@ -20,6 +21,7 @@
"duration" : 34935,
"sparkUser" : "irashid",
"completed" : true,
+ "appSparkVersion" : "",
"endTimeEpoch" : 1426533945177,
"startTimeEpoch" : 1426533910242,
"lastUpdatedEpoch" : 0
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index ee70a3399efe..48408ccc8f81 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -114,7 +114,7 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
* RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
* the generated RDD will remember the partitions and therefore potentially the whole lineage.
* This function should be called only those RDD whose partitions refer to parent RDD's
- * partitions (i.e., do not call it on simple RDD like MappedRDD).
+ * partitions (i.e., do not call it on simple RDDs).
*
* @param op an operation to run on the RDD
* @param reliableCheckpoint if true, use reliable checkpoints, otherwise use local checkpoints
@@ -388,7 +388,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
// the parent RDD has been checkpointed and parent partitions have been changed.
// Note that this test is very specific to the current implementation of CartesianRDD.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
- checkpoint(ones, reliableCheckpoint) // checkpoint that MappedRDD
+ checkpoint(ones, reliableCheckpoint)
val cartesian = new CartesianRDD(sc, ones, ones)
val splitBeforeCheckpoint =
serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
@@ -411,7 +411,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
// Note that this test is very specific to the current implementation of
// CoalescedRDDPartitions.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
- checkpoint(ones, reliableCheckpoint) // checkpoint that MappedRDD
+ checkpoint(ones, reliableCheckpoint)
val coalesced = new CoalescedRDD(ones, 2)
val splitBeforeCheckpoint =
serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58b865969f51..622f7985ba44 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
import org.apache.spark.util.{MutablePair, Utils}
abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// Delete one of the local shuffle blocks.
val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
- assert(hashFile.exists() || sortFile.exists())
+ val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+ assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
if (hashFile.exists()) {
hashFile.delete()
@@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
if (sortFile.exists()) {
sortFile.delete()
}
+ if (indexFile.exists()) {
+ indexFile.delete()
+ }
// This count should retry the execution of the previous stage and rerun shuffle.
rdd.count()
}
+ test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
+ sc = new SparkContext("local", "test", conf.clone())
+ val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
+
+ // Cannot find one of the local shuffle blocks.
+ val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+ val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+ val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+ assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
+
+ rdd.count()
+
+ // Can find one of the local shuffle blocks.
+ val hashExistsFile = sc.env.blockManager.diskBlockManager
+ .getFile(new ShuffleBlockId(0, 0, 0))
+ val sortExistsFile = sc.env.blockManager.diskBlockManager
+ .getFile(new ShuffleDataBlockId(0, 0, 0))
+ val indexExistsFile = sc.env.blockManager.diskBlockManager
+ .getFile(new ShuffleIndexBlockId(0, 0, 0))
+ assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+ }
+
test("metrics for shuffle without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val numRecords = 10000
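
For reference, these are the three block names the updated assertions resolve through the
DiskBlockManager for shuffle 0, map 0, reduce 0 (naming follows the BlockId definitions
exercised by the BlockIdSuite additions further down in this patch):

    import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}

    ShuffleBlockId(0, 0, 0).name      // "shuffle_0_0_0"       (hash-based shuffle output)
    ShuffleDataBlockId(0, 0, 0).name  // "shuffle_0_0_0.data"  (sort-based shuffle data file)
    ShuffleIndexBlockId(0, 0, 0).name // "shuffle_0_0_0.index" (offsets into the data file)
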
diff --git a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
index 7a897c2b4698..c0126e41ff7f 100644
--- a/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
@@ -38,6 +38,10 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def beforeAll() {
super.beforeAll()
+ // Once 'spark.local.dir' is set, it is cached. Unless this is manually cleared
+ // before/after a test, it could return the same directory even if this property
+ // is reconfigured.
+ Utils.clearLocalRootDirs()
conf.set("spark.shuffle.manager", "sort")
}
@@ -50,6 +54,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def afterEach(): Unit = {
try {
Utils.deleteRecursively(tempDir)
+ Utils.clearLocalRootDirs()
} finally {
super.afterEach()
}
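
Spark caches the resolved local root directories the first time they are read, so suites that
reconfigure 'spark.local.dir' must clear that cache explicitly. A sketch of the pattern,
assuming Spark-internal scope (Utils.clearLocalRootDirs is private[spark]); the directory path
is illustrative only.

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    Utils.clearLocalRootDirs()  // drop any directories cached by a previous test
    val conf = new SparkConf().set("spark.local.dir", "/tmp/spark-test-local")
    // Code that resolves local dirs from `conf` after this point honors the new setting.
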
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 7e26139a2bea..27945a9a5ede 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
-import scala.concurrent.Await
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
@@ -35,7 +34,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.Matchers._
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -315,7 +314,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
sc.cancelJobGroup("nonExistGroupId")
- Await.ready(future, Duration(2, TimeUnit.SECONDS))
+ ThreadUtils.awaitReady(future, Duration(2, TimeUnit.SECONDS))
// In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
// SparkContext to shutdown, so the following assertion will fail.
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index f50cb38311db..42b8cde65039 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -243,16 +243,22 @@ private[deploy] object IvyTestUtils {
withManifest: Option[Manifest] = None): File = {
val jarFile = new File(dir, artifactName(artifact, useIvyLayout))
val jarFileStream = new FileOutputStream(jarFile)
- val manifest = withManifest.getOrElse {
- val mani = new Manifest()
+ val manifest: Manifest = withManifest.getOrElse {
if (withR) {
+ val mani = new Manifest()
val attr = mani.getMainAttributes
attr.put(Name.MANIFEST_VERSION, "1.0")
attr.put(new Name("Spark-HasRPackage"), "true")
+ mani
+ } else {
+ null
}
- mani
}
- val jarStream = new JarOutputStream(jarFileStream, manifest)
+ val jarStream = if (manifest != null) {
+ new JarOutputStream(jarFileStream, manifest)
+ } else {
+ new JarOutputStream(jarFileStream)
+ }
for (file <- files) {
val jarEntry = new JarEntry(file._1)
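
The branch above relies on standard java.util.jar behavior: a JarOutputStream created without a
Manifest writes no META-INF/MANIFEST.MF entry, so JarFile.getManifest returns null for the
resulting jar. A minimal sketch (the file path and entry name are illustrative):

    import java.io.FileOutputStream
    import java.util.jar.{JarEntry, JarFile, JarOutputStream}

    val out = new JarOutputStream(new FileOutputStream("/tmp/no-manifest.jar"))
    out.putNextEntry(new JarEntry("placeholder.txt"))
    out.closeEntry()
    out.close()
    assert(new JarFile("/tmp/no-manifest.jar").getManifest == null)
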
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 005587051b6a..5e0bf6d438dc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -133,6 +133,16 @@ class RPackageUtilsSuite
}
}
+ test("jars without manifest return false") {
+ IvyTestUtils.withRepository(main, None, None) { repo =>
+ val jar = IvyTestUtils.packJar(new File(new URI(repo)), dep1, Nil,
+ useIvyLayout = false, withR = false, None)
+ val jarFile = new JarFile(jar)
+ assert(jarFile.getManifest == null, "jar file should have null manifest")
+ assert(!RPackageUtils.checkManifestForR(jarFile), "null manifest should return false")
+ }
+ }
+
test("SparkR zipping works properly") {
val tempDir = Files.createTempDir()
Utils.tryWithSafeFinally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 7998e3702c12..871c87415d35 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -177,7 +177,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
ended: Long): SparkUI = {
val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), Some(64),
Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
- new Date(ended), ended - started, "user", completed)))
+ new Date(ended), ended - started, "user", completed, org.apache.spark.SPARK_VERSION)))
val ui = mock[SparkUI]
when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
when(ui.getAppName).thenReturn(name)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 456158d41b93..9b3e4ec79382 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -109,7 +109,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
user: String,
completed: Boolean): ApplicationHistoryInfo = {
ApplicationHistoryInfo(id, name,
- List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+ List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed, "")))
}
// For completed files, lastUpdated would be lastModified time.
@@ -606,7 +606,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
if (isNewFormat) {
val newFormatStream = new FileOutputStream(file)
Utils.tryWithSafeFinally {
- EventLoggingListener.initEventLog(newFormatStream)
+ EventLoggingListener.initEventLog(newFormatStream, false, null)
} {
newFormatStream.close()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 2127da48ece4..978249f102e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -21,6 +21,7 @@ import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
@@ -34,7 +35,7 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
@@ -447,8 +448,15 @@ class MasterSuite extends SparkFunSuite
}
})
- master.self.send(
- RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
+ master.self.send(RegisterWorker(
+ "1",
+ "localhost",
+ 9999,
+ fakeWorker,
+ 10,
+ 1024,
+ "http://localhost:8080",
+ RpcAddress("localhost", 9999)))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)
}
@@ -459,4 +467,136 @@ class MasterSuite extends SparkFunSuite
assert(killedDrivers.asScala.toList.sorted === List("0", "1", "2"))
}
}
+
+ test("SPARK-20529: Master should reply the address received from worker") {
+ val master = makeMaster()
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+ }
+
+ @volatile var receivedMasterAddress: RpcAddress = null
+ val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = master.rpcEnv
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredWorker(_, _, masterAddress) =>
+ receivedMasterAddress = masterAddress
+ }
+ })
+
+ master.self.send(RegisterWorker(
+ "1",
+ "localhost",
+ 9999,
+ fakeWorker,
+ 10,
+ 1024,
+ "http://localhost:8080",
+ RpcAddress("localhost2", 10000)))
+
+ eventually(timeout(10.seconds)) {
+ assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
+ }
+ }
+
+ test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+ val conf = new SparkConf().set("spark.worker.timeout", "1")
+ val master = makeMaster(conf)
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+ }
+
+ val app = DeployTestUtils.createAppDesc()
+ var appId = ""
+ val driverEnv1 = RpcEnv.create("driver1", "localhost", 22344, conf, new SecurityManager(conf))
+ val fakeDriver1 = driverEnv1.setupEndpoint("driver", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = driverEnv1
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredApplication(id, _) => appId = id
+ }
+ })
+ val drivers = new HashMap[String, String]
+ val workerEnv1 = RpcEnv.create("worker1", "localhost", 12344, conf, new SecurityManager(conf))
+ val fakeWorker1 = workerEnv1.setupEndpoint("worker", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = workerEnv1
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredWorker(masterRef, _, _) =>
+ masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
+ case LaunchDriver(id, desc) =>
+ drivers(id) = id
+ master.self.send(RegisterApplication(app, fakeDriver1))
+ case KillDriver(driverId) =>
+ master.self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+ drivers.remove(driverId)
+ }
+ })
+ val worker1 = RegisterWorker(
+ "1",
+ "localhost",
+ 9999,
+ fakeWorker1,
+ 10,
+ 1024,
+ "http://localhost:8080",
+ RpcAddress("localhost2", 10000))
+ master.self.send(worker1)
+ val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+ master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+ eventually(timeout(10.seconds)) {
+ assert(!appId.isEmpty)
+ }
+
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.workers(0).state == WorkerState.DEAD)
+ }
+
+ val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
+ val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = driverEnv2
+ override def receive: PartialFunction[Any, Unit] = {
+ case RegisteredApplication(id, _) => appId = id
+ }
+ })
+ val workerEnv2 = RpcEnv.create("worker2", "localhost", 12345, conf, new SecurityManager(conf))
+ val fakeWorker2 = workerEnv2.setupEndpoint("worker2", new RpcEndpoint {
+ override val rpcEnv: RpcEnv = workerEnv2
+ override def receive: PartialFunction[Any, Unit] = {
+ case LaunchDriver(_, _) =>
+ master.self.send(RegisterApplication(app, fakeDriver2))
+ }
+ })
+
+ appId = ""
+ master.self.send(RegisterWorker(
+ "2",
+ "localhost",
+ 9998,
+ fakeWorker2,
+ 10,
+ 1024,
+ "http://localhost:8081",
+ RpcAddress("localhost2", 10001)))
+ eventually(timeout(10.seconds)) {
+ assert(!appId.isEmpty)
+ }
+
+ master.self.send(worker1)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+
+ val worker = masterState.workers.filter(w => w.id == "1")
+ assert(worker.length == 1)
+ // make sure the `DriverStateChanged` arrives at Master.
+ assert(worker(0).drivers.isEmpty)
+ assert(masterState.activeDrivers.length == 1)
+ assert(masterState.activeDrivers(0).state == DriverState.RUNNING)
+ assert(masterState.activeApps.length == 1)
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index fe8955840d72..792a1d7f57e2 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -22,7 +22,7 @@ import java.nio._
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
@@ -36,6 +36,7 @@ import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
+import org.apache.spark.util.ThreadUtils
class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with ShouldMatchers {
test("security default off") {
@@ -166,7 +167,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
})
- Await.ready(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
+ ThreadUtils.awaitReady(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
promise.future.value.get
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 759d52fca5ce..3ec37f674c77 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -17,11 +17,15 @@
package org.apache.spark.scheduler
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
import scala.util.Random
+import org.mockito.Mockito._
import org.roaringbitmap.RoaringBitmap
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.BlockManagerId
@@ -128,4 +132,26 @@ class MapStatusSuite extends SparkFunSuite {
assert(size1 === size2)
assert(!success)
}
+
+ test("Blocks which are bigger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be " +
+ "underestimated.") {
+ val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+ // Value of element in sizes is equal to the corresponding index.
+ val sizes = (0L to 2000L).toArray
+ val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
+ val arrayStream = new ByteArrayOutputStream(102400)
+ val objectOutputStream = new ObjectOutputStream(arrayStream)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ objectOutputStream.writeObject(status1)
+ objectOutputStream.flush()
+ val array = arrayStream.toByteArray
+ val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
+ val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
+ (1001 to 2000).foreach {
+ case part => assert(status2.getSizeForBlock(part) >= sizes(part))
+ }
+ }
}
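
The new test round-trips a HighlyCompressedMapStatus through Java serialization and checks that
any block at or above the threshold is never reported smaller than it really is; smaller blocks
may still be summarized by an average. A configuration sketch (the key string is an assumption
matching config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD; the value is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // blocks of at least 100 MB keep an individually tracked size in the map status
      .set("spark.shuffle.accurateBlockThreshold", "100m")
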
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 8300607ea888..37b08980db87 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
import scala.concurrent.duration.{Duration, SECONDS}
import scala.language.existentials
import scala.reflect.ClassTag
@@ -260,7 +260,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
*/
def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
try {
- Await.ready(jobFuture, duration)
+ ThreadUtils.awaitReady(jobFuture, duration)
} catch {
case te: TimeoutException if backendException.get() != null =>
val msg = raw"""
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index f5575ce1e157..80c7e0bfee6e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -184,7 +184,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be {1}
val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
- stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD
+ stageInfo2.rddInfos.size should be {3}
stageInfo2.rddInfos.forall(_.numPartitions == 4) should be {true}
stageInfo2.rddInfos.exists(_.name == "Deux") should be {true}
listener.stageInfos.clear()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index b22da565d86e..992d3396d203 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -100,7 +100,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
context.addTaskCompletionListener(_ => throw new Exception("blah"))
intercept[TaskCompletionListenerException] {
- context.markTaskCompleted()
+ context.markTaskCompleted(None)
}
verify(listener, times(1)).onTaskCompletion(any())
@@ -231,10 +231,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("immediately call a completion listener if the context is completed") {
var invocations = 0
val context = TaskContext.empty()
- context.markTaskCompleted()
+ context.markTaskCompleted(None)
context.addTaskCompletionListener(_ => invocations += 1)
assert(invocations == 1)
- context.markTaskCompleted()
+ context.markTaskCompleted(None)
assert(invocations == 1)
}
@@ -254,6 +254,36 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
assert(lastError == error)
assert(invocations == 1)
}
+
+ test("TaskCompletionListenerException.getMessage should include previousError") {
+ val listenerErrorMessage = "exception in listener"
+ val taskErrorMessage = "exception in task"
+ val e = new TaskCompletionListenerException(
+ Seq(listenerErrorMessage),
+ Some(new RuntimeException(taskErrorMessage)))
+ assert(e.getMessage.contains(listenerErrorMessage) && e.getMessage.contains(taskErrorMessage))
+ }
+
+ test("all TaskCompletionListeners should be called even if some fail or a task") {
+ val context = TaskContext.empty()
+ val listener = mock(classOf[TaskCompletionListener])
+ context.addTaskCompletionListener(_ => throw new Exception("exception in listener1"))
+ context.addTaskCompletionListener(listener)
+ context.addTaskCompletionListener(_ => throw new Exception("exception in listener3"))
+
+ val e = intercept[TaskCompletionListenerException] {
+ context.markTaskCompleted(Some(new Exception("exception in task")))
+ }
+
+ // Make sure listener 2 was called.
+ verify(listener, times(1)).onTaskCompletion(any())
+
+ // Also check that a failure in a TaskCompletionListener does not mask the earlier task exception.
+ assert(e.getMessage.contains("exception in listener1"))
+ assert(e.getMessage.contains("exception in listener3"))
+ assert(e.getMessage.contains("exception in task"))
+ }
+
}
private object TaskContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
index 1bfb0c1547ec..82bd7c4ff660 100644
--- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -31,7 +31,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
val tasks = new LinkedHashMap[Long, TaskUIData]
taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
tasks(idx.toLong) = TaskUIData(
- new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
+ new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false))
}
val stageUiData = new StageUIData()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 89ed031b6fcd..f0c521b00b58 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.storage
+import java.util.UUID
+
import org.apache.spark.SparkFunSuite
class BlockIdSuite extends SparkFunSuite {
@@ -67,6 +69,32 @@ class BlockIdSuite extends SparkFunSuite {
assertSame(id, BlockId(id.toString))
}
+ test("shuffle data") {
+ val id = ShuffleDataBlockId(4, 5, 6)
+ assertSame(id, ShuffleDataBlockId(4, 5, 6))
+ assertDifferent(id, ShuffleDataBlockId(6, 5, 6))
+ assert(id.name === "shuffle_4_5_6.data")
+ assert(id.asRDDId === None)
+ assert(id.shuffleId === 4)
+ assert(id.mapId === 5)
+ assert(id.reduceId === 6)
+ assert(!id.isShuffle)
+ assertSame(id, BlockId(id.toString))
+ }
+
+ test("shuffle index") {
+ val id = ShuffleIndexBlockId(7, 8, 9)
+ assertSame(id, ShuffleIndexBlockId(7, 8, 9))
+ assertDifferent(id, ShuffleIndexBlockId(9, 8, 9))
+ assert(id.name === "shuffle_7_8_9.index")
+ assert(id.asRDDId === None)
+ assert(id.shuffleId === 7)
+ assert(id.mapId === 8)
+ assert(id.reduceId === 9)
+ assert(!id.isShuffle)
+ assertSame(id, BlockId(id.toString))
+ }
+
test("broadcast") {
val id = BroadcastBlockId(42)
assertSame(id, BroadcastBlockId(42))
@@ -101,6 +129,30 @@ class BlockIdSuite extends SparkFunSuite {
assertSame(id, BlockId(id.toString))
}
+ test("temp local") {
+ val id = TempLocalBlockId(new UUID(5, 2))
+ assertSame(id, TempLocalBlockId(new UUID(5, 2)))
+ assertDifferent(id, TempLocalBlockId(new UUID(5, 3)))
+ assert(id.name === "temp_local_00000000-0000-0005-0000-000000000002")
+ assert(id.asRDDId === None)
+ assert(id.isBroadcast === false)
+ assert(id.id.getMostSignificantBits() === 5)
+ assert(id.id.getLeastSignificantBits() === 2)
+ assert(!id.isShuffle)
+ }
+
+ test("temp shuffle") {
+ val id = TempShuffleBlockId(new UUID(1, 2))
+ assertSame(id, TempShuffleBlockId(new UUID(1, 2)))
+ assertDifferent(id, TempShuffleBlockId(new UUID(1, 3)))
+ assert(id.name === "temp_shuffle_00000000-0000-0001-0000-000000000002")
+ assert(id.asRDDId === None)
+ assert(id.isBroadcast === false)
+ assert(id.id.getMostSignificantBits() === 1)
+ assert(id.id.getLeastSignificantBits() === 2)
+ assert(!id.isShuffle)
+ }
+
test("test") {
val id = TestBlockId("abc")
assertSame(id, TestBlockId("abc"))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 1b325801e27f..917db766f7f1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -152,7 +152,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
// one should acquire the write lock. The second thread should block until the winner of the
// write race releases its lock.
val winningFuture: Future[Boolean] =
- Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+ ThreadUtils.awaitReady(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
assert(winningFuture.value.get.get)
val winningTID = blockInfoManager.get("block").get.writerTask
assert(winningTID === 1 || winningTID === 2)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b960489983..1e7bcdb6740f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
}
- test("optimize a location order of blocks") {
- val localHost = Utils.localHostName()
+ test("optimize a location order of blocks without topology information") {
+ val localHost = "localhost"
val otherHost = "otherHost"
val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockManager = makeBlockManager(128, "exec", bmMaster)
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
- assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
+ assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+ }
+
+ test("optimize a location order of blocks with topology information") {
+ val localHost = "localhost"
+ val otherHost = "otherHost"
+ val localRack = "localRack"
+ val otherRack = "otherRack"
+
+ val bmMaster = mock(classOf[BlockManagerMaster])
+ val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
+ val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
+ val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
+ val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
+ val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
+ when(bmMaster.getLocations(mc.any[BlockId]))
+ .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))
+
+ val blockManager = makeBlockManager(128, "exec", bmMaster)
+ blockManager.blockManagerId =
+ BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
+ val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+ val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+ assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
+ assert(locations.flatMap(_.topologyInfo)
+ === Seq(localRack, localRack, localRack, otherRack, otherRack))
}
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index bbfd6df3b699..7859b0bba2b4 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.storage
import java.io.{File, FileWriter}
-import scala.language.reflectiveCalls
-
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index 3050f9a25023..535105379963 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -145,7 +145,7 @@ class PartiallySerializedBlockSuite
try {
TaskContext.setTaskContext(TaskContext.empty())
val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskCompleted()
+ TaskContext.get().asInstanceOf[TaskContextImpl].markTaskCompleted(None)
Mockito.verify(partiallySerializedBlock.getUnrolledChunkedByteBuffer).dispose()
Mockito.verifyNoMoreInteractions(memoryStore)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e56e440380a5..9900d1edc4cb 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -192,7 +192,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
// Complete the task; then the 2nd block buffer should be exhausted
verify(blocks(ShuffleBlockId(0, 1, 0)), times(0)).release()
- taskContext.markTaskCompleted()
+ taskContext.markTaskCompleted(None)
verify(blocks(ShuffleBlockId(0, 1, 0)), times(1)).release()
// The 3rd block should not be retained because the iterator is already in zombie state
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index bdd148875e38..267c8dc1bd75 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -320,12 +320,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
- // Ideally, the following test would pass, but currently we overcount completed tasks
- // if task recomputations occur:
- // find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
- // Instead, we guarantee that the total number of tasks is always correct, while the number
- // of completed tasks may be higher:
- find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)")
+ find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
}
val jobJson = getJson(sc.ui.get, "jobs")
(jobJson \ "numTasks").extract[Int]should be (2)
diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
index c770fd5da76f..423daacc0f5a 100644
--- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala
@@ -133,6 +133,45 @@ class UIUtilsSuite extends SparkFunSuite {
assert(decoded2 === decodeURLParameter(decoded2))
}
+ test("SPARK-20393: Prevent newline characters in parameters.") {
+ val encoding = "Encoding:base64%0d%0a%0d%0aPGh0bWw%2bjcmlwdD48L2h0bWw%2b"
+ val stripEncoding = "Encoding:base64PGh0bWw%2bjcmlwdD48L2h0bWw%2b"
+
+ assert(stripEncoding === stripXSS(encoding))
+ }
+
+ test("SPARK-20393: Prevent script from parameters running on page.") {
+ val scriptAlert = """>"'>