
Commit 4d75022

Author: Davies Liu (committed)
Merge branch 'master' of github.com:apache/spark into gen_join
Conflicts:
  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
2 parents: c1c0588 + be5dd88; commit 4d75022

File tree

180 files changed: 4976 additions, 1803 deletions


.rat-excludes

Lines changed: 6 additions & 6 deletions
@@ -73,12 +73,12 @@ logs
 .*dependency-reduced-pom.xml
 known_translations
 json_expectation
-local-1422981759269/*
-local-1422981780767/*
-local-1425081759269/*
-local-1426533911241/*
-local-1426633911242/*
-local-1430917381534/*
+local-1422981759269
+local-1422981780767
+local-1425081759269
+local-1426533911241
+local-1426633911242
+local-1430917381534
 local-1430917381535_1
 local-1430917381535_2
 DESCRIPTION

assembly/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.10</artifactId>
+    <artifactId>spark-parent_2.11</artifactId>
     <version>2.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-assembly_2.10</artifactId>
+  <artifactId>spark-assembly_2.11</artifactId>
   <name>Spark Project Assembly</name>
   <url>http://spark.apache.org/</url>
   <packaging>pom</packaging>

common/sketch/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -21,13 +21,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.10</artifactId>
+    <artifactId>spark-parent_2.11</artifactId>
     <version>2.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>

   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-sketch_2.10</artifactId>
+  <artifactId>spark-sketch_2.11</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project Sketch</name>
   <url>http://spark.apache.org/</url>

core/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.10</artifactId>
+    <artifactId>spark-parent_2.11</artifactId>
    <version>2.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-core_2.10</artifactId>
+  <artifactId>spark-core_2.11</artifactId>
   <properties>
     <sbt.project.name>core</sbt.project.name>
   </properties>
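The _2.10 to _2.11 rename in these poms changes the published artifact coordinates, so downstream builds must pull the Scala 2.11 variants. A minimal sketch of the consumer side, assuming an sbt build against the snapshot version shown above (the project name and Scala patch version are illustrative, not from this commit):

    // build.sbt (illustrative)
    name := "spark-downstream-example"       // hypothetical project name
    scalaVersion := "2.11.7"                 // must match the _2.11 artifact suffix
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"  // resolves to spark-core_2.11

The %% operator appends the Scala binary version, so the dependency resolves to spark-core_2.11 automatically once scalaVersion is set to a 2.11.x release.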

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 8 additions & 116 deletions
@@ -248,9 +248,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val logInfos: Seq[FileStatus] = statusList
       .filter { entry =>
         try {
-          getModificationTime(entry).map { time =>
-            time >= lastScanTime
-          }.getOrElse(false)
+          !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
         } catch {
           case e: AccessControlException =>
             // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -261,9 +259,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
       .flatMap { entry => Some(entry) }
       .sortWith { case (entry1, entry2) =>
-        val mod1 = getModificationTime(entry1).getOrElse(-1L)
-        val mod2 = getModificationTime(entry2).getOrElse(-1L)
-        mod1 >= mod2
+        entry1.getModificationTime() >= entry2.getModificationTime()
       }

     logInfos.grouped(20)
@@ -341,19 +337,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
       }.foreach { attempt =>
         val logPath = new Path(logDir, attempt.logPath)
-        // If this is a legacy directory, then add the directory to the zipStream and add
-        // each file to that directory.
-        if (isLegacyLogDirectory(fs.getFileStatus(logPath))) {
-          val files = fs.listStatus(logPath)
-          zipStream.putNextEntry(new ZipEntry(attempt.logPath + "/"))
-          zipStream.closeEntry()
-          files.foreach { file =>
-            val path = file.getPath
-            zipFileToStream(path, attempt.logPath + Path.SEPARATOR + path.getName, zipStream)
-          }
-        } else {
-          zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
-        }
+        zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
       }
     } finally {
       zipStream.close()
@@ -527,30 +511,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
-    val logInput =
-      if (isLegacyLogDirectory(eventLog)) {
-        openLegacyEventLog(logPath)
-      } else {
-        EventLoggingListener.openEventLog(logPath, fs)
-      }
+    val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)

       // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
-      // logs generated by those versions go through.
-      if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
         Some(new FsApplicationAttemptInfo(
           logPath.getName(),
           appListener.appName.getOrElse(NOT_STARTED),
           appListener.appId.getOrElse(logPath.getName()),
           appListener.appAttemptId,
           appListener.startTime.getOrElse(-1L),
           appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
+          eventLog.getModificationTime(),
           appListener.sparkUser.getOrElse(NOT_STARTED),
           appCompleted))
       } else {
@@ -561,91 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }

-  /**
-   * Loads a legacy log directory. This assumes that the log directory contains a single event
-   * log file (along with other metadata files), which is the case for directories generated by
-   * the code in previous releases.
-   *
-   * @return input stream that holds one JSON record per line.
-   */
-  private[history] def openLegacyEventLog(dir: Path): InputStream = {
-    val children = fs.listStatus(dir)
-    var eventLogPath: Path = null
-    var codecName: Option[String] = None
-
-    children.foreach { child =>
-      child.getPath().getName() match {
-        case name if name.startsWith(LOG_PREFIX) =>
-          eventLogPath = child.getPath()
-        case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
-          codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-        case _ =>
-      }
-    }
-
-    if (eventLogPath == null) {
-      throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
-    }
-
-    val codec = try {
-      codecName.map { c => CompressionCodec.createCodec(conf, c) }
-    } catch {
-      case e: Exception =>
-        throw new IllegalArgumentException(s"Unknown compression codec $codecName.")
-    }
-
-    val in = new BufferedInputStream(fs.open(eventLogPath))
-    codec.map(_.compressedInputStream(in)).getOrElse(in)
-  }
-
-  /**
-   * Return whether the specified event log path contains a old directory-based event log.
-   * Previously, the event log of an application comprises of multiple files in a directory.
-   * As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
-   * See SPARK-2261 for more detail.
-   */
-  private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
-
-  /**
-   * Returns the modification time of the given event log. If the status points at an empty
-   * directory, `None` is returned, indicating that there isn't an event log at that location.
-   */
-  private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
-    if (isLegacyLogDirectory(fsEntry)) {
-      val statusList = fs.listStatus(fsEntry.getPath)
-      if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
-    } else {
-      Some(fsEntry.getModificationTime())
-    }
-  }
-
   /**
    * Return true when the application has completed.
    */
   private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    if (isLegacyLogDirectory(entry)) {
-      fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
-    } else {
-      !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
-    }
-  }
-
-  /**
-   * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
-   * in Spark 1.1.
-   */
-  private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
-    if (isLegacyLogDirectory(entry)) {
-      fs.listStatus(entry.getPath())
-        .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
-        .map { status =>
-          val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
-          version != "1.0" && version != "1.1"
-        }
-        .getOrElse(true)
-    } else {
-      true
-    }
+    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
   }

   /**
@@ -670,12 +568,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
-
-  // Constants used to parse Spark 1.0.0 log directories.
-  val LOG_PREFIX = "EVENT_LOG_"
-  val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
-  val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
-  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 }

 private class FsApplicationAttemptInfo(
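The deletions above drop support for the directory-based event logs written before Spark 1.3; the provider now assumes each application produced a single event log file. A minimal sketch of the producer side under that assumption, using the standard event-log settings (the application name, directory, and job are illustrative, not part of this diff):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("event-log-example")                 // hypothetical application name
      .setMaster("local[2]")
      .set("spark.eventLog.enabled", "true")           // write a single event log file per application
      .set("spark.eventLog.dir", "/tmp/spark-events")  // should match the history server's spark.history.fs.logDirectory
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()
    sc.stop()  // stopping the context finalizes the log, dropping the ".inprogress" suffix the provider checks for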

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 3 additions & 3 deletions
@@ -50,7 +50,7 @@ private[mesos] class MesosClusterDispatcher(
   extends Logging {

   private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
-  private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
+  private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase()
   logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)

   private val engineFactory = recoveryMode match {
@@ -98,8 +98,8 @@ private[mesos] object MesosClusterDispatcher extends Logging {
     conf.setMaster(dispatcherArgs.masterUrl)
     conf.setAppName(dispatcherArgs.name)
     dispatcherArgs.zookeeperUrl.foreach { z =>
-      conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
-      conf.set("spark.mesos.deploy.zookeeper.url", z)
+      conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
+      conf.set("spark.deploy.zookeeper.url", z)
     }
     val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
     dispatcher.start()
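Only the configuration key names change here: the dispatcher now reads the generic spark.deploy.* recovery settings instead of spark.mesos.deploy.*. A minimal sketch of setting them explicitly, with a hypothetical ZooKeeper quorum (not from this commit):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.deploy.recoveryMode", "ZOOKEEPER")            // formerly spark.mesos.deploy.recoveryMode
      .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181")   // formerly spark.mesos.deploy.zookeeper.url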

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 19 additions & 0 deletions
@@ -1542,6 +1542,15 @@ abstract class RDD[T: ClassTag](

   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

+  // Whether to checkpoint all ancestor RDDs that are marked for checkpointing. By default,
+  // we stop as soon as we find the first such RDD, an optimization that allows us to write
+  // less data but is not safe for all workloads. E.g. in streaming we may checkpoint both
+  // an RDD and its parent in every batch, in which case the parent may never be checkpointed
+  // and its lineage never truncated, leading to OOMs in the long run (SPARK-6847).
+  private val checkpointAllMarkedAncestors =
+    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS))
+      .map(_.toBoolean).getOrElse(false)
+
   /** Returns the first parent RDD */
   protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
     dependencies.head.rdd.asInstanceOf[RDD[U]]
@@ -1585,6 +1594,13 @@ abstract class RDD[T: ClassTag](
     if (!doCheckpointCalled) {
       doCheckpointCalled = true
       if (checkpointData.isDefined) {
+        if (checkpointAllMarkedAncestors) {
+          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
+          // them in parallel.
+          // Checkpoint parents first because our lineage will be truncated after we
+          // checkpoint ourselves
+          dependencies.foreach(_.rdd.doCheckpoint())
+        }
         checkpointData.get.checkpoint()
       } else {
         dependencies.foreach(_.rdd.doCheckpoint())
@@ -1704,6 +1720,9 @@ abstract class RDD[T: ClassTag](
   */
 object RDD {

+  private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
+    "spark.checkpoint.checkpointAllMarkedAncestors"
+
   // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
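The new flag is read from a thread-local SparkContext property when an RDD is constructed, so a job can opt in to checkpointing every marked ancestor instead of only the first one found. A minimal sketch of how a caller might set it; the property name comes from the diff above, while the paths and data are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-ancestors").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/checkpoints")  // hypothetical checkpoint directory
    sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")  // set before creating the RDDs

    val parent = sc.parallelize(1 to 1000)
    parent.checkpoint()            // parent marked for checkpointing
    val child = parent.map(_ * 2)
    child.checkpoint()             // child marked as well
    child.count()                  // doCheckpoint now walks up the lineage and checkpoints the parent too
    sc.stop()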

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 0 additions & 2 deletions
@@ -232,8 +232,6 @@ private[spark] object EventLoggingListener extends Logging {
   // Suffix applied to the names of files still being written by applications.
   val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
-  val SPARK_VERSION_KEY = "SPARK_VERSION"
-  val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"

   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

Lines changed: 12 additions & 1 deletion
@@ -270,7 +270,7 @@ class StatsReportListener extends SparkListener with Logging {

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     implicit val sc = stageCompleted
-    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
+    this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
     showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)

     // Shuffle write
@@ -297,6 +297,17 @@ class StatsReportListener extends SparkListener with Logging {
     taskInfoMetrics.clear()
   }

+  private def getStatusDetail(info: StageInfo): String = {
+    val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
+    val timeTaken = info.submissionTime.map(
+      x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
+    ).getOrElse("-")
+
+    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+      s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
+      s"Took: $timeTaken msec"
+  }
+
 }

 private[spark] object StatsReportListener extends Logging {
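With this change the stage-completion log line carries the stage id, attempt, name, status, failure reason, task count, and elapsed time rather than the raw StageInfo toString. A minimal sketch of wiring the listener up so that line is emitted; the job and the sample output are illustrative, not from this commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.StatsReportListener

    val sc = new SparkContext(new SparkConf().setAppName("stats-report").setMaster("local[2]"))
    sc.addSparkListener(new StatsReportListener)  // logs per-stage summaries at INFO level
    sc.parallelize(1 to 100).map(_ + 1).count()
    // Expected log shape, per getStatusDetail above (values illustrative):
    //   Finished stage: Stage(0, 0); Name: 'count at Example.scala:7'; Status: succeeded; numTasks: 2; Took: 120 msec
    sc.stop()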

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -179,7 +179,7 @@ private[spark] class CoarseMesosSchedulerBackend(
       .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))

     if (uri.isEmpty) {
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
+      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
       command.setValue(
         "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
           .format(prefixEnv, runScript) +
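The switch from getCanonicalPath to getPath keeps the constructed path string instead of resolving it against the driver's local filesystem. A minimal sketch of the java.io.File behavior involved, using a hypothetical Spark home (not from this commit):

    import java.io.File

    val runScript = new File("/opt/spark", "./bin/spark-class")  // hypothetical executorSparkHome
    println(runScript.getPath)           // "/opt/spark/./bin/spark-class" -- no filesystem access
    println(runScript.getCanonicalPath)  // typically "/opt/spark/bin/spark-class", with symlinks and "." resolved locally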
