[SPARK-20643][core] Add listener implementation to collect app state. #19383
Conversation
The initial listener is based on the existing JobProgressListener (and others), and tries to mimic their behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history server code can be reused.

The code introduces a few mutable versions of public API types, used internally, to make it easier to update information without ugly copy methods, and also to make certain updates cheaper.

Note the code here is not 100% correct. This is meant as a building ground for the UI integration in the next milestones. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior.

I also added annotations to API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types.
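For illustration, here is a minimal, self-contained sketch of the "mutable live entity" pattern described above; the LiveEntity/doUpdate names echo the ones discussed in the review below, but the types and fields are stand-ins rather than the actual Spark classes:

trait KVStore {
  def write(value: Any): Unit
}

// Immutable, REST-API-style snapshot type (stand-in for the real v1 types).
case class JobData(jobId: Int, completedTasks: Int)

abstract class LiveEntity {
  // Persist an immutable snapshot of the current mutable state.
  def write(store: KVStore): Unit = store.write(doUpdate())

  // Implementations build a fresh copy of their public API type rather than
  // mutating a previously written value.
  protected def doUpdate(): Any
}

class LiveJob(val jobId: Int) extends LiveEntity {
  // Cheap in-place updates as scheduler events arrive; no copying per event.
  var completedTasks: Int = 0

  override protected def doUpdate(): Any = JobData(jobId, completedTasks)
}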
For context:
Test build #82284 has finished for PR 19383 at commit
squito left a comment
not done with the review yet but will checkpoint my concerns here. Overall looks fine.
Options options = new Options();
options.createIfMissing(!path.exists());
options.createIfMissing(true);
Just curious, did you encounter a problem w/ the previous version? Though I guess it makes sense to tell leveldb to always create if missing.
Tests generally use a temp dir for the db, using Utils.createTempDir or something like that which creates the directory for you. That would cause this to fail unless you deleted the directory first (which LevelDBSuite does), which I found a little bit annoying after a while.
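For anyone following along, a small sketch of the failure mode (assuming only the JDK; Utils.createTempDir behaves like Files.createTempDirectory in that the directory already exists when the test gets it):

import java.nio.file.Files

// The temp dir already exists when the test gets it...
val path = Files.createTempDirectory("spark-kvstore-test").toFile

// ...so the old logic amounted to createIfMissing(false),
val createIfMissing = !path.exists()   // false here

// and LevelDB then refuses to open an empty directory that contains no existing
// database, unless the test deletes the directory first. createIfMissing(true)
// sidesteps that.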
import java.util.Date

import scala.collection.JavaConverters._
unused
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
unused
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status.KVUtils._
only need to import KVIndexParam?
I start using more stuff from that object in later changes, so might as well avoid these changes later.
In fact I'm not sure why I'm not just using open() in this class now, since it was added to KVUtils...
}

override def onJobStart(event: SparkListenerJobStart): Unit = {
  // Compute (a potential underestimate of) the number of tasks that will be run by this job.
I realize you are copying this comment, but it seems wrong. It's a potential under-estimate of the job progress, but a potential over-estimate of the number of tasks that will be run (a stage that ends up skipped still contributes its tasks to this count, even though they never execute). I looked at the referenced PR, and I think it agrees with that understanding -- the PR description says "If a job contains stages that aren't run, then its overall job progress bar may be an underestimate of the total job progress".
  case JobFailed(_) => JobExecutionStatus.FAILED
}

job.completionTime = if (event.time != -1) Some(new Date(event.time)) else None
same here on the time filters
val skipped = !event.stageInfo.submissionTime.isDefined
stage.status = event.stageInfo.failureReason match {
  case Some(_) => v1.StageStatus.FAILED
  case None => if (skipped) v1.StageStatus.SKIPPED else v1.StageStatus.COMPLETE
It's slightly confusing that skipped actually doesn't indicate skipped. Maybe rename it to hasSubmissionTime (with a corresponding change in logic)? Or even include it directly in the match, something like
stage.status = event.stageInfo.failureReason match {
case Some(_) => v1.StageStatus.FAILED
case None if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE
case _ => v1.StageStatus.SKIPPED
}

override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
  event.blockUpdatedInfo.blockId match {
    case block: RDDBlockId => updateRDDBlock(event, block)
    case _ => // TODO: API only covers RDD storage. UI might need shuffle storage too.
I actually don't think shuffle blocks ever get reported via SparkListenerBlockUpdated (might be wrong about this).
There will be updates for Broadcast blocks, though I think those are also ignored in the UI.
So in that PR you handle StreamBlocks, but my point is that the comment about shuffle storage is wrong.
new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))

val stageProps = new Properties()
These properties should also get passed to onJobStart, and there should be an assert on job.info.jobGroup.
(It should probably also have the scheduler pool, but that's missing in the current code, so that can be kept separate...)
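A rough sketch of the suggested addition, assuming the suite's existing listener, time, stages, and check fixtures, the "spark.jobGroup.id" property key, and the JobDataWrapper type from this PR (values are illustrative):

val jobProps = new Properties()
jobProps.setProperty("spark.jobGroup.id", "jobGroup1")

listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))

check[JobDataWrapper](1) { job =>
  assert(job.info.jobGroup === Some("jobGroup1"))
}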
check[StageDataWrapper](key(stages.head)) { stage =>
  assert(stage.info.status === v1.StageStatus.ACTIVE)
  assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get)))
assert on stage.info.schedulingPool.
(should probably also have jobGroup, but again, current code doesn't have it, so can be separate.)
Test build #82333 has finished for PR 19383 at commit
Test build #82390 has finished for PR 19383 at commit
private var coresPerTask: Int = 1

// Keep track of live entities, so that task metrics can be efficiently updated (without
// causing too many writes to the underlying store, and other expensive operations).
When do we update metrics to disk?
When the SHS starts writing UI data to disk (starting with vanzin#43). But even writing to an in-memory store can have non-trivial overhead (e.g. resizing a large hash table).
  store.write(doUpdate())
}

protected def doUpdate(): Any
Can you add documentation? It seems all implementations do a copy instead of an update.
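One possible doc comment, sketching the intended semantics rather than prescribing final wording:

/**
 * Returns an immutable snapshot of this entity, suitable for writing to the store.
 * Implementations are expected to build a fresh copy of the corresponding public
 * API type rather than updating a previously written value in place.
 */
protected def doUpdate(): Any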
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 */
private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
where do we use it?
Test build #82890 has finished for PR 19383 at commit
val dbPath = new File(path, "listing.ldb")
val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString())

def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer())
unused now
val schedulingPool = Option(event.properties).flatMap { p =>
  Option(p.getProperty("spark.scheduler.pool"))
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)
You actually need to set the scheduling pool in onStageSubmitted. If the stage is shared by multiple jobs with different pools, this will just use the scheduling pool of the job that was submitted last, rather than the one actually used when the stage is submitted. onStageSubmitted has a handle on the properties of the submitting job, so that should be easy.
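A rough sketch of that change, reusing the names from the quoted snippet above (the rest of the handler body is omitted):

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
  // The submitting job's properties travel with the stage-submitted event,
  // so the pool can be read here instead of in onJobStart.
  val pool = Option(event.properties)
    .flatMap { p => Option(p.getProperty("spark.scheduler.pool")) }
    .getOrElse(SparkUI.DEFAULT_POOL_NAME)
  // ... record `pool` on the live stage here ...
}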
override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
  liveTasks.get(event.taskInfo.taskId).foreach { task =>
    update(task)
What's the point of doing this? Won't you already have this update written?
Hmm, I thought I needed to handle this event later on, but looks like I don't, so it can go away.
Actually, this records the gettingResultTime value in the underlying store (that value is part of the mutable TaskInfo that the LiveTask instance references).
// completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that are never marked as complete.
if (event.taskInfo == null || event.stageAttemptId == -1) {
  return
Should you still do liveTasks.remove(event.taskInfo.taskId) even if event.stageAttemptId == -1?
I'm a little skeptical that this can really happen at all, but this is the exact behavior of JobProgressListener.
But I guess we can let the rest of the code run, at worst it won't do anything bad because it can't find a matching stage.
ok that makes sense -- I also couldn't see how this would happen but figured maybe that case was there for a reason.
stage.jobs.foreach { job =>
  stage.status match {
    case v1.StageStatus.COMPLETE =>
      job.completedStages = job.completedStages + event.stageInfo.stageId
nit: job.completedStages += event.stageInfo.stageId
maybeExec.foreach { exec =>
  if (exec.rddBlocks + rddBlocksDelta > 0) {
    val dist = rdd.distribution(exec)
    dist.memoryRemaining = newValue(dist.memoryRemaining, -memoryDelta)
Memory remaining (and the on- and off-heap breakdown) looks wrong to me.
LiveRDDDistribution is local to one LiveRDD, but you will have partitions from multiple RDDs stored on one executor, so the memory remaining needs to take into account all of the RDDs on that executor. I don't think you can keep this value precomputed -- an update to a totally different RDD would change it (unless on every block update, you update the value for all RDDs stored on that executor).
The current UI handles this by storing it in the StorageStatusListener by executor, and populating this info in every request for the RDD info.
There's a bunch of fixes to this part in vanzin#46. As mentioned in the description, this is not expected to be 100% correct, and fixes will be made as individual pages are changed to use this data.
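To make the concern above concrete, a sketch with simplified stand-in types (not the PR's classes): the remaining memory is a per-executor quantity, so it could be derived from a single per-executor record at read time instead of being cached inside each RDD's distribution.

// Simplified stand-ins; the actual fix is expected in vanzin#46.
case class ExecutorStorage(maxMemory: Long, memoryUsedByAllRdds: Long)

// Computed per executor when RDD info is requested, rather than cached
// inside each LiveRDDDistribution.
def memoryRemaining(exec: ExecutorStorage): Long =
  exec.maxMemory - exec.memoryUsedByAllRdds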
assert(execs.size > 0)
execs.foreach { exec =>
  assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
}
nit -- this section on execs doesn't belong inside check[StageDataWrapper](key(stages.head))
assert(exec.info.memoryUsed === 3L)
assert(exec.info.diskUsed === 3L)
}
Add a test for memoryRemaining with multiple RDDs on the same executor.
We can add those in vanzin#46 when this part of the listener is fixed.
Test build #83024 has finished for PR 19383 at commit
lgtm
btw, for any other potential reviewers: I'm already reviewing the rest of marcelo's commits in this project (the PRs against his own repo here: https://github.com/vanzin/spark/pulls). In general I'm just finding small things and have enough paged in that I expect to be able to review the rest of these changes quickly.
merged to master. thanks @vanzin
}

private def update(entity: LiveEntity): Unit = {
  entity.write(kvstore)
It seems update is called very frequently, almost once for each event. Does that mean we flush data to disk very frequently too?
This will be tweaked in following PRs.
val memoryUsed: Long,
val memoryRemaining: Long,
val diskUsed: Long,
@JsonDeserialize(contentAs = classOf[JLong])
what does this mean?
See the Jackson documentation. It tells Jackson to deserialize the contents of a container as a specific type.
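A small, self-contained illustration of what the annotation solves (not Spark code): the element type of an Option or Seq is erased at runtime, so without the hint Jackson's Scala module may materialize a numeric element as a boxed Integer, which then fails when read back as a Long.

import java.lang.{Long => JLong}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

class ExampleMetrics(
    // Tell Jackson the value inside the Option is a java.lang.Long, rather than
    // letting it guess for the erased type parameter.
    @JsonDeserialize(contentAs = classOf[JLong])
    val firstLaunchTime: Option[Long],
    // Same idea for the elements of a sequence.
    @JsonDeserialize(contentAs = classOf[JLong])
    val bytesPerStage: Seq[Long])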
// Exclude rules for 2.3.x
lazy val v23excludes = v22excludes ++ Seq(
  // SPARK-18085: Better History Server scalability for many / large applications
  ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
What's going on here? I don't see this PR touching the code of ExecutorSummary.
I removed an import and that changed the type of one of the fields.
Ah, I see: -import scala.collection.Map. But is it really necessary to break compatibility?
Well, it doesn't break compatibility because this class is not to be used outside Spark (it's only public so mima can detect breakages in the JSON format, and this doesn't change that).
Leaving the old import makes using this class really awkward from other places that don't have that import.