core/src/main/scala/org/apache/spark/executor/Executor.scala (22 additions, 1 deletion)

@@ -381,7 +381,28 @@ private[spark] class Executor(
       tr.kill(killMark._1, killMark._2)
       killMarks.remove(taskId)
     }
-    threadPool.execute(tr)
+    try {
+      threadPool.execute(tr)
+    } catch {
+      case t: Throwable =>
+        try {
+          logError(log"Executor launch task ${MDC(TASK_NAME, taskDescription.name)} failed," +
+            log" reason: ${MDC(REASON, t.getMessage)}")
+          context.statusUpdate(

Review thread on "context.statusUpdate(":

Contributor: So any place that invokes context.statusUpdate needs to add a try-catch to crash the executor?

Contributor Author: @Ngone51 means that if the RPC request also fails, due to OutOfMemoryError (OOM), exiting the executor can trigger the corresponding task rerun and avoid getting stuck. In my PR's scenario this might be necessary as a special case; however, if the task is already running, I think it's unnecessary.

+            taskDescription.taskId,
+            TaskState.FAILED,
+            env.closureSerializer.newInstance().serialize(new ExceptionFailure(t, Seq.empty)))
+        } catch {
+          case oom: OutOfMemoryError =>

Review thread on "case oom: OutOfMemoryError =>":

Contributor: I'm always skeptical of special cases; why is the OOM error special here?

Contributor Author: @Ngone51 will need to answer your question; for my part, there's no need to handle this separately.

Member:

> why is the OOM error special here?

It's very likely to hit the OOM error again given that we're already in an OOM situation, so we should not expect the following operation to always succeed. The special catch for OutOfMemoryError is just for logging an exact error code when the error is raised. This follows the behavior of SparkUncaughtExceptionHandler.
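
For readers who don't have that handler in front of them, here is a minimal sketch of the pattern being referenced: match OutOfMemoryError first so the process exits with a dedicated code, and fall through to a generic code otherwise. This is an illustration, not Spark's actual source; the ExitCodes object and its numeric values are stand-ins.

    import java.lang.Thread.UncaughtExceptionHandler

    // Stand-in for org.apache.spark.util.SparkExitCode; the real constants
    // live in Spark's util package and may use different values.
    object ExitCodes {
      val UNCAUGHT_EXCEPTION = 50
      val OOM = 52
    }

    class OomAwareHandler extends UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
        exception match {
          // OutOfMemoryError is matched first so the exit code tells the
          // cluster manager exactly what went wrong.
          case _: OutOfMemoryError => System.exit(ExitCodes.OOM)
          case _ => System.exit(ExitCodes.UNCAUGHT_EXCEPTION)
        }
      }
    }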

Contributor: OK, can we add a code comment to explain it?

Contributor: Are we expecting context.statusUpdate to throw OOM?

Contributor Author:

> Are we expecting context.statusUpdate to throw OOM?

When sending an RPC, if the client has not been created yet, a new connection task has to be created, which may require creating a new thread; that can throw an OutOfMemoryError when the thread cannot be created. In practice this case should not normally happen, since the client for the driver should already have been created.
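
As a minimal sketch of the failure mode being described (the object name and messages here are made up for illustration), thread creation itself can raise OutOfMemoryError, which is why even the status-update path has to be treated as fallible:

    object ThreadCreationOom {
      def main(args: Array[String]): Unit = {
        val connectTask: Runnable = () => println("connection established")
        try {
          // The JVM raises OutOfMemoryError("unable to create new native thread")
          // when the OS cannot allocate another thread stack, so spawning a
          // connection thread under memory pressure can itself fail.
          new Thread(connectTask).start()
        } catch {
          case oom: OutOfMemoryError =>
            System.err.println(s"thread creation failed: ${oom.getMessage}")
        }
      }
    }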

Contributor (@mridulm, Dec 6, 2025): I agree with @cloud-fan - special handling, especially of OOM, is not very robust. This is one of the reasons why OnOutOfMemoryError is set for YARN - better to fail the executor than get it into unpredictable states.
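
For reference, the fail-fast behavior @mridulm mentions can be approximated by passing the HotSpot OnOutOfMemoryError flag through executor JVM options; a hedged sketch, since the exact wiring Spark's YARN integration uses may differ:

    import org.apache.spark.SparkConf

    object FailFastOnOom {
      // -XX:OnOutOfMemoryError runs the given command when the JVM hits an OOM;
      // killing the process lets the cluster manager relaunch the executor
      // instead of leaving it in an unpredictable state.
      val conf: SparkConf = new SparkConf()
        .set("spark.executor.extraJavaOptions", "-XX:OnOutOfMemoryError='kill -9 %p'")
    }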

Contributor Author: Should I make a follow-up PR? cc @cloud-fan

Contributor: Yes please!

logError(log"Executor update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
log"failed status failed, reason: ${MDC(REASON, oom.getMessage)}")
System.exit(SparkExitCode.OOM)

Review thread on "System.exit(SparkExitCode.OOM)":

Contributor: Any other special exit codes for certain exceptions?

Contributor Author: The only case we have met is thread creation failing, which then throws OutOfMemoryError.
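
For context, a sketch of the exit codes in play; the values below are assumptions and should be checked against org.apache.spark.util.SparkExitCode:

    // Assumed values; verify against org.apache.spark.util.SparkExitCode.
    object AssumedSparkExitCodes {
      val UNCAUGHT_EXCEPTION = 50        // generic uncaught exception
      val UNCAUGHT_EXCEPTION_TWICE = 51  // failure while handling another failure
      val OOM = 52                       // OutOfMemoryError, used by this patch
    }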

+          case t: Throwable =>
+            logError(log"Executor failed to update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
+              log"status to FAILED, reason: ${MDC(REASON, t.getMessage)}")
+            System.exit(-1)
+        }
+    }
     if (decommissioned) {
       log.error(s"Launching a task while in decommissioned state.")
     }

core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala (49 additions, 1 deletion)

@@ -22,7 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.{HashMap, Properties}
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean

 import scala.collection.immutable

@@ -603,6 +603,54 @@ class ExecutorSuite extends SparkFunSuite
   }
 }

test("SPARK-54087: launchTask should return task killed message when threadPool.execute fails") {
val conf = new SparkConf
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
val serializedTask = serializer.newInstance().serialize(new FakeTask(0, 0))
val taskDescription = createFakeTaskDescription(serializedTask)

val mockExecutorBackend = mock[ExecutorBackend]
val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])

withExecutor("id", "localhost", env) { executor =>
// Use reflection to replace threadPool with a mock that throws an exception
val executorClass = classOf[Executor]
val threadPoolField = executorClass.getDeclaredField("threadPool")
threadPoolField.setAccessible(true)
val originalThreadPool = threadPoolField.get(executor).asInstanceOf[ThreadPoolExecutor]

// Create a mock ThreadPoolExecutor that throws an exception when execute is called
val mockThreadPool = mock[ThreadPoolExecutor]
val testException = new OutOfMemoryError("unable to create new native thread")
when(mockThreadPool.execute(any[Runnable])).thenThrow(testException)
threadPoolField.set(executor, mockThreadPool)

try {
// Launch the task - this should catch the exception and send statusUpdate
executor.launchTask(mockExecutorBackend, taskDescription)

// Verify that statusUpdate was called with FAILED state
verify(mockExecutorBackend).statusUpdate(
meq(taskDescription.taskId),
meq(TaskState.FAILED),
statusCaptor.capture()
)

// Verify that the exception was correctly serialized
val failureData = statusCaptor.getValue
val failReason = serializer.newInstance()
.deserialize[ExceptionFailure](failureData)
assert(failReason.exception.isDefined)
assert(failReason.exception.get.isInstanceOf[OutOfMemoryError])
assert(failReason.exception.get.getMessage === "unable to create new native thread")
} finally {
// Restore the original threadPool
threadPoolField.set(executor, originalThreadPool)
}
}
}

   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]