-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22805][CORE] Use StorageLevel aliases in event logs #19992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The format of event logs uses redundant representation for storage
levels, for instance StorageLevel.DISK_ONLY is represented as
{"Use Disk":true,"Use Memory":false,"Deserialized":false,"Replication":1}
which is 64 bytes more. This commit changes the event log representation
of the StorageLevel to predefined constants: NONE, DISK_ONLY, etc. The
change is fully backward compatibly, because
* StorageLevel constructor is private, meaning that existing event
logs can only contain these predefined levels;
* The JsonProtocol supports reading both the old format and the new one.
| ("Deserialized" -> storageLevel.deserialized) ~ | ||
| ("Replication" -> storageLevel.replication) | ||
| def storageLevelToJson(storageLevel: StorageLevel): JValue = storageLevel match { | ||
| case StorageLevel.NONE => "NONE" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can change StorageLevel.toString to do this or add another method e.g. StorageLevel.name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, that would be more robust.
srowen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewor14 I think you touched this last, and a long time ago. Just pinging you in case you have an opinion.
| ("Deserialized" -> storageLevel.deserialized) ~ | ||
| ("Replication" -> storageLevel.replication) | ||
| def storageLevelToJson(storageLevel: StorageLevel): JValue = storageLevel match { | ||
| case StorageLevel.NONE => "NONE" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, that would be more robust.
| val replication = (json \ "Replication").extract[Int] | ||
| StorageLevel(useDisk, useMemory, deserialized, replication) | ||
| def storageLevelFromJson(json: JValue): StorageLevel = json match { | ||
| case _: JString => StorageLevel.fromString(json.extract[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably worth some comments about why there are two read paths
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| | "Port": 300 | ||
| | }, | ||
| | "Block ID": "rdd_0_0", | ||
| | "Storage Level": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll also want to retain some tests of the old format to ensure it's still read. Maybe there are outside of the diff and I'm not seeing them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a to/from JSON test for a custom StorageLevel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I mean, doesn't this no longer test whether it can read the verbose, old style format? like this test does here and the ones above, that are being removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These strings are used with testEvent which assert that the serialized representation matches the one given in the string literal. See https://github.com/criteo-forks/spark/blob/7869e63a569a6fb6725996084f0c5c55fc130ac8/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala#L457
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I share sean's concern, I don't think your response is addressing it. You added one test on L151 makes sure that some event which is not predefined still works. But you don't have a test making sure that the old, verbose string can still be parsed (or is it somewhere else?)
probably this is indirectly covered by HistoryServerSuite but a more direrct test would be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a test ensuring all predefine storage levels can be read from the legacy format.
Sidenote: I've also noticed that the legacy format incorrectly handled the predefined StorageLevel.OFF_HEAP and an fact any other custom storage level with useOffHeap = true. It looks like a bug to me, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, I completely agree that off heap is not respected in the json format. can you file a bug? I think its still relevant even after this goes in, for custom levels
Note that the previous commit contained a bug -- user-defined storage levels caused an exception in JsonProtocol.
| override def hashCode(): Int = toInt * 41 + replication | ||
|
|
||
| /** Name of the storage level if it is predefined or [[None]] otherwise. */ | ||
| def name: Option[String] = this match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fromString below has basically the opposite of this. How about storing the mapping in a Seq[(StorageLevel, String)] and using that in both methods? e.g. here it would be:
knownLevels.collect { case (level, name) if level == this => name }.headOption
And pretty similar code in fromString.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds good, will do. A slightly unrelated point: I feel that the name fromString somehow implies that it's the opposite of toString. What do you think about renaming it to fromName now that we have name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about renaming
It's a public method so renaming means breaking compatibility.
|
ok to test |
|
Test build #84984 has finished for PR 19992 at commit
|
|
Test build #85006 has finished for PR 19992 at commit
|
|
Minor update: I've simulated #18162 on one of our 80G event logs and (unless there is a bug in the filtering code) the log shrank to 157M. The effect of this patch was almost negligible, it brought the size down to 155M. It is unclear for now if this pattern generalizes to other workloads. See JIRA ticket for details. |
|
Test build #85012 has finished for PR 19992 at commit
|
|
Test build #85019 has finished for PR 19992 at commit
|
|
Can someone have a look at the tests, please? I can't see the failure (and in theory, the change should not affect SparkR). |
|
Ignore the error, it's being fixed separately. |
|
Test build #4016 has finished for PR 19992 at commit
|
|
|
||
| /** Name of the storage level if it is predefined or `None` otherwise. */ | ||
| def name: Option[String] = StorageLevel.PREDEFINED | ||
| .collectFirst { case (storageLevel, name) if storageLevel == this => name } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the method body needs multiple lines it's better to wrap it with { }. Same thing below.
| ("Deserialized" -> storageLevel.deserialized) ~ | ||
| ("Replication" -> storageLevel.replication) | ||
| } | ||
| def storageLevelToJson(storageLevel: StorageLevel): JValue = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Braces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, missed it. I've decided not to add braces to storageLevelFromJson because it seems to look OK with the toplevel match.
|
Test build #85135 has finished for PR 19992 at commit
|
| ("Replication" -> storageLevel.replication) | ||
| } | ||
| def storageLevelToJson(storageLevel: StorageLevel): JValue = | ||
| storageLevel.name match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
storageLevel.name.getOrElse(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Sadly, in this case, getOrElse requires explicit type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, after seeing the compilation error, I recall why I went with a match instead of getOrElse -- the former does not require an explicit conversion to JString.
| val replication = (json \ "Replication").extract[Int] | ||
| StorageLevel(useDisk, useMemory, deserialized, replication) | ||
| case _ => | ||
| throw new IllegalArgumentException(json.toString) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
throw new IllegalArgumentException(s"Invalid storage level from json: $json.")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I've changed the message to match the one in accumValueFromJson.
|
Test build #85402 has finished for PR 19992 at commit
|
1fc6e75 to
b1e6f5f
Compare
|
LGTM |
|
Test build #85403 has finished for PR 19992 at commit
|
|
Test build #85579 has finished for PR 19992 at commit
|
9fbfe40 to
cb1fe6a
Compare
|
Test build #85580 has finished for PR 19992 at commit
|
|
change is fine, but from discussion on the jira I'm unclear if this is really worth it -- gain seems pretty small after the other fix in 2.3. |
|
Can one of the admins verify this patch? |
|
@squito I think it's fine to just close the PR/JIRA issue. |
|
thanks for looking into this @superbobry -- can you actually close this yourself? we can't directly close it (there is a way but its more complicated) |
What changes were proposed in this pull request?
The format of event logs uses a redundant representation for storage
levels, for instance, StorageLevel.DISK_ONLY is represented as
which is 64 bytes more. This commit changes the event log representation
of the StorageLevel to predefined constants: NONE, DISK_ONLY, etc. The
change is fully backwards compatible.
How was this patch tested?
coreunit tests.