-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-23683][SQL] FileCommitProtocol.instantiate() hardening #20824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23683][SQL] FileCommitProtocol.instantiate() hardening #20824
Conversation
…argument constructor, passing in the dynamicPartitionOverwrite parameter. If there is no such constructor, it falls back to the classic two-arg one. When InsertIntoHadoopFsRelationCommand passes down that dynamicPartitionOverwrite flag to FileCommitProtocol.instantiate(), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect. This patch changes FileCommitProtocol.instantiate() so when dynamicPartitionOverwrite == true, it requires the protocol implementation to have a 3-arg constructor. Tests verify that * classes with only 2-arg constructor cannot be used with dynamic overwrite * classes with only 2-arg constructor can be used without dynamic overwrite * classes with 3 arg constructors can be used with both * the fallback to any two arg ctor takes place after the attempt to load the 3-arg ctor, * passing in invalid class types fail as expected (regression tests on expected behavior) Change-Id: I694868aecf865cfa552e031ea3f6dde8b600fa7b
|
Test build #88236 has finished for PR 20824 at commit
|
| instantiateClassic(true) | ||
| } | ||
| // check the contents of the message and rethrow if unexpected | ||
| if (!ex.toString.contains("Dynamic Partition Overwrite")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the test it's more conventional to write assert(ex.toString.contains("Dynamic Partition Overwrite"))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but that loses the stack trace. And if there's one thing everyone hates is a jenkins build which says "you got the wrong exception but we won't say what", especially when you click through to the logs & see they've already been deleted.
This is why when I reimplemented intercept for java8 intercept/5
- added the ability to specify a string which was contained
- if the evaluated closure doesn't raise and exception, and doesn't return void or null, call toString() on the output (robustly) and include it in the message.
What I could do here is use fail(message, throwable) and make clear what the failure was? That way, the fact it's an assertion failure is visible, but the stack is retained?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about leaving a small comment here in order to prevent that someone fixes it without noticing that? I would wonder why assert(ex.toString.contains(...)) is not used here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
LGTM, pending Jenkins, thanks! |
…tion Change-Id: I92500e88d6fca40f7d7dfc7e073727c987b7c45c
|
Test build #88241 has finished for PR 20824 at commit
|
|
Hey @steveloughran, can you fix the title to like |
|
Or .. should it be |
jiangxb1987
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Fixed the title, used the new JIRA. |
…xception handling Change-Id: I9858d9fc625e64c3de75dc69c79b12fffdf79b06
|
Test build #88263 has finished for PR 20824 at commit
|
| logDebug("Falling back to (String, String) constructor") | ||
| require(!dynamicPartitionOverwrite, | ||
| "Dynamic Partition Overwrite is enabled but" + | ||
| s" the committer ${className} does not have the appropriate constructor") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, why don't we warn and continue? Just wanted to make sure that we took this case into account. For example,
wouldn't this invalidate the case below?
private class CommitProtocol(arg1: String, arg2: String)
extends HadoopMapReduceCommitProtocol(arg1, arg2, true) {
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem is that the dynamic partition logic in InsertIntoHadoopFsRelationCommand assumes that rename() is a fast reliable operation you can do with any implementation of the FileCommitProtocol, sets itself up for it when enabled, then instantiates the inner committer, and carries on with the dynamic partitioning, irrespective of whether or not. rename() doesn't always work like that, breaking the rest of the algorithm.
If the committer doesn't have that 3-arg constructor, you can't be confident that you can do that. To silently log and continue is to run the risk that the underlying committers commit algorithm isn't compatible with the algorithm.
A fail-fast ensures that when the outcome is going to be unknown, you aren' t left trying to work out what's happened.
Regarding your example, yes, it's in trouble. Problem is: how to differentiate that from subclasses which don't know anything at all about the new feature. you can't even look for an interface on the newly created object if the base class implements it; you are left with some dynamic probe of the instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm .. actually we could do .. for example ..
abstract class FileCommitProtocol {
...
def dynamicPartitionOverwrite: Boolean = false
}class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
override val dynamicPartitionOverwrite: Boolean = false)(^ it's not double checked closely, for example, if the signature is safe or not. Was just an idea)
and use committer.dynamicPartitionOverwrite in InsertIntoHadoopFsRelationCommand to respect if the commit protocol supports or not, if I understood all correctly, and then produce a warning (or throw an error?) saying dynamic partition overwrite will be ignored (or not).
However, sure. I think this case is kind of a made-up case and should be a corner case I guess. I don't want to suggest an overkill (maybe) and I think we don't have to make this complicated too much for now.
I am okay as is. Just wanted to make sure that we considered and checked other possible stories.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something like that would work, though it'd be bit more convoluted...InsertIntoFSRelation would have to check, and then handle the situation of missing support.
One thing to consider in any form is: all implementations of FileCommitProtocol should be aware of the new Dynamic Partition overwrite feature...adding a new 3-arg constructor is an implicit way of saying "I understand this". Where it's weak is there's no way for for it to say "I understand this and will handle it myself" Because essentially that's what being done in the [Netflix Partioned committer(https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java#L142), which purges all parts for which the new job has data. With that committer, if the insert asks for the feature then the FileCommitProtocol binding to it could (somehow) turn this on and so handle everything internally.
Like I said, a more complex model. It'd need changes a fair way through things and then the usual complexity of getting commit logic.
|
retest this please |
|
Test build #88267 has finished for PR 20824 at commit
|
|
Not a big deal at all but I acutally meant |
|
thanks, merging to master! |
## What changes were proposed in this pull request? With SPARK-20236, `FileCommitProtocol.instantiate()` looks for a three argument constructor, passing in the `dynamicPartitionOverwrite` parameter. If there is no such constructor, it falls back to the classic two-arg one. When `InsertIntoHadoopFsRelationCommand` passes down that `dynamicPartitionOverwrite` flag `to FileCommitProtocol.instantiate(`), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect. This patch changes `FileCommitProtocol.instantiate()` so when `dynamicPartitionOverwrite == true`, it requires the protocol implementation to have a 3-arg constructor. Classic two arg constructors are supported when it is false. Also it adds some debug level logging for anyone trying to understand what's going on. ## How was this patch tested? Unit tests verify that * classes with only 2-arg constructor cannot be used with dynamic overwrite * classes with only 2-arg constructor can be used without dynamic overwrite * classes with 3 arg constructors can be used with both. * the fallback to any two arg ctor takes place after the attempt to load the 3-arg ctor, * passing in invalid class types fail as expected (regression tests on expected behavior) Author: Steve Loughran <stevel@hortonworks.com> Closes apache#20824 from steveloughran/stevel/SPARK-23683-protocol-instantiate.
What changes were proposed in this pull request?
With SPARK-20236,
FileCommitProtocol.instantiate()looks for a three argument constructor, passing in thedynamicPartitionOverwriteparameter. If there is no such constructor, it falls back to the classic two-arg one.When
InsertIntoHadoopFsRelationCommandpasses down thatdynamicPartitionOverwriteflagto FileCommitProtocol.instantiate(), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect.This patch changes
FileCommitProtocol.instantiate()so whendynamicPartitionOverwrite == true, it requires the protocol implementation to have a 3-arg constructor. Classic two arg constructors are supported when it is false.Also it adds some debug level logging for anyone trying to understand what's going on.
How was this patch tested?
Unit tests verify that