
Conversation

@AngersZhuuuu (Contributor) commented Oct 30, 2025

What changes were proposed in this pull request?


When speculation is not enabled and an executor fails to launch a task because of an OutOfMemoryError, the DAGScheduler never learns that the task failed and never reschedules it, so the application gets stuck.
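For illustration only (an editorial sketch, not code from Spark or from this PR): when a thread pool cannot spawn a worker thread, execute() throws the OutOfMemoryError on the calling thread, before the task body ever runs, so unless the launch site catches it and reports a failed state, the driver keeps waiting for a task that never started.

import java.util.concurrent.Executors

object LaunchFailureSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool() // grows by creating new threads on demand
    try {
      // Under thread/memory exhaustion this can throw
      // java.lang.OutOfMemoryError: unable to create new native thread
      pool.execute(() => println("task body runs on a pool thread"))
    } catch {
      case oom: OutOfMemoryError =>
        // Without a catch like this (followed by a status update to the driver or an
        // executor exit), the failure stays invisible to the DAGScheduler.
        System.err.println(s"task launch failed: ${oom.getMessage}")
    } finally {
      pool.shutdown()
    }
  }
}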


Why are the changes needed?

In some cases the application gets stuck and misses its SLA.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT + MT (unit test, plus the manual test below)

Test code

import org.apache.spark.sql.functions.{col, udf}
import java.util.concurrent.TimeUnit

// Define a UDF that sleeps for 10 seconds

val sleepUdf = udf((value: String) => {
  TimeUnit.SECONDS.sleep(10)
  value
})

val df = spark.range(0, 2500)
  .repartition(25)
  .withColumn("value", (col("id") % 10).cast("string"))
  .withColumn("value", sleepUdf(col("value")))

df.createOrReplaceTempView("test_table")

val result = spark.sql("SELECT value, COUNT(*) as cnt FROM test_table GROUP BY value ORDER BY cnt DESC")

result.show()
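(Presumably run in spark-shell, which provides the spark session; the sleeping UDF just keeps tasks running long enough that a failed task launch, and with this patch the subsequent re-scheduling, can be observed.)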
The attached screenshots show the task failing and then being re-scheduled.

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the CORE label on Oct 30, 2025.
@AngersZhuuuu (author):

@holdenk @cloud-fan What do you think about this issue?

@holdenk (Contributor) commented Nov 3, 2025:

Try to make the tests part of a test suite, not just manual tests, when you can.

@cloud-fan (Contributor):

An application getting stuck is a serious issue; can we construct a test case?

cc @jiangxb1987 @Ngone51 .

try {
  threadPool.execute(tr)
} catch {
  case e: Throwable =>
@Ngone51 (Member) commented Nov 7, 2025:

In the case of an uncontrolled OutOfMemoryError, I don't think we can guarantee that the status update will always be sent successfully. I would actually prefer to shut down the executor itself with an explicit error code, e.g. SparkExitCode.OOM in the case of OutOfMemoryError.
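A minimal sketch of this first proposal (editorial illustration, not the PR's final code; it reuses the threadPool, tr and taskDescription names from the snippet under review and assumes the Logging trait's logError plus Spark's SparkExitCode.OOM constant):

try {
  threadPool.execute(tr)
} catch {
  case oom: OutOfMemoryError =>
    // The JVM could not create a worker thread for the task; don't try to limp along.
    logError(s"Failed to launch task ${taskDescription.taskId}", oom)
    // Exit with a dedicated code so the cluster manager can tell why the executor died
    // and the driver can reschedule the task elsewhere.
    System.exit(SparkExitCode.OOM)
}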

@AngersZhuuuu (author):

You mean catch the error, and if it's an OutOfMemoryError then shut down the executor itself with the SparkExitCode.OOM error code? What about other errors?

Member:

Yes. Other errors can be handled in the current way.

@Ngone51 (Member) commented Nov 10, 2025:

Or we can mimic SparkUncaughtExceptionHandler: in the case of OutOfMemoryError, we can try to send the status update first, and if an OutOfMemoryError is thrown again, we fail the executor.

@AngersZhuuuu (author):

UncaughtExceptionHandler doesn't seem able to catch a thread-creation failure; it can only handle exceptions thrown after the thread is already running.

Member:

I didn't mean to use UncaughtExceptionHandler instead; I mean we can follow the way UncaughtExceptionHandler handles the OOM error here.

@AngersZhuuuu (author):

Like current?

Member:

You could do:

try {
  threadPool.execute(tr)
} catch {
  case t: Throwable =>
    try {
      logError(s"Execute task ${taskDescription.taskId} failed", t)
      context.statusUpdate(
        taskDescription.taskId,
        TaskState.FAILED,
        env.closureSerializer.newInstance().serialize(
          new ExceptionFailure(t, Seq.empty)))
    } catch {
      case oom: OutOfMemoryError =>
        logError("...")
        System.exit(SparkExitCode.OOM)
      case t: Throwable =>
        logError("...")
        System.exit(-1)
    }
}

@AngersZhuuuu (author):

Done

case t: Throwable =>
  try {
    logError(log"Executor launch task ${MDC(TASK_NAME, taskDescription.name)} failed," +
      log" reason: ${MDC(REASON, t.getCause)}")
Member:

getCause could return null? Is there a way to print the full stack trace of the error?

    System.exit(SparkExitCode.OOM)
  case t: Throwable =>
    logError(log"Executor update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
      log"failed status failed, reason: ${MDC(REASON, t.getCause)}")
Member:

ditto.

@AngersZhuuuu (author):

use getMessage?

@AngersZhuuuu (author):

@Ngone51 GA (GitHub Actions) has passed.

@Ngone51 (Member) left a review comment:

LGTM

@AngersZhuuuu (author):

ping @jiangxb1987 @Ngone51 .

case oom: OutOfMemoryError =>
  logError(log"Executor update launching task ${MDC(TASK_NAME, taskDescription.name)} " +
    log"failed status failed, reason: ${MDC(REASON, oom.getMessage)}")
  System.exit(SparkExitCode.OOM)
Contributor:

Are there any other special exit codes for certain exceptions?

@AngersZhuuuu (author):

The only case we have hit is thread creation failing, which then throws an OutOfMemoryError.

try {
  logError(log"Executor launch task ${MDC(TASK_NAME, taskDescription.name)} failed," +
    log" reason: ${MDC(REASON, t.getMessage)}")
  context.statusUpdate(
Contributor:

So does any place that invokes context.statusUpdate need a try-catch that crashes the executor?

@AngersZhuuuu (author):

@Ngone51 means that if the RPC request also fails due to an OutOfMemoryError, exiting the executor lets the corresponding task be rerun and avoids getting stuck.
In my PR's scenario this might be necessary for that special case; however, if the task is already running, I think it's unnecessary.

    TaskState.FAILED,
    env.closureSerializer.newInstance().serialize(new ExceptionFailure(t, Seq.empty)))
} catch {
  case oom: OutOfMemoryError =>
Contributor:

I'm always skeptical of special cases; why is the OOM error special here?

@AngersZhuuuu (author):

I'll need @Ngone51 to answer your question; as far as I'm concerned, there's no need to handle this separately.

Member:

> why is the OOM error special here?

It's very likely that we hit an OOM error again given that we're already in an OOM situation, so we should not expect the follow-up operations to always succeed. The special catch for OutOfMemoryError is just so that an exact exit code is reported when the error is raised. This follows the behavior of SparkUncaughtExceptionHandler.

Contributor:

OK, can we add a code comment to explain it?

Contributor:

Are we expecting context.statusUpdate to throw an OOM?

@AngersZhuuuu (author):

> Are we expecting context.statusUpdate to throw an OOM?

When sending the RPC, if the client has not been created yet, a new connection has to be set up, which may require creating a new thread and can therefore throw an OOM when the thread cannot be created. In practice this should not happen, since the client connection to the driver should already exist.

@mridulm (Contributor) commented Dec 6, 2025:

I agree with @cloud-fan - special handling, especially of OOM, is not very robust.
This is one of the reasons why OnOutOfMemoryError is set for YARN - better to fail the executor than get it into unpredictable states.

@AngersZhuuuu (author):

Do we need a follow-up PR? cc @cloud-fan

Contributor:

Yes please!

@dongjoon-hyun (Member) left a review comment:

+1, LGTM. Thank you, @AngersZhuuuu , @Ngone51 , @cloud-fan , @holdenk .
Merged to master/4.1 for Apache Spark 4.1.0.

dongjoon-hyun pushed a commit that referenced this pull request Nov 21, 2025
…ask killed message

Closes #52792 from AngersZhuuuu/SPARK-54087.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 7c279e5)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 25, 2025
…ask killed message

Closes apache#52792 from AngersZhuuuu/SPARK-54087.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…ask killed message

Closes apache#52792 from AngersZhuuuu/SPARK-54087.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Dec 8, 2025
…n task killed message should not special handling OOM

### What changes were proposed in this pull request?
Follow comment #52792 (comment)

### Why are the changes needed?
Follow comment

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No need

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #53379 from AngersZhuuuu/SPARK-54087-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Dec 8, 2025
…n task killed message should not special handling OOM

Closes #53379 from AngersZhuuuu/SPARK-54087-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 6a60616)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
xu20160924 pushed a commit to xu20160924/spark that referenced this pull request Dec 9, 2025
…n task killed message should not special handling OOM

Closes apache#53379 from AngersZhuuuu/SPARK-54087-FOLLOWUP.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>