7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2775,6 +2775,13 @@ object SparkContext extends Logging {
private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"

// Only used to record the temporary output directory when writing to HDFS or Hive.
private[spark] val MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR =
"mapreduce.output.fileoutputformat.outputdir"
private[spark] val MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID =
"mapreduce.job.application.attempt.id"


/**
* Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
* changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see
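Not part of the diff: a minimal, hedged sketch of how these keys behave as SparkContext local properties. Local properties are thread-local and are attached to jobs submitted from the calling thread, which is what lets the scheduler read them back later. The master URL and paths below are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalPropertySketch {
  // Same Hadoop keys the PR records; the values here are illustrative only.
  val OutputDirKey = "mapreduce.output.fileoutputformat.outputdir"
  val AppAttemptIdKey = "mapreduce.job.application.attempt.id"

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      // Local properties propagate with jobs submitted from this thread.
      sc.setLocalProperty(OutputDirKey, "hdfs://nn:8020/warehouse/t1")
      sc.setLocalProperty(AppAttemptIdKey, "0")
      println(sc.getLocalProperty(OutputDirKey))    // hdfs://nn:8020/warehouse/t1
      println(sc.getLocalProperty(AppAttemptIdKey)) // 0
    } finally {
      sc.stop()
    }
  }
}
```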
@@ -31,6 +31,7 @@ TaskAttemptContext => NewTaskAttemptContext, TaskAttemptID => NewTaskAttemptID,
import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemptContextImpl}

import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext}
import org.apache.spark.SparkContext.{MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID, MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -78,6 +79,13 @@ object SparkHadoopWriter extends Logging {
val committer = config.createCommitter(commitJobId)
committer.setupJob(jobContext)

rdd.context.setLocalProperty(MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR,
jobContext.getConfiguration().get(MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR))
rdd.context.setLocalProperty(MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID,
jobContext.getConfiguration().getInt(MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID, 0).toString)

rdd.setResultStageAllowToRetry(true)

// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
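For context (a hedged sketch, not the PR's code path): the two values copied into local properties above come from the Hadoop `Configuration` of the write job. `get` may return null if the output directory was never set, and `getInt` falls back to 0 for the application attempt id. The path below is a placeholder.

```scala
import org.apache.hadoop.conf.Configuration

object HadoopConfLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // In a real job these are set by the output format / committer setup;
    // here we set the output dir by hand just to show the lookups.
    conf.set("mapreduce.output.fileoutputformat.outputdir", "hdfs://nn:8020/warehouse/t1")

    val outputDir = conf.get("mapreduce.output.fileoutputformat.outputdir") // may be null if unset
    val appAttemptId = conf.getInt("mapreduce.job.application.attempt.id", 0) // defaults to 0

    println(s"outputDir=$outputDir, appAttemptId=$appAttemptId")
  }
}
```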
@@ -56,6 +56,10 @@ private[spark] class ApproximateActionListener[T, U, R](
}
}

override def stageFailed(): Unit = {
finishedTasks = 0
}

override def jobFailed(exception: Exception): Unit = {
synchronized {
failure = Some(exception)
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -46,7 +46,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.{AccumulatorV2, BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
Utils => collectionUtils}
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@@ -2084,6 +2084,21 @@ abstract class RDD[T: ClassTag](
}
}

private[spark] var isResultStageRetryAllowed = false

private[spark] def setResultStageAllowToRetry(isRetryAllowed: Boolean): Unit = {
isResultStageRetryAllowed = isRetryAllowed
}

private[spark] var totalNumRowsAccumulator: Option[AccumulatorV2[_, _]] = None

private[spark] def reset(): Unit = {
totalNumRowsAccumulator match {
case Some(accumulatorV2) => accumulatorV2.reset()
case _ =>
}
}

}


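A hedged, standalone sketch of the accumulator behavior the new `reset()` hook relies on, using a plain `LongAccumulator`; the names and values are illustrative only.

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

object AccumulatorResetSketch {
  def main(args: Array[String]): Unit = {
    val rows = new LongAccumulator
    rows.add(42L)

    // Mirrors RDD.reset(): clear the accumulator if one was attached.
    val totalNumRowsAccumulator: Option[AccumulatorV2[_, _]] = Some(rows)
    totalNumRowsAccumulator.foreach(_.reset())

    assert(rows.isZero) // sum and count are back to zero after reset()
  }
}
```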
84 changes: 79 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.io.{IOException, NotSerializableException}
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeoutException, TimeUnit }
import java.util.concurrent.atomic.AtomicInteger
@@ -30,8 +30,10 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{Futures, SettableFuture}
import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.SparkContext.{MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID, MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -416,6 +418,67 @@ private[spark] class DAGScheduler(
cacheLocs.clear()
}

private def unregisterAllResultOutput(rs: ResultStage): Unit = {
// Clean up finished partitions.
rs.activeJob.get.resetAllPartitions()
// Clean up job listener state.
rs.activeJob.get.listener.stageFailed()
// Clean up stage commit messages.
outputCommitCoordinator.stageEnd(rs.id)
// Clean up the temporary directory used when writing to Hive tables / HDFS.
cleanupJobAttemptPath()
// Reset accumulator state for DataSource V2 commands.
rs.rdd.reset()
}

private def cleanupJobAttemptPath(): Unit = {
val outputDir = sc.getLocalProperty(MAPREDUCE_OUTPUT_FILEOUTPUTFORMAT_OUTPUTDIR)
val jobAttemptId = sc.getLocalProperty(MAPREDUCE_JOB_APPLICATION_ATTEMPT_ID)

if (outputDir != null && outputDir.nonEmpty) {
val jobAttemptPath = new Path(getPendingJobAttemptsPath(new Path(outputDir)),
String.valueOf(jobAttemptId))

val fs = jobAttemptPath.getFileSystem(sc.hadoopConfiguration)

if (fs.exists(jobAttemptPath)) {
var attempts = 0
val maxAttempts = 10
while (!fs.delete(jobAttemptPath, true)) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException(s"Job attempt dir: ${jobAttemptPath.getName} " +
s"fail to be deleted after $maxAttempts attempts!")
}
logWarning(s"Job attempt dir: ${jobAttemptPath.getName} " +
s"fail to be deleted at the ${attempts}th retry, not reach the max: $maxAttempts yet," +
s" will retry again in 1000 ms")
Thread.sleep(1000)
}
attempts = 0
while (!fs.mkdirs(jobAttemptPath)) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException(s"Job attempt dir: ${jobAttemptPath.getName} " +
s"fail to be recreate after $maxAttempts attempts!")
}
logWarning(s"Job attempt dir: ${jobAttemptPath.getName} " +
s"fail to be recreate at the ${attempts}th retry, " +
s"not reach the max: $maxAttempts yet, will retry again in 1000 ms")
Thread.sleep(1000)
}
logInfo(s"Job attempt dir: ${jobAttemptPath.getName} has be cleaned")
} else {
logInfo(s"Job attempt dir: ${jobAttemptPath.getName} does not exist " +
s"and does not need to be cleaned")
}
}
}

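// Pending job attempts live under <outputDir>/_temporary (the app attempt id is appended
// by the caller), matching the layout used by Hadoop's FileOutputCommitter.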
def getPendingJobAttemptsPath(out: Path): Path = {
new Path(out, "_temporary")
}

/**
* Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
* shuffle map stage doesn't already exist, this method will create the shuffle map stage in
@@ -1964,8 +2027,9 @@ private[spark] class DAGScheduler(
def generateErrorMessage(stage: Stage): String = {
"A shuffle map stage with indeterminate output was failed and retried. " +
s"However, Spark cannot rollback the $stage to re-process the input data, " +
"and has to fail this job. Please eliminate the indeterminacy by " +
"checkpointing the RDD before repartition and try again."
"and has to fail this job In the scenario of writing to database. " +
"Please eliminate the indeterminacy by checkpointing the RDD " +
"before repartition and try again."
}

activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
@@ -1991,8 +2055,18 @@
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
val numMissingPartitions = resultStage.findMissingPartitions().length
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
if (resultStage.rdd.isResultStageRetryAllowed) {
rollingBackStages += resultStage
// The FetchFailed came from an indeterminate map stage, so all tasks
// of the result stage must be rerun. If it had come from a determinate
// map stage, the result stage would not need to roll back all partitions.
unregisterAllResultOutput(resultStage)
} else {
// TODO: support rolling back result tasks
// in the scenario of writing to a database.
abortStage(resultStage, generateErrorMessage(resultStage), None)
}
}

case _ =>
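The cleanup helper added above is essentially a bounded retry loop around `FileSystem.delete` and `FileSystem.mkdirs`. A hedged, standalone sketch of that pattern against the local filesystem (path and limits are placeholders, not the PR's code):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RetryDeleteSketch {
  def main(args: Array[String]): Unit = {
    val jobAttemptPath = new Path("file:///tmp/output/_temporary/0") // placeholder
    val fs: FileSystem = jobAttemptPath.getFileSystem(new Configuration())
    val maxAttempts = 10

    // Re-evaluates `op` on every check; gives up after maxAttempts failures.
    def retry(op: => Boolean, what: String): Unit = {
      var attempts = 0
      while (!op) {
        attempts += 1
        if (attempts > maxAttempts) {
          throw new IOException(s"$what failed after $maxAttempts attempts")
        }
        Thread.sleep(1000)
      }
    }

    if (fs.exists(jobAttemptPath)) {
      retry(fs.delete(jobAttemptPath, true), s"delete $jobAttemptPath") // recursive delete
      retry(fs.mkdirs(jobAttemptPath), s"recreate $jobAttemptPath")     // leave an empty attempt dir
    }
  }
}
```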
@@ -24,5 +24,6 @@ package org.apache.spark.scheduler
*/
private[spark] trait JobListener {
def taskSucceeded(index: Int, result: Any): Unit
def stageFailed(): Unit
def jobFailed(exception: Exception): Unit
}
@@ -63,6 +63,10 @@ private[spark] class JobWaiter[T](
}
}

override def stageFailed(): Unit = {
finishedTasks.getAndSet(0)
}

override def jobFailed(exception: Exception): Unit = {
if (!jobPromise.tryFailure(exception)) {
logWarning("Ignore failure", exception)
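To round out the listener changes, a hedged sketch of a minimal `JobListener` implementation with the new `stageFailed` hook, resetting its finished-task count the way `JobWaiter` and `ApproximateActionListener` now do. The trait is restated locally so the sketch compiles without Spark internals (it is `private[spark]` in Spark), and the class name is made up.

```scala
// Restated locally for the sketch; in Spark this trait is private[spark].
trait JobListener {
  def taskSucceeded(index: Int, result: Any): Unit
  def stageFailed(): Unit
  def jobFailed(exception: Exception): Unit
}

// Hypothetical listener that counts finished tasks for a job of `totalTasks` tasks.
class CountingJobListener(totalTasks: Int) extends JobListener {
  private var finishedTasks = 0

  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    finishedTasks += 1
    if (finishedTasks == totalTasks) println("job done")
  }

  // When a result stage is rolled back, forget previously finished tasks so the
  // rerun stage starts counting from zero again.
  override def stageFailed(): Unit = synchronized {
    finishedTasks = 0
  }

  override def jobFailed(exception: Exception): Unit = synchronized {
    println(s"job failed: ${exception.getMessage}")
  }
}
```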