Closed

28 commits
82d6caa  dump (juliuszsompolski, Jun 2, 2023)
34ef9f6  uses cancelWithTag in BroadcastExchangeExec (juliuszsompolski, Jun 2, 2023)
7f0bb59  nicer tag, since I remove description (juliuszsompolski, Jun 2, 2023)
5bebbc2  also use job tags in Spark Connect (juliuszsompolski, Jun 2, 2023)
370631b  fix typo (juliuszsompolski, Jun 5, 2023)
651d594  another typo (juliuszsompolski, Jun 5, 2023)
3733303  retrigger CI (juliuszsompolski, Jun 5, 2023)
1217de1  add to AppStatusTracker (juliuszsompolski, Jun 6, 2023)
c375a97  add Mima exclude (juliuszsompolski, Jun 7, 2023)
aa2f950  Merge branch 'master' into SPARK-43952 (juliuszsompolski, Jun 8, 2023)
35336b0  fix compile (juliuszsompolski, Jun 9, 2023)
89f3047  fix history server canons, fix bug with empty tag (juliuszsompolski, Jun 12, 2023)
4b33ef5  test invalid tags (juliuszsompolski, Jun 12, 2023)
8fe0058  guard against null tag (juliuszsompolski, Jun 13, 2023)
40255a4  execution context (juliuszsompolski, Jun 13, 2023)
cecbfda  fix BroadcastExchangeSuite - it now cancels by tag (juliuszsompolski, Jun 13, 2023)
a87746a  did not pass the correct executionContext... (juliuszsompolski, Jun 14, 2023)
3bfe83c  retrigger CI (juliuszsompolski, Jun 14, 2023)
bd79500  retrigger CI (juliuszsompolski, Jun 15, 2023)
d964895  retrigger CI (juliuszsompolski, Jun 15, 2023)
050bf85  retrigger CI (juliuszsompolski, Jun 15, 2023)
c8d9b46  add also job group for Spark Connect queries (juliuszsompolski, Jun 19, 2023)
607f11f  fix compile (juliuszsompolski, Jun 20, 2023)
7d15f3c  Merge branch 'master' into SPARK-43952 (juliuszsompolski, Jun 20, 2023)
12921d1  clear job group (juliuszsompolski, Jun 20, 2023)
cd8d058  nits (juliuszsompolski, Jun 20, 2023)
98f6034  once again try to retrigger CI (juliuszsompolski, Jun 20, 2023)
d52ff2e  retrigger CI (juliuszsompolski, Jun 20, 2023)
@@ -27,15 +27,16 @@ case class ExecutePlanHolder(
sessionHolder: SessionHolder,
request: proto.ExecutePlanRequest) {

val jobGroupId =
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
val jobTag =
"SparkConnect_" +
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"

def interrupt(): Unit = {
// TODO/WIP: This only interrupts active Spark jobs that are actively running.
// This would then throw the error from ExecutePlan and terminate it.
// But if the query is not running a Spark job, but executing code on Spark driver, this
// would be a noop and the execution will keep running.
sessionHolder.session.sparkContext.cancelJobGroup(jobGroupId)
sessionHolder.session.sparkContext.cancelJobsWithTag(jobTag)
}

}
@@ -63,8 +63,12 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
}

val executeHolder = sessionHolder.createExecutePlanHolder(v)
session.sparkContext.addJobTag(executeHolder.jobTag)
session.sparkContext.setInterruptOnCancel(true)
// Also set the tag as the JobGroup for all the jobs in the query.
// TODO: In the long term, it should be encouraged to use job tag only.
session.sparkContext.setJobGroup(
executeHolder.jobGroupId,
executeHolder.jobTag,
s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}",
interruptOnCancel = true)

@@ -89,6 +93,8 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} not supported.")
}
} finally {
session.sparkContext.removeJobTag(executeHolder.jobTag)
session.sparkContext.clearJobGroup()
sessionHolder.removeExecutePlanHolder(executeHolder.operationId)
}
}
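
The handler above brackets query execution with `addJobTag`/`removeJobTag`, so an interrupt issued from another RPC thread (via `ExecutePlanHolder.interrupt()`) cancels exactly this query's jobs. Below is a minimal sketch of the same pattern outside Spark Connect; the helper name `runTagged` and the tag value are illustrative and not part of this PR.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper mirroring the handler's try/finally above: tag the current
// thread, run the query body, and always untag so later work on this thread is
// unaffected even if the body throws.
def runTagged(sc: SparkContext, jobTag: String)(body: => Unit): Unit = {
  sc.addJobTag(jobTag)
  sc.setInterruptOnCancel(true) // cancellation will Thread.interrupt() task threads
  try {
    body // any Spark jobs submitted here carry `jobTag`
  } finally {
    sc.removeJobTag(jobTag)
  }
}
```
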
@@ -47,6 +47,7 @@ message JobData {
optional int64 completion_time = 5;
repeated int64 stage_ids = 6;
optional string job_group = 7;
repeated string job_tags = 21;
JobExecutionStatus status = 8;
int32 num_tasks = 9;
int32 num_active_tasks = 10;
77 changes: 77 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -829,6 +829,55 @@ class SparkContext(config: SparkConf) extends Logging {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}

/**
* Set the behavior of job cancellation from jobs started in this thread.
*
* @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
* being called on the job's executor threads. This is useful to help ensure that the tasks
* are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
[Inline review comment, Contributor]: Someone should check if this is still a thing :)

[Reply, juliuszsompolski (Contributor, Author), Jun 20, 2023]: The HDFS-1208 bug is still open... but multiple places in Spark core have by now elected to just pass true here, so it likely doesn't make sense for the user to set it to false, as these places would generate interrupts anyway... But removing it completely would be orthogonal to this PR.

* may respond to Thread.interrupt() by marking nodes as dead.
*/
def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
}

/**
* Add a tag to be assigned to all the jobs started by this thread.
*
* @param tag The tag to be added. Cannot contain ',' (comma) character.
*/
def addJobTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
val existingTags = getJobTags()
val newTags = (existingTags + tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
}

/**
* Remove a tag previously added to be assigned to all the jobs started by this thread.
* Noop if such a tag was not added earlier.
*
* @param tag The tag to be removed. Cannot contain ',' (comma) character.
*/
def removeJobTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
val existingTags = getJobTags()
val newTags = (existingTags - tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
}

/** Get the tags that are currently set to be assigned to all the jobs started by this thread. */
def getJobTags(): Set[String] = {
Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
.map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
.getOrElse(Set())
}

/** Clear the current thread's job tags. */
def clearJobTags(): Unit = {
setLocalProperty(SparkContext.SPARK_JOB_TAGS, null)
}
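
To illustrate how the thread-local tag API above is meant to be used, here is a short sketch (not part of the diff): it assumes a local SparkSession named `spark`, and the tag names are made up.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("job-tag-demo").getOrCreate()
val sc = spark.sparkContext

sc.addJobTag("etl")          // jobs started by this thread now carry the "etl" tag
sc.addJobTag("nightly")      // tags accumulate as a set on the thread
assert(sc.getJobTags() == Set("etl", "nightly"))

sc.range(0, 1000000).count() // this job is tagged with both tags

sc.removeJobTag("nightly")   // subsequent jobs from this thread keep only "etl"
sc.clearJobTags()            // or drop all tags for this thread at once
```

The tags live in the single `spark.job.tags` local property, joined with `,`, which is why a comma is not allowed inside a tag.
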

/**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
@@ -2471,6 +2520,17 @@
dagScheduler.cancelJobGroup(groupId)
}

/**
* Cancel active jobs that have the specified tag. See `org.apache.spark.SparkContext.addJobTag`.
*
* @param tag The tag of the jobs to be cancelled. Cannot contain ',' (comma) character.
*/
def cancelJobsWithTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
assertNotStopped()
dagScheduler.cancelJobsWithTag(tag)
}
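
Because tags travel with the submitted jobs, any other thread that knows the tag can cancel them with `cancelJobsWithTag`. A hedged sketch continuing the previous example; the tag, the long-running job, and the sleep are illustrative only.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val runner = Future {
  sc.addJobTag("long-query")
  sc.setInterruptOnCancel(true)
  try {
    sc.range(0L, 10000000000L).count() // a deliberately long-running tagged job
  } finally {
    sc.removeJobTag("long-query")
  }
}

Thread.sleep(2000)                 // give the job time to start (illustration only)
sc.cancelJobsWithTag("long-query") // cancels every active job carrying this tag
```
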

/** Cancel all jobs that have been scheduled or are running. */
def cancelAllJobs(): Unit = {
assertNotStopped()
@@ -2840,6 +2900,7 @@ object SparkContext extends Logging {
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
private[spark] val SPARK_JOB_TAGS = "spark.job.tags"
private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
@@ -2851,6 +2912,22 @@
*/
private[spark] val DRIVER_IDENTIFIER = "driver"

/** Separator of tags in SPARK_JOB_TAGS property */
private[spark] val SPARK_JOB_TAGS_SEP = ","

private[spark] def throwIfInvalidTag(tag: String): Unit = {
if (tag == null) {
throw new IllegalArgumentException("Spark job tag cannot be null.")
}
if (tag.contains(SPARK_JOB_TAGS_SEP)) {
throw new IllegalArgumentException(
s"Spark job tag cannot contain '$SPARK_JOB_TAGS_SEP'.")
}
if (tag.isEmpty) {
throw new IllegalArgumentException(
"Spark job tag cannot be an empty string.")
}
}
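
A quick sketch of what `throwIfInvalidTag` rejects; the failing calls are left commented out so the snippet runs cleanly, and the tag values are illustrative.

```scala
sc.addJobTag("ok-tag")   // accepted: non-null, non-empty, no comma
// Each of the following would throw IllegalArgumentException via throwIfInvalidTag:
// sc.addJobTag(null)    // null tag
// sc.addJobTag("")      // empty tag
// sc.addJobTag("a,b")   // contains the ',' separator used by spark.job.tags
```
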

private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Iterable[T])
: ArrayWritable = {
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -52,6 +52,17 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray
}

/**
* Return a list of all known jobs with a particular tag.
*
* The returned list may contain running, failed, and completed jobs, and may vary across
* invocations of this method. This method does not guarantee the order of the elements in
* its result.
*/
def getJobIdsForTag(jobTag: String): Array[Int] = {
store.jobsList(null).filter(_.jobTags.contains(jobTag)).map(_.jobId).toArray
}
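
A small monitoring sketch for the new tracker method; it assumes the `sc` from the earlier examples and that some jobs tagged "etl" have already been submitted.

```scala
// Poll job ids by tag and print their current status.
val etlJobIds: Array[Int] = sc.statusTracker.getJobIdsForTag("etl")
etlJobIds.foreach { jobId =>
  sc.statusTracker.getJobInfo(jobId).foreach { info =>
    println(s"job $jobId: status=${info.status()} stages=${info.stageIds().mkString(",")}")
  }
}
```
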

/**
* Returns an array containing the ids of all active stages.
*
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1085,6 +1085,15 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(JobGroupCancelled(groupId))
}

/**
* Cancel all jobs with a given tag.
*/
def cancelJobsWithTag(tag: String): Unit = {
SparkContext.throwIfInvalidTag(tag)
logInfo(s"Asked to cancel jobs with tag $tag")
eventProcessLoop.post(JobTagCancelled(tag))
}

/**
* Cancel all jobs that are running or waiting in the queue.
*/
@@ -1182,6 +1191,19 @@
Option("part of cancelled job group %s".format(groupId))))
}

private[scheduler] def handleJobTagCancelled(tag: String): Unit = {
// Cancel all active jobs that have this tag.
// First find all active jobs with this tag, and then cancel them.
val jobIds = activeJobs.filter { activeJob =>
Option(activeJob.properties).exists { properties =>
Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("")
.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet.contains(tag)
}
}.map(_.jobId)
jobIds.foreach(handleJobCancellation(_,
Option(s"part of cancelled job tag $tag")))
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
}
@@ -2972,6 +2994,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)

case JobTagCancelled(tag) =>
dagScheduler.handleJobTagCancelled(tag)

case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()

@@ -63,6 +63,8 @@ private[scheduler] case class JobCancelled(

private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent

private[scheduler] case class JobTagCancelled(tagName: String) extends DAGSchedulerEvent

private[scheduler] case object AllJobsCancelled extends DAGSchedulerEvent

private[scheduler]
@@ -438,6 +438,12 @@ private[spark] class AppStatusListener(
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) }
val jobGroup = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
val jobTags = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_TAGS)) }
.map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
.getOrElse(Set())
.toSeq
.sorted
val sqlExecutionId = Option(event.properties)
.flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong))

@@ -448,6 +454,7 @@
if (event.time > 0) Some(new Date(event.time)) else None,
event.stageIds,
jobGroup,
jobTags,
numTasks,
sqlExecutionId)
liveJobs.put(event.jobId, job)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -66,6 +66,7 @@ private class LiveJob(
val submissionTime: Option[Date],
val stageIds: Seq[Int],
jobGroup: Option[String],
jobTags: Seq[String],
numTasks: Int,
sqlExecutionId: Option[Long]) extends LiveEntity {

@@ -98,6 +99,7 @@
completionTime,
stageIds,
jobGroup,
jobTags,
status,
numTasks,
activeTasks,
@@ -199,6 +199,7 @@ class JobData private[spark](
val completionTime: Option[Date],
val stageIds: collection.Seq[Int],
val jobGroup: Option[String],
val jobTags: collection.Seq[String],
val status: JobExecutionStatus,
val numTasks: Int,
val numActiveTasks: Int,
@@ -71,6 +71,7 @@ private[protobuf] class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWr
}
jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong))
jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
jobData.jobTags.foreach(jobDataBuilder.addJobTags)
jobData.killedTasksSummary.foreach { entry =>
jobDataBuilder.putKillTasksSummary(entry._1, entry._2)
}
@@ -93,6 +94,7 @@
completionTime = completionTime,
stageIds = info.getStageIdsList.asScala.map(_.toInt),
jobGroup = jobGroup,
jobTags = info.getJobTagsList.asScala,
status = status,
numTasks = info.getNumTasks,
numActiveTasks = info.getNumActiveTasks,
@@ -2,6 +2,7 @@
"jobId" : 0,
"name" : "foreach at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -2,6 +2,7 @@
"jobId" : 0,
"name" : "foreach at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -2,6 +2,7 @@
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -19,6 +20,7 @@
"jobId" : 1,
"name" : "count at <console>:20",
"stageIds" : [ 1, 2 ],
"jobTags" : [ ],
"status" : "FAILED",
"numTasks" : 16,
"numActiveTasks" : 0,
@@ -36,6 +38,7 @@
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -2,6 +2,7 @@
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -2,6 +2,7 @@
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -19,6 +20,7 @@
"jobId" : 1,
"name" : "count at <console>:20",
"stageIds" : [ 1, 2 ],
"jobTags" : [ ],
"status" : "FAILED",
"numTasks" : 16,
"numActiveTasks" : 0,
@@ -36,6 +38,7 @@
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -2,6 +2,7 @@
"jobId" : 2,
"name" : "count at <console>:17",
"stageIds" : [ 3 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,
@@ -19,6 +20,7 @@
"jobId" : 0,
"name" : "count at <console>:15",
"stageIds" : [ 0 ],
"jobTags" : [ ],
"status" : "SUCCEEDED",
"numTasks" : 8,
"numActiveTasks" : 0,