diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1cab0f8d35af..94b18b17cd2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -100,6 +100,20 @@ object SQLExecution extends Logging {
     if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
       sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
       sc.addJobTag(executionIdJobTag(sparkSession, executionId))
+      logWarning("EXECUTION_ROOT_ID_KEY was null; added job tag " +
+        executionIdJobTag(sparkSession, executionId) +
+        ", executionId is " + executionId +
+        ", rootExecutionId is " +
+        sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
+        ", thread is " + Thread.currentThread().getId +
+        ", local properties hash is " + sc.localProperties.hashCode())
+    } else {
+      logWarning("EXECUTION_ROOT_ID_KEY is not null; " +
+        "executionId is " + executionId +
+        ", rootExecutionId is " +
+        sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
+        ", thread is " + Thread.currentThread().getId +
+        ", local properties hash is " + sc.localProperties.hashCode())
     }
     val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
@@ -211,6 +225,17 @@ object SQLExecution extends Logging {
               }
             }
           }
+
+          // Cancel all Spark jobs associated with this executionId, but only if this is the
+          // root execution.
+
+          // TODO: Consider enhancing this logic to cancel jobs earlier when nested
+          // query executions are completed.
+          if (executionId == rootExecutionId) {
+            sparkSession.sparkContext.cancelJobsWithTag(
+              executionIdJobTag(sparkSession, executionId))
+          }
+
           val event = SparkListenerSQLExecutionEnd(
             executionId,
             System.currentTimeMillis(),
@@ -238,6 +263,17 @@ object SQLExecution extends Logging {
       if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
         sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
         sc.removeJobTag(executionIdJobTag(sparkSession, executionId))
+        logWarning("removing job tag " +
+          executionIdJobTag(sparkSession, executionId) + ", thread is " +
+          Thread.currentThread().getId + ", executionId is " + executionId +
+          ", rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
+          ", local properties hash is " + sc.localProperties.hashCode())
+      } else {
+        logWarning("not removing job tag " +
+          executionIdJobTag(sparkSession, executionId) + ", thread is " +
+          Thread.currentThread().getId + ", executionId is " + executionId +
+          ", rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
+          ", local properties hash is " + sc.localProperties.hashCode())
       }
       sc.setLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL, originalInterruptOnCancel)
     }
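
For reference, below is a minimal standalone sketch (not part of the patch) of the public SparkContext job-tag API that the change relies on: jobs submitted while a tag is attached to the thread's local properties can later be cancelled as a group with cancelJobsWithTag, which is what the patch does at the end of the root SQL execution. It assumes Spark 3.5+ on the classpath; the object name and the tag string are hypothetical stand-ins, whereas in the patch the tag comes from executionIdJobTag(sparkSession, executionId).

    import org.apache.spark.{SparkConf, SparkContext}

    object JobTagCancellationSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("job-tag-sketch"))
        // Hypothetical tag; the patch builds it via executionIdJobTag(sparkSession, executionId).
        val tag = "spark-session-x-execution-root-id-42"

        // Tag every job submitted from this thread until the tag is removed,
        // analogous to sc.addJobTag in withNewExecutionId for the root execution.
        sc.addJobTag(tag)

        // Local properties are inheritable thread locals, so a thread created after
        // addJobTag carries the tag to the jobs it submits.
        val worker = new Thread(() => {
          try {
            // A deliberately slow job so cancellation has something to interrupt.
            sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000))
          } catch {
            case e: Exception => println(s"job ended: ${e.getMessage}")
          }
        })
        worker.start()

        Thread.sleep(3000)          // let the job start running
        sc.cancelJobsWithTag(tag)   // cancel every job carrying the tag, as the patch does
                                    // at the end of the root execution
        sc.removeJobTag(tag)        // mirror of sc.removeJobTag in the finally block
        worker.join()
        sc.stop()
      }
    }

The cancellation is tag-based rather than job-group-based, so nested query executions that run under the same root execution ID are cancelled together, which matches the patch's check that only the root execution (executionId == rootExecutionId) triggers cancelJobsWithTag.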