[SPARK-19911][STREAMING] Add builder interface for Kinesis DStreams #17250
|
Open questions I'd like feedback on:
I'd like to also extend this to allow configuring CloudWatch and DynamoDB-specific authorization which I imagine will be quite helpful to users. |
|
Test build #74342 has finished for PR 17250 at commit
|
8552caf to
bcb7667
Compare
|
Forgot to stop the |
|
Test build #74351 has finished for PR 17250 at commit
|
bcb7667 to
a604dc5
Compare
|
Test build #74379 has finished for PR 17250 at commit
|
a604dc5 to
8aeef08
Compare
|
Found a bit more time to work on this. Changes made:
|
|
Test build #74418 has finished for PR 17250 at commit
|
brkyvz
left a comment
The direction is great. Major feedback is:
- Builder shouldn't have constructor params. It should check required values at the end.
- Please add @deprecated to KinesisUtils.createStream methods.
- We can have the createStream methods re-use the builder pattern in a separate PR.
- We should think more about how to provide credentials. Ideally we only need 3 methods, not 3 x 6. Maybe we should also have an AWSCredentialsBuilder. What do you think?
Ideally, the Builder class will be in KinesisInputDStream. KinesisInputDStream will have a private constructor, i.e.
private[kinesis] class KinesisInputDStream private (
  ...)
I feel that builders rarely have constructor params. I understand you wanted to make these the required options, but I would just rather have the builder have a zero-param constructor, and it checks for the required fields upon build(). What do you think?
This is probably the first builder class I've implemented so I'll defer to your judgment here :)
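As a rough illustration of the zero-argument builder discussed above, the sketch below defers every required-field check to build(). The names StreamConfig and StreamConfigBuilder, the option names, and the default region are hypothetical and are not the API from this PR.

```scala
// Hypothetical sketch; StreamConfig and StreamConfigBuilder are illustrative names only.
final case class StreamConfig(streamName: String, checkpointAppName: String, regionName: String)

final class StreamConfigBuilder {
  // Required options start out unset; optional ones carry a default.
  private var streamNameOpt: Option[String] = None
  private var checkpointAppNameOpt: Option[String] = None
  private var regionNameValue: String = "us-east-1" // assumed default, for illustration

  def streamName(name: String): StreamConfigBuilder = { streamNameOpt = Some(name); this }
  def checkpointAppName(name: String): StreamConfigBuilder = { checkpointAppNameOpt = Some(name); this }
  def regionName(name: String): StreamConfigBuilder = { regionNameValue = name; this }

  // Required values are validated here rather than in the constructor.
  def build(): StreamConfig = StreamConfig(
    required(streamNameOpt, "streamName"),
    required(checkpointAppNameOpt, "checkpointAppName"),
    regionNameValue)

  private def required[A](value: Option[A], optionName: String): A =
    value.getOrElse(throw new IllegalArgumentException(s"missing mandatory option '$optionName'"))
}

object BuilderSketch {
  def main(args: Array[String]): Unit = {
    val config = new StreamConfigBuilder().streamName("events").checkpointAppName("my-app").build()
    println(config)
  }
}
```

The trade-off is that a forgotten required option surfaces as a runtime exception from build() instead of a compile error, which is why small tests for each missing option are useful.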
I would also make this a required field, otherwise people will face confusing issues when they start 2 streams from the same Spark application.
Sounds reasonable. Feel free to push back on any other defaults as well-- figured these would just be a starting point.
Would be great to document what happens when both region and endpoint are set but refer to different regions
I'll take a look and see. To be honest supplying both of these has always felt pretty redundant to me. The AWS SDK has changed a bit in how it handles endpoints and regions as well, so it may also be worth revisiting how KinesisReceiver uses these params.
Long term, it may also be nice to allow for different endpoints to be specified for Kinesis, DynamoDB and CloudWatch (I think the KCL should support this...)
could you link InitialPositionInStream for simplicity
Will do
We shouldn't default to the Spark app name
We'll make this required
Have the builder take setStreamingContext, one which takes StreamingContext and the other JavaStreamingContext
Will be made required builder arguments
let's also have a setMessageHandler function as well
Will do
nit: move = to line above
Will do
why the underscore?
Declaring val storageLevel collides with DStream.storageLevel
With this many methods for the credential provider, I feel we need a credential provider builder. I wouldn't want to re-enter everything again for DynamoDB and CloudWatch if I want to keep them separate from the Kinesis credentials.
I'll look at introducing a credential builder
|
@brkyvz Thanks for taking a look! Re: major feedback:
I can move the builder to the companion object of |
|
@brkyvz Actually, now that I think about it, do we need to make |
|
Good point @budde. I can think of two options:
class Builder {
def build(): KinesisInputDStream[Array[Byte]]
def buildWithMessageHandler[T](f: Record => T): KinesisInputDStream[T]
}
It's a matter of taking it as the first parameter or the final parameter. There are other ways to do it as well, but they will throw exceptions at runtime instead of failing at compile time. cc @rxin for input on APIs |
|
@brkyvz I think if we're eliminating the constructor arguments then the second approach you've proposed might make more sense. I can't think of anything cleaner. |
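A minimal sketch of why taking the message handler as the final, terminal call keeps things type-safe: the element type T is fixed only when the stream is built, so a mismatch is caught by the compiler. Record, TypedStream and StreamBuilder here are hypothetical stand-ins, not the Kinesis or Spark classes.

```scala
// Hypothetical stand-ins for a Kinesis record and the resulting typed stream.
final case class Record(data: Array[Byte])
final class TypedStream[T](val handler: Record => T)

final class StreamBuilder {
  private var nameOpt: Option[String] = None

  def streamName(name: String): StreamBuilder = { nameOpt = Some(name); this }

  // Default terminal call: elements are the raw record bytes.
  def build(): TypedStream[Array[Byte]] = buildWithMessageHandler[Array[Byte]](_.data)

  // Generic terminal call: T is chosen here, so the result type is known at compile time.
  def buildWithMessageHandler[T](f: Record => T): TypedStream[T] = {
    require(nameOpt.isDefined, "streamName is required")
    new TypedStream[T](f)
  }
}

object HandlerSketch {
  def main(args: Array[String]): Unit = {
    val lengths: TypedStream[Int] =
      new StreamBuilder().streamName("events").buildWithMessageHandler(r => r.data.length)
    println(lengths.handler(Record(Array[Byte](1, 2, 3))))
  }
}
```

Because the builder itself is not parameterized by T, all the other setters stay untyped and only the last call decides the element type.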
|
@budde Do you think you can update this PR? The 2.2 branch will be cut on Monday (2017-03-20). |
|
@brkyvz A conference took up a lot of my time last week but I should have it updated later today |
8aeef08 to
d6afaef
Compare
|
@brkyvz PR has been updated, apologies for the delay. I've added |
|
Test build #74901 has finished for PR 17250 at commit
|
would you mind just re-using the code in KinesisUtils instead of copying the code?
How about keeping it here and refactoring KinesisUtils to use it? I think this is what I was intending to do originally, just forgot to update the code.
sounds good
d6afaef to
3cc2df8
Compare
|
@brkyvz Updated the PR to remove |
|
Test build #74909 has finished for PR 17250 at commit
|
brkyvz
left a comment
Just a bit more to go.
I think we can rename SerializableCredentialProvider to CredentialProvider. Then it should have methods for keys and sts, not independent variables.
nit: move = to the line above
Will fix. Sorry I keep doing this :-/
hmm. This isn't a great name for a user facing API, the user shouldn't have to care about if the provider is serializable or not, that's an implementation detail.
I understand your concerns with the AWSCredentials name collisions. However, I think it's the best name there is.
How about simply CredentialProvider
I agree we should definitely come up with a better name here. What about SparkAWSCredentials? Obviously it's not as succinct as AWSCredentials but I think it's a clear name that avoids collisions.
I'm okay with CredentialsProvider otherwise.
I guess SparkAWSCredentials also works
anyone who provides an accessKeyId should also provide a secretKey therefore I would take both together.
.withKeys(awsAccessKey: String, awsSecretKey: String)
I'll rework this builder to take multiple arguments for the long-lived keypair and STS
same here:
def withSts(roleArn: String, sessionName: String)
def withSts(roleArn: String, sessionName: String, externalId: String)
Will do
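To make the suggested shape concrete, here is a small sketch of a credentials builder where the key pair is supplied in one call and the STS role in another. All type and class names (Creds, DefaultChain, KeyPair, AssumedRole, CredsBuilder) are hypothetical and chosen only for illustration; no AWS SDK calls are made.

```scala
// Hypothetical credential descriptions; these only carry values and perform no authentication.
sealed trait Creds
case object DefaultChain extends Creds
final case class KeyPair(accessKeyId: String, secretKey: String) extends Creds
final case class AssumedRole(
    roleArn: String,
    sessionName: String,
    externalId: Option[String],
    longLived: Creds) extends Creds

final class CredsBuilder {
  private var base: Creds = DefaultChain
  private var sts: Option[(String, String, Option[String])] = None

  // Access key and secret key always travel together.
  def withKeys(accessKeyId: String, secretKey: String): CredsBuilder = {
    base = KeyPair(accessKeyId, secretKey); this
  }

  def withSts(roleArn: String, sessionName: String): CredsBuilder = {
    sts = Some((roleArn, sessionName, None)); this
  }

  def withSts(roleArn: String, sessionName: String, externalId: String): CredsBuilder = {
    sts = Some((roleArn, sessionName, Some(externalId))); this
  }

  // With an STS role set, the long-lived credentials are wrapped; otherwise they are returned as-is.
  def build(): Creds = sts match {
    case Some((arn, session, ext)) => AssumedRole(arn, session, ext, base)
    case None => base
  }
}

object CredsSketch {
  def main(args: Array[String]): Unit = {
    println(new CredsBuilder().withKeys("id", "secret").withSts("arn:example", "session").build())
  }
}
```

Grouping the arguments this way means a caller cannot supply an access key without its secret, and the same builder can be reused to produce separate credentials for Kinesis, DynamoDB and CloudWatch.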
I guess you no longer need this
You're right, thanks for catching it
I wouldn't call these Stable just yet :) Let's call it evolving for one release cycle
For sure. Honestly I just cribbed these annotations from SparkSession.Builder so I appreciate you letting me know what the proper convention is.
ditto, let's call it evolving for now
Will do
sounds good
3cc2df8 to
337b6ba
Compare
|
@brkyvz Updated per your feedback. Most significant change is renaming |
|
Test build #75065 has finished for PR 17250 at commit
|
brkyvz
left a comment
Looking really good. I think we can merge it after this pass
The privates are unnecessary since the class itself is private. You can either:
- remove private[kinesis]
- make it private val and then use PrivateMethodTester in the tests.
I'm fine either way.
Yeah, it's really just there so that I could access the values directly from the test. I'll look into using PrivateMethodTester, thanks for the suggestion
If you keep it val it should be fine.
Yeah, was just thinking that'd be a lot simpler. I'll go that route. Thanks!
SparkAWSCredentials
My bad, thanks
Figured out why I didn't catch this before-- apparently I can't spell "serializable". Ugh!
ditto
Will fix and do a "grep -r 'SerialziableCredentialsProvider' *" to make sure this isn't appearing anywhere else
ditto
where does this come from? LocalJavaStreamingContext? If so, I wouldn't stop it. In fact, you can use Mockito to create a mock JavaStreamingContext if you like
I'll probably just go the mock route then and ignore the context altogether. I was seeing a bunch of "Spark context is already running" error messages when I tried to run all of the streaming tests before I added this
Using a mock here and in the other test might not be very practical after all-- looks like the DStream constructor hooks into StreamingContext. We would at least need to mock its getState() method as well as mocking a SparkContext along with its local properties.
Edit: this might not be as bad as I thought-- I'll keep trying the mock approach
that's fine then. As long as we don't break the environment for other tests, do proper clean up, it should be fine
Looks like this will need to be left as-is. In the current test implementation we check that checkpointInterval isn't a required option and its default value is obtained via ssc.graph.batchDuration, which we won't be able to mock because DStreamGraph is final.
since we're not really starting a stream, and are only testing APIs, we should just mock it.
Will do
I would split this into 3 very small tests:
should raise an exception if StreamingContext is missing
should raise an exception if stream name is missing
should raise an exception if checkpoint app name is missing
Sounds good
nit: could you make this a single line?
Will do
nit: single line please.
I'll fix it. This happened since DefaultCredentialsProvider was shortened to DefaultCredentials so I'll try to check for other places where a multiline statement can be rolled up into a single line
I know we didn't have it before, but could you also check that after deserialization, they're equivalent?
val creds = BasicCredentials("x", "y")
assert(Utils.deserialize[BasicCredentials](Utils.serialize(creds)) === creds)
Will do
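For readers without the Spark test utilities at hand, the same round-trip check can be sketched with plain Java serialization. DemoCredentials and the RoundTrip helpers below are hypothetical stand-ins for BasicCredentials and Utils.serialize/Utils.deserialize, not the real implementations.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a credentials value; case classes are Serializable and compare by value.
final case class DemoCredentials(accessKeyId: String, secretKey: String)

object RoundTrip {
  // Write the object to an in-memory byte array using Java serialization.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Read the object back; the cast is unchecked, so the caller must name the right type.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val creds = DemoCredentials("x", "y")
    // The deserialized copy is a different instance but should be equal by value.
    assert(deserialize[DemoCredentials](serialize(creds)) == creds)
  }
}
```

Checking equality after the round trip, rather than only that serialization does not throw, guards against fields that are silently dropped (for example, ones marked transient).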
337b6ba to
6f11978
Compare
|
@brkyvz Updated per your feedback, thanks for taking a thorough look. I also renamed the |
|
Thanks a lot for the quick turnaround @budde ! Could you also contribute to the docs as well with the new builder API? |
|
Test build #75111 has finished for PR 17250 at commit
|
|
@brkyvz Sure, want me to add it to this PR or open a new one? |
|
Thanks! new PR would be easier! |
Do you want to add the note here as well?
* @note The given AWS credentials will get saved in DStream checkpoints if checkpointing
* is enabled. Make sure that your checkpoint directory is secure.
Done
6f11978 to
5315f1e
Compare
nit: Make sure that your checkpoint directory is secure. Prefer using the [[https://link.to.amazon.docs default credential provider chain]] if possible
The link in this case will be quite long-- URL just by itself pushes it over the 100 char limit:
[[http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default default credential provider chain]]
Do you know if there's a way to safely split this into multiple lines? Should I just turn style checks off for this comment?
Feel free to add
// scalastyle:off
// scalastyle:on
around the doc
Done
- Add KinesisInputDStream.Builder class
- Add KinesisInputDStreamBuilderSuite test suite
- Add JavaKinesisInputDStreamBuilderSuite test suite
- Rename SerializableCredentialsProvider -> SparkAWSCredentials
- Add SparkAWSCredentials.Builder
- Add SparkAWSCredentialsBuilderSuite test suite
- Make KinesisInputDStream ctor args package private for testing
- Add args to KinesisInputDStream and KinesisReceiver for optional service-specific auth (Kinesis, DynamoDB and CloudWatch)
5315f1e to
03f91da
Compare
|
LGTM pending tests. Thanks a lot for this PR @budde PS It's okay to make new commits, you don't have to squash commit every time :) |
|
Test build #75170 has finished for PR 17250 at commit
|
|
Test build #75172 has finished for PR 17250 at commit
|
|
Merging to master |
|
@brkyvz Awesome, thanks for reviewing this! |
What changes were proposed in this pull request?
service-specific auth (Kinesis, DynamoDB and CloudWatch)
How was this patch tested?
Added KinesisDStreamBuilderSuite to verify builder class works as expected