[SPARK-20654][core] Add config to limit disk usage of the history server. #20011
Conversation
This change adds a new configuration option and support code that limits how much disk space the SHS will use. The default value is pretty generous so that applications will, hopefully, only rarely need to be replayed because of their disk stores being evicted.

This works by keeping track of how much data each application is using. Also, because it's not possible to know, before replaying, how much space will be needed, it's possible that usage will exceed the configured limit temporarily. The code uses the concept of a "lease" to try to limit how much the SHS will exceed the limit in those cases.

Active UIs are also tracked, so they're never deleted. This works in tandem with the existing option of how many active UIs are loaded; because unused UIs will be unloaded, their disk stores will also become candidates for deletion. If the data is not deleted, though, re-loading the UI is pretty quick.
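To make the lease accounting concrete, here is a toy sketch of the scheme described above. It is illustrative only: `ToyDiskManager` and all the sizes are invented for the example, and this is not the code in the PR.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model of the leasing scheme (not the actual SHS code): a lease
// reserves an estimated amount of space up front; commit() swaps the
// estimate for the measured size, and rollback() releases the reservation.
class ToyDiskManager(maxUsage: Long) {
  private val currentUsage = new AtomicLong(0L)

  def free(): Long = math.max(0L, maxUsage - currentUsage.get())

  def lease(estimatedSize: Long): ToyLease = {
    // Usage may temporarily exceed maxUsage: the real size is unknown
    // until the event log has actually been replayed.
    currentUsage.addAndGet(estimatedSize)
    new ToyLease(estimatedSize)
  }

  class ToyLease(estimated: Long) {
    def commit(actualSize: Long): Unit = {
      // Replace the reservation with the measured size of the store.
      currentUsage.addAndGet(actualSize - estimated)
    }
    def rollback(): Unit = {
      currentUsage.addAndGet(-estimated)
    }
  }
}

object ToyExample extends App {
  val mgr = new ToyDiskManager(100L)
  val a = mgr.lease(30L)
  val b = mgr.lease(30L)
  a.commit(40L)        // the replayed store turned out bigger than estimated
  b.rollback()         // parsing failed; release the reservation
  println(mgr.free())  // 60 -- bytes still available for new leases
}
```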
Some previous comments for this PR can be found at: vanzin#39
Test build #85072 has finished for PR 20011 at commit
```scala
 * event log. By default it's 30% of the event log size.
 */
private def approximateSize(eventLogSize: Long): Long = {
  math.ceil(eventLogSizeRatio * eventLogSize).toLong
```
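For a sense of scale, a quick worked example of this heuristic; the 1 GiB figure is purely illustrative:

```scala
// Worked example of the 30% heuristic shown above (illustrative numbers).
val eventLogSizeRatio = 0.3
def approximateSize(eventLogSize: Long): Long =
  math.ceil(eventLogSizeRatio * eventLogSize).toLong

// A 1 GiB event log is estimated to need ~307 MiB of disk store space:
approximateSize(1024L * 1024 * 1024)  // = 322122548
```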
I'll probably need to look at a better heuristic here. This is probably not a good approximation if the logs are compressed, and #20013 also affects this.
Also remove the config to tweak the heuristic since it doesn't make a lot of sense anymore.
Test build #85129 has finished for PR 20011 at commit
Test build #85284 has finished for PR 20011 at commit
```scala
// Cap the value at 10% of the max size; this assumes that element cleanup will put a cap on
// how large the disk store can get, which may not always be the case.
math.min(expectedSize, maxUsage / 10)
```
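For context, here is what the cap works out to with some illustrative numbers (the 10 GiB budget is invented for the example):

```scala
// Illustrative numbers only. With a 10 GiB budget, the cap limits any single
// estimate to 1 GiB, so one oversized guess cannot evict everything else.
val maxUsage = 10L * 1024 * 1024 * 1024       // 10 GiB configured limit
val expectedSize = 4L * 1024 * 1024 * 1024    // a (possibly wrong) 4 GiB estimate
math.min(expectedSize, maxUsage / 10)         // = 1073741824 (1 GiB)
```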
this seems really arbitrary, and could have a ton of error. Do we really need it at all?
Well, all of these heuristics can have a lot of error. Increasing this increases the odds that an existing store will be deleted if another log starts to be parsed in parallel.
Don't really feel strongly either way, so away it goes.
```scala
 * @param listing The listing store, used to persist usage data.
 * @param clock Clock instance to use.
 */
private class DiskStoreManager(
```
there is another very different DiskStore, can we name this something else so it's easier to tell they're unrelated? KVStoreDiskManager?
```scala
private def mockManager(): DiskStoreManager = {
  val conf = new SparkConf().set(MAX_LOCAL_DISK_USAGE, MAX_USAGE)
  val manager = spy(new DiskStoreManager(conf, testDir, store, new ManualClock()))
  doReturn(0L).when(manager).sizeOf(any(classOf[File]))
```
tests pass without this line ... and it's kinda strange; I tried removing it because I was wondering why you would want this.
Just added in case the method was called with an unexpected file. Will remove.
```scala
lease3.rollback()
lease4.rollback()
assert(hasFreeSpace(manager, 1))
assert(!hasFreeSpace(manager, 2))
```
nit: I don't think hasFreeSpace actually helps at all; here you could replace both of these with `assert(manager.free() === 1)`, and everywhere else do `assert(manager.free() === 0)`, etc.
```scala
val dst2 = lease2.commit("app2", None)
assert(!hasFreeSpace(manager, 1))

// Rollback 3 and 4, now there should be 1 left.
```
can you switch the naming scheme to be leaseA, leaseB, etc.? it's a bit hard to tell when you're talking about sizes and when you're referring to specific leases.
```scala
val lease5 = manager.lease(1)
doReturn(3L).when(manager).sizeOf(meq(lease5.path))
lease5.commit("app2", None)
assert(dst2.exists())
```
would help here to have a comment explaining the case this really represents -- replaying an app because the event log has been updated, so replacing the old kvstore. Also, just to make that clear, you could add

```scala
val dst5 = lease5.commit("app2", None)
assert(dst5 === dst2)
```

(at first I thought "app2" was a copy/paste mistake here)
```scala
// Try a big lease that should cause the committed app to be evicted.
val lease6 = manager.lease(6)
assert(!dst2.exists())
assert(!hasFreeSpace(manager, 1))
```
add a case with getting another lease, even though you're already beyond the size limit (don't think that is covered?)
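Something along these lines, perhaps (a hypothetical sketch in the style of the existing test; `lease7` and the sizes are invented):

```scala
// Hypothetical sketch of the suggested case: usage already exceeds the
// configured limit, yet another lease is still granted (the limit is soft).
val lease7 = manager.lease(2)
assert(!hasFreeSpace(manager, 1))
lease7.rollback()
```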
```scala
/** Visible for testing. Return the size of a directory. */
private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)

private[history] class Lease(val path: File, private val leased: Long)
```
can you rename `path` to `tmpPath`?
```scala
active.synchronized {
  require(!active.contains(appId -> attemptId),
    s"Cannot commit lease for active application $appId / $attemptId")
```
what is the scenario you're worried about here? you could have two threads both trying to call commit, and they might both get past this check, and then clobber each other later on, so this is more limited protection. The idea is just to make sure that you never do a commit while the UI has a store open, as a check of the whole UI cache eviction process?
This is more of a sanity check that the situation you describe should not happen. The SHS code should be ensuring that there's only a single thread parsing logs for an application, and that while that happens, the app's UI is not loaded. This just asserts that's the case.
Also fix a couple of bugs found in the process.
```scala
logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
deleteStore(new File(info.path))
updateUsage(-info.size, committed = true)
}
```
I like the change to track committed space. I'm wondering if we should log more here, even at INFO level, summarizing the available space (I can see users complaining that they configured it to 10GB but it's taking 12GB and wanting an explanation -- would be nice if the logs already had all the relevant info). Maybe something like

```scala
val totalNeeded = size
var activeSize = 0L
...
if (!isActive) {
  evicted += info
  needed -= info.size
} else {
  activeSize += info.size
}
...
logInfo(s"Deleted ${evicted.size} apps to free ${Utils.bytesToString(totalNeeded - needed)}.")
if (needed > 0) {
  val current = currentUsage.get()
  val leased = Utils.bytesToString(current - committedUsage.get())
  logInfo(s"Did not free requested ${Utils.bytesToString(totalNeeded)}. Current usage is " +
    s"${Utils.bytesToString(current)}; $leased (estimated) used by apps actively updating " +
    s"their kvstores; ${Utils.bytesToString(activeSize)} used by active applications.")
}
```
Yeah, I'll add some more logs.
Test build #85320 has finished for PR 20011 at commit
Test build #85326 has finished for PR 20011 at commit
squito left a comment:
lgtm
merged to master