diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2fef447b0a3c..5a9c4dd4e743 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -798,6 +798,18 @@ private[spark] class TaskSetManager( } } maybeFinishTaskSet() + + // kill running task if stage failed + if (reason.isInstanceOf[FetchFailed]) { + killTasks(runningTasksSet, taskInfos) + } + } + + def killTasks(tasks: HashSet[Long], taskInfo: HashMap[Long, TaskInfo]): Unit = { + tasks.foreach { task => + val executorId = taskInfo(task).executorId + sched.sc.schedulerBackend.killTask(task, executorId, true) + } } def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {