
Conversation

@JkSelf (Contributor) commented Aug 15, 2025:

What changes were proposed in this pull request?

We optimized Gluten's broadcast hash join by ensuring the hash table is built only once per executor. When the SparkListenerSQLExecutionEnd event is received, the corresponding hash table is released. However, while running a Gluten unit test, we encountered an exception: after the SparkListenerSQLExecutionEnd event was triggered and the hash table was released, the hash table was unexpectedly recreated, leading to a core dump. After investigating with @wangyum, we found that the unit test's query result was empty, which triggered the AQEPropagateEmptyRelation rule to skip the join operation. Despite this, the corresponding tasks were not actually canceled, so the hash table was rebuilt after it had already been released.

This PR invokes the cancelJobsWithTag API to send an RPC request to terminate tasks before emitting the SparkListenerSQLExecutionEnd event. This helps prevent canceled tasks from continuing execution in most cases. If tasks still fail to terminate properly, enabling spark.task.reaper.enabled will forcibly kill the corresponding executors to ensure cleanup.
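For reference, a minimal sketch of the intended ordering (simplified from the diff discussed below; the surrounding withNewExecutionId scaffolding and error handling are omitted):

    // Sketch only: cancel the jobs tagged with this execution's tag *before*
    // posting SparkListenerSQLExecutionEnd, so listeners that release
    // per-execution resources (e.g. Gluten's broadcast hash table) do not
    // race with straggler tasks that were never told to stop.
    sparkSession.sparkContext.cancelJobsWithTag(
      executionIdJobTag(sparkSession, executionId))
    sc.listenerBus.post(
      SparkListenerSQLExecutionEnd(executionId, System.currentTimeMillis()))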

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

@github-actions bot added the SQL label Aug 15, 2025
@JkSelf (Contributor, Author) commented Aug 15, 2025:

@wangyum @cloud-fan Could you help review this PR? Thanks for your help.

@HyukjinKwon HyukjinKwon changed the title [SPARK-53289] Invoke cancelJobGroup before start passing the ExecutionEnd event to prevent execution of canceled tasks. [SPARK-53289][SQL] Invoke cancelJobGroup before start passing the ExecutionEnd event to prevent execution of canceled tasks. Aug 18, 2025
@HyukjinKwon (Member) commented:

Let's fix up the PR title, and describe the background in the PR description.

@cloud-fan (Contributor) commented:

This PR ensures that the task is properly canceled ...

How is this possible? The driver can only send an RPC to kill a task, which is not guaranteed to succeed. The only reliable way is to shut down the corresponding executors forcibly, which is the job of the task reaper (SPARK-18761).

@JkSelf (Contributor, Author) commented Aug 19, 2025:

This PR ensures that the task is properly canceled ...

How is this possible? The driver can only send an RPC to kill a task, which is not guaranteed to succeed. The only reliable way is to shut down the corresponding executors forcibly, which is the job of the task reaper (SPARK-18761).

@cloud-fan Thanks for your review.

I tried removing sparkSession.sparkContext.cancelJobsWithTag(executionIdJobTag(sparkSession, executionId)) and directly setting spark.task.reaper.enabled to true, but it doesn't seem to take effect. My understanding is that we still need to call cancelJobsWithTag to mark the tasks as killed, and then enable spark.task.reaper.enabled to forcibly kill the corresponding executor to ensure the tasks are eventually terminated. Is that correct?
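To illustrate the combination described above, a hedged sketch (spark.task.reaper.enabled and spark.task.reaper.killTimeout are standard Spark configs; the values and the tag string are illustrative only):

    // The task reaper only acts on tasks that have already been asked to
    // die, so the cancelJobsWithTag call is still required to mark them.
    val spark = org.apache.spark.sql.SparkSession.builder()
      .master("local[4]")
      .config("spark.task.reaper.enabled", "true")    // monitor killed tasks
      .config("spark.task.reaper.killTimeout", "60s") // kill the executor JVM if a task ignores the kill
      .getOrCreate()

    // Mark all jobs carrying this tag as killed; if a task keeps running
    // past the timeout, the reaper shuts the executor down.
    spark.sparkContext.cancelJobsWithTag("my-execution-tag") // hypothetical tag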

@JkSelf JkSelf changed the title [SPARK-53289][SQL] Invoke cancelJobGroup before start passing the ExecutionEnd event to prevent execution of canceled tasks. [SPARK-53289][SQL] Invoke cancelJobsWithTag before start passing the SparkListenerSQLExecutionEnd event to prevent execution of canceled tasks. Aug 19, 2025
@@ -195,6 +195,10 @@ object SQLExecution extends Logging {
         }
       }
     }
+
+    sparkSession.sparkContext.cancelJobsWithTag(
+      executionIdJobTag(sparkSession, executionId))
Contributor:

We should only do this if this is the root execution

Contributor:

BTW does it work for AQE?

Contributor (Author):

We should only do this if this is the root execution

It seems that EXECUTION_ROOT_ID_KEY is already set to executionId here, so there's no need to add this check, right?

BTW does it work for AQE?

Yes. I re-ran the test with AQE enabled, and it worked.

Contributor:

nested query execution can also hit this code branch, or did I miss something?

Contributor:

I think the way we clean up shuffles looks better. AQE will track all the query stages executed, so we can clean up shuffles as soon as a query (whether nested or not) completes. But here we use the job group, which is only set for the root execution.

Contributor (Author):

nested query execution can also hit this code branch, or did I miss something?

Yes. We need to skip this cancel operation for nested query executions. I've added a check so we only cancel in the root execution, as sketched below. Please help review again. Thanks.
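A minimal sketch of the guard being described (names follow the diff shown later in this thread):

    // Only the root execution cancels its tagged jobs; nested query
    // executions share the root's tag, so cancelling from a nested
    // execution could kill jobs the root still needs.
    if (executionId == rootExecutionId) {
      sparkSession.sparkContext.cancelJobsWithTag(
        executionIdJobTag(sparkSession, executionId))
    }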

@JkSelf (Contributor, Author) commented Aug 19, 2025:

I couldn't reproduce the failed unit tests from CI — they pass in my local environment.

2025-08-18T06:33:50.1460342Z [info] - concurrent query execution with fork-join pool (SPARK-13747) *** FAILED *** (1 second, 617 milliseconds)
2025-08-18T06:33:50.1463536Z [info]   org.apache.spark.SparkException: [SPARK_JOB_CANCELLED] Job 13 cancelled part of cancelled job tags spark-session-bf217928-ec88-4b87-8190-490644c0873f-execution-root-id-23546 SQLSTATE: XXKDA
2025-08-18T06:33:50.1466074Z [info]   at org.apache.spark.errors.SparkCoreErrors$.sparkJobCancelled(SparkCoreErrors.scala:222)
2025-08-18T06:33:50.1470809Z [info]   at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2865)
2025-08-18T06:33:50.1472812Z [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleJobTagCancelled$7(DAGScheduler.scala:1258)
2025-08-18T06:33:50.1474859Z [info]   at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
2025-08-18T06:33:50.1495157Z [info]   at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:450)
2025-08-18T06:33:50.1499692Z [info]   at scala.collection.mutable.HashSet.foreach(HashSet.scala:376)
2025-08-18T06:33:50.1504374Z [info]   at org.apache.spark.scheduler.DAGScheduler.handleJobTagCancelled(DAGScheduler.scala:1258)
2025-08-18T06:33:50.1507546Z [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3166)
2025-08-18T06:33:50.1510448Z [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
2025-08-18T06:33:50.1519005Z [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
2025-08-18T06:33:50.1546475Z [info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
2025-08-18T06:33:50.2622276Z [info] - Finding QueryExecution for given executionId (106 milliseconds)

@cloud-fan Do you have any input? Thanks.

@@ -195,6 +195,12 @@ object SQLExecution extends Logging {
         }
       }
     }
+
+    if (executionId == rootExecutionId) {
Contributor:

let's leave some code comments here to explain it.

It would be better if we could cancel useless jobs earlier, when nested query executions are completed, but we can do that in a followup.

Contributor (Author):

@cloud-fan Yes, added the comments. Please help review again. Thanks.

@zhouyuan commented:

@JkSelf It looks like the CI build failed on this. Could you please do a rebase? Thanks.

sparkSession.sparkContext.cancelJobsWithTag(
  executionIdJobTag(sparkSession, executionId))
// scalastyle:off println
println("cancel the executionID " + executionIdJobTag(sparkSession, executionId))
Contributor:

we should use logInfo instead of print
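A sketch of the suggested replacement (logInfo comes from Spark's Logging trait, which SQLExecution already mixes in; the message text mirrors the println above):

    // Route the message through the logging framework instead of stdout;
    // the scalastyle println suppression then becomes unnecessary.
    logInfo(s"Cancelling jobs for execution tag " +
      s"${executionIdJobTag(sparkSession, executionId)}")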

@@ -50,7 +50,7 @@ class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
     }
   }

-  test("concurrent query execution with fork-join pool (SPARK-13747)") {
+  ignore("concurrent query execution with fork-join pool (SPARK-13747)") {
@JkSelf (Contributor, Author) commented Oct 23, 2025:

@cloud-fan
I looked deeper into the failed unit tests and found that they involve multiple threads sharing the same SparkContext.
If a job is cancelled in one thread, it can cause exceptions in other threads that are still running. How should we handle this concurrent scenario? Would it be acceptable to skip the cancel operation for this test case, or do you have other suggestions?
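For context, the failing suite exercises roughly this pattern (a sketch only, not the actual test code; it assumes an active SparkSession named spark): many independent queries run concurrently on one shared SparkContext via a fork-join (parallel collection) pool.

    import scala.collection.parallel.CollectionConverters._

    // Each parallel task runs its own SQL execution on the shared
    // SparkContext; a tag-based cancellation issued when one execution
    // ends must not kill jobs belonging to its still-running siblings.
    (1 to 100).par.foreach { _ =>
      spark.range(10).count()
    }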

Contributor:

I'm a bit confused. They are individual queries with different execution ids, right? How can cancelling one break the other?

Contributor (Author):

@cloud-fan Thanks for your reply.

Here is the error stack.

org.apache.spark.SparkException: [SPARK_JOB_CANCELLED] Job 20 cancelled part of cancelled job tags spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164 SQLSTATE: XXKDA
  at org.apache.spark.errors.SparkCoreErrors$.sparkJobCancelled(SparkCoreErrors.scala:222)
  at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:2918)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleJobTagCancelled$7(DAGScheduler.scala:1266)
  at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
  at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:450)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:376)
  at org.apache.spark.scheduler.DAGScheduler.handleJobTagCancelled(DAGScheduler.scala:1266)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3219)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3183)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1017)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2489)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2510)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2529)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2554)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:462)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:396)
  at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:328)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:348)
  at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:298)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:346)
  at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:342)
  at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:840)

According to the following log, the job was actually canceled twice in parallel mode.

Line   2: 2025-10-27T05:32:55.7283226Z 05:32:55.724 WARN org.apache.spark.sql.execution.SQLExecution: The SQLExecution is org.apache.spark.sql.execution.SQLExecution$@163503c1 and the executionIdJobTag is spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164
Line  10: 2025-10-27T05:32:55.7668804Z 05:32:55.765 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 29.0 (TID 56) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 20 cancelled part of cancelled job tags spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164 SQLSTATE: XXKDA)
Line  73: 2025-10-27T05:32:57.1915923Z [info]   org.apache.spark.SparkException: [SPARK_JOB_CANCELLED] Job 20 cancelled part of cancelled job tags spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164 SQLSTATE: XXKDA

Contributor (Author):

CC @wangyum Could you also help take a look? Thanks.

Contributor (Author):

The failed unit test is SQLExecutionSuite#"concurrent query execution with fork-join pool (SPARK-13747)".

Contributor:

The log line "The SQLExecution is ... and the executionIdJobTag is ..." only appears once, so why do you think it's canceled twice?

Contributor (Author):

The first time is in
Line 10: 2025-10-27T05:32:55.7668804Z 05:32:55.765 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 29.0 (TID 56) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 20 cancelled part of cancelled job tags spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164 SQLSTATE: XXKDA)

And the second time is in
Line 73: 2025-10-27T05:32:57.1915923Z [info] org.apache.spark.SparkException: [SPARK_JOB_CANCELLED] Job 20 cancelled part of cancelled job tags spark-session-42c4d99b-c576-4172-b5ed-56c61bb79f14-execution-root-id-25164 SQLSTATE: XXKDA

@cloud-fan Correct me if my understanding is wrong. Thanks.

Contributor:

The first message indicates a task being killed due to job group cancellation. The second message indicates a job is cancelled due to job group cancellation. It does not indicate we cancel the same job group twice.

Contributor (Author):

Yes. It is only cancelled once.

body match {
  case Left(e) =>
    sc.listenerBus.post(startEvent)
    synchronized {
Contributor (Author):

The value of SPARK_JOB_TAGS affects the job cancellation logic here, and the surrounding SQLExecution code adds and removes job tags for each thread. When multiple threads share the same SparkContext, the value may be overwritten or become inconsistent, so we introduce a lock here to ensure thread-safe isolation of the tag values.
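A minimal sketch of the locking pattern being described (heavily simplified; executionTag is a hypothetical name for this execution's job tag, and whether the lock is actually needed is questioned below):

    // Serialize the add/remove of the execution's job tag so that
    // concurrent executions on the same SparkContext cannot interleave
    // their tag updates (the stated intent of the synchronized block).
    synchronized {
      sc.addJobTag(executionTag)
    }
    try {
      // run the body that launches jobs under this tag
    } finally {
      synchronized {
        sc.removeJobTag(executionTag)
      }
    }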

Contributor (Author):

@cloud-fan @wangyum Please help take a look. Thanks.

Contributor:

The Spark local property is thread-local; are you sure the value can be changed by other threads?
