Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ object SQLExecution extends Logging {
if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
sc.addJobTag(executionIdJobTag(sparkSession, executionId))
logWarning("EXECUTION_ROOT_ID_KEY is null and addJobTag :" +
" " + executionIdJobTag(sparkSession, executionId) + " " +" " +
"the executionId is " + executionId + "" +
"and the rootExecutionId is " +
"" + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) +
" and the thread is " + Thread.currentThread().getId + "" +
" local property hash is " + sc.localProperties.hashCode())
} else {
logWarning("EXECUTION_ROOT_ID_KEY is not null and " +
"the executionId is " + executionId + "" +
"and the rootExecutionId is " +
"" + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + " " +
" and the thread is " + Thread.currentThread().getId + "" +
" local property hash is " + sc.localProperties.hashCode())
}
val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
executionIdToQueryExecution.put(executionId, queryExecution)
Expand Down Expand Up @@ -211,6 +225,17 @@ object SQLExecution extends Logging {
}
}
}

// Cancel all spark jobs associated with this executionID, but only if it's the
// root execution.

// TODO: Consider enhancing this logic to cancel jobs earlier when nested
// query executions are completed.
if (executionId == rootExecutionId) {
sparkSession.sparkContext.cancelJobsWithTag(
executionIdJobTag(sparkSession, executionId))
}

val event = SparkListenerSQLExecutionEnd(
executionId,
System.currentTimeMillis(),
Expand Down Expand Up @@ -238,6 +263,17 @@ object SQLExecution extends Logging {
if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
sc.removeJobTag(executionIdJobTag(sparkSession, executionId))
logWarning("if branch removing the jobTag " +
"" + executionIdJobTag(sparkSession, executionId) + " thread is " +
"" + Thread.currentThread().getId + " executionId is " + executionId +
" rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + "" +
" local property hash is " + sc.localProperties.hashCode())
} else {
logWarning("not removing the jobTag " +
"" + executionIdJobTag(sparkSession, executionId) + " thread is " +
"" + Thread.currentThread().getId + " executionId is " + executionId +
" rootExecutionId is " + sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) + "" +
" local property hash is " + sc.localProperties.hashCode())
}
sc.setLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL, originalInterruptOnCancel)
}
Expand Down