Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8511] [WIP] KinesisIO.Read enhanced fanout #9899

Closed
wants to merge 1 commit into from

Conversation

jfarr
Copy link
Contributor

@jfarr jfarr commented Oct 27, 2019

Hi @aromanenko-dev, here is my idea of how KinesisIO could support enhanced fanout consumers using subscribeToShard from the AWS SDK v2. It's still WIP but I wanted to see what you think about the approach. As you probably know the main difference is that instead of polling getRecords() you subscribe to the shard with a callback. You also have to keep renewing the subscription every 5 minutes but subscribeToShard returns a CompletableFuture that completes when the subscription expires so you can just call join() then subscribeToShard() again in a loop. Other than that I think everything else is the same. So here's the approach I took:

  • I added a consumerArn param to KinesisIO.Read and plumbed that through to KinesisSource, DynamicCheckpointGenerator, ShardCheckpoint, and ShardRecordsIterator.
  • I added a subscribeToShard method to ShardCheckpoint that mirrors the starting position logic from getShardIterator. The exception is that it has a resubscribe param which will cause it to always subscribe from LATEST if true.
  • I added a subscribeToShard method to SimplifiedKinesisClient that handles calling KinesisAsyncClient.subscribeToShard and wraps any exceptions.
  • I added a subscribeToShard method to ShardRecordsIterator. This contains the Kinesis subscriber and error handler so this is where KinesisAsyncClient calls back to with SubscribeToShardEvents. This is so we can apply the RecordFilter to the list of returned events after we deaggregate and convert each KinesisClientRecord to a KinesisRecord and before handing it to the consumer that was passed in as a parameter. The CompletableFuture from subscribeToShard gets passed back to this point where we call join(), so this method is blocking for the duration of the subscription. This could maybe use some work to make it more responsive to interruption.
  • The readLoop method in ShardReadersPool is where we poll when calling getRecords(). I added a subscribeLoop method that loops on ShardRecordsIterator.subscribeToShard() instead. In startReadingShards() we call ShardRecordsIterator.hasConsumer() which checks whether consumerArn is populated then we run either readLoop() or subscribeLoop() on the executor service accordingly.
  • I factored out a putRecord method in ShardReadersPool that gets called from readLoop() and also gets passed to ShardRecordsIterator.subscribeToShard() as the KinesisRecord consumer. This encapsulates putting the record onto recordsQueue and incrementing the per-shard record count.
  • ShardRecordsIterator keeps track of whether to call ShardCheckpoint.subscribeToShard() with resubscribe = true using a boolean field resubscribe. This starts as false and is only updated to true when ackRecord() is called so we can guarantee that we were able to subscribe from the initial checkpoint starting position successfully before we start subsequently resubscribing from LATEST. Possibly this resubscribe param is overkill and it would work just as well if you always subscribe from the latest checkpoint but I haven't tested that yet.

I also converted the existing code to using the AWS SDK v2 exclusively, which meant removing the writer from the v2 KinesisIO for now as it does not appear that there's a KPL based on the v2 AWS SDK yet. I think this could be separated from adding the enhanced fanout support if you want though.

wdyt?


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Samza Spark
Go Build Status --- --- Build Status --- --- Build Status
Java Build Status Build Status Build Status Build Status
Build Status
Build Status
Build Status Build Status Build Status
Build Status
Python Build Status
Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
--- --- Build Status
XLang --- --- --- Build Status --- --- ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website
Non-portable Build Status Build Status
Build Status
Build Status Build Status
Portable --- Build Status --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@jfarr
Copy link
Contributor Author

jfarr commented Oct 27, 2019

Here are the diffs between old and new in case that makes it easier to see the changes.
diff-main.txt
diff-test.txt

@aromanenko-dev aromanenko-dev self-requested a review October 28, 2019 14:54
@jfarr jfarr changed the title [WIP] KinesisIO enhanced fanout [BEAM-8511] [WIP] KinesisIO enhanced fanout Oct 29, 2019
@aromanenko-dev
Copy link
Contributor

aromanenko-dev commented Oct 30, 2019

Thanks! Sorry for delay with response, I was a bit a busy this week but I'll take a look later.

@aromanenko-dev
Copy link
Contributor

Btw, afaik, @cmachgodaddy also was working on adding new Kinesis fanout support into KinesisIO based on AWS SDK V2 (see design doc). Could you sync with him about that?

@jfarr
Copy link
Contributor Author

jfarr commented Oct 30, 2019

Thanks @aromanenko-dev, will do.

@jfarr
Copy link
Contributor Author

jfarr commented Oct 31, 2019

This diff has just the changes for enhanced fanout before I did the SDK v2 conversion. There have been a few small changes / bugfixes since then but hopefully you can get the idea.
diff.txt

@cmachgodaddy
Copy link
Contributor

@aromanenko-dev and @iemejia , as we have discussed about this IO migration, we may need to consider:

  1. A separate subscribe api/function for the IO, so the existing users don’t “have to be migrated” to enhanced-fan-out. Using enhanced-fan-out is quite expensive, so not sure everyone want it until they need speed and latency
  2. Keep existing IO’s read and just convert it to V2, to support backward compatibility
  3. Move KinesisIO V2 to amazon-web-service2 submodel.

However, this PR seems to support a "fall back" strategy, which means if users don't provide consumer's ARN, it will fall back and use the standard way of pull records. I don't have any preferences, for which one would work better or cleaner, as long as we support backward compatibility. What do you think?

Copy link
Contributor

@cmachgodaddy cmachgodaddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test adding reviewer

@jfarr
Copy link
Contributor Author

jfarr commented Nov 13, 2019

Hi @aromanenko-dev any thoughts on this PR or Cam's comments?

@aromanenko-dev
Copy link
Contributor

@jfarr Sorry again for a delay with review, I'll try to do this asap

@aromanenko-dev
Copy link
Contributor

Run Python PreCommit

@aromanenko-dev aromanenko-dev changed the title [BEAM-8511] [WIP] KinesisIO enhanced fanout [BEAM-8511] [WIP] KinesisIO.Read enhanced fanout Nov 15, 2019
Copy link
Contributor

@aromanenko-dev aromanenko-dev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again for your contribution! And thank you for separate diff files that include only your changes. I did a review based on them and I expect that all other functionality remains as it was before.

In general, it looks fine, I left several comments inline and there are several things that I wanted to mention in general:

  1. I think it would make sense to move KinesisIO SDK V2 into new package org.apache.beam.sdk.io.aws2 with all other Amazon SDK V2 related IOs. Initially, it was in separate package since KPL had Apache incompatible license, now it's fixed (see BEAM-3549 and BEAM-7894 ).
  2. Could you add unit tests for new functionality?
  3. Have you tested it on real cluster and real data? If yes, any additional info about that would be very appreciated to have.
  4. I agree with @cmachgodaddy point that we need to make it clear for users that it supports two modes of reading - pull and subscribe. I think more examples in KinesisIO Javadoc should be enough.
  5. Does "subscribe mode" support closing or splitting the shards?

import software.amazon.kinesis.common.InitialPositionInStream;

/**
* {@link PTransform}s for reading from and writing to <a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove "writing" since this PR adds support of reading only.

* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withCustomWatermarkPolicy(new MyCustomPolicyFactory())
* }</pre>
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add a quick example that shows using withConsumerArn()

Instant readTime,
String streamName,
String shardId) {
this.data = copyData(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a deep copy here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deep copy here is because of the call to KinesisClientRecord::fromRecord here:

https://github.com/jfarr/beam/blob/c514ebe7bfbdbc9335809446ff8a7716e46cb923/sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/SimplifiedKinesisClient.java#L202

This converts the AWS SDK Record to a KCL KinesisClientRecord so we can call deaggregate(), but also causes KinesisClientRecord's data to be converted to a read-only HeapByteBuffer, which then throws ReadOnlyBufferException when we call array() here:

https://github.com/jfarr/beam/blob/c514ebe7bfbdbc9335809446ff8a7716e46cb923/sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/KinesisRecord.java#L102

As far as I can tell a read-only byte buffer will not allow direct access to the backing array which I think is kind of the point. If I'm mistaken please let me know and I can change this. Initially I had fixed it by copying the data to a new byte array in getDataAsBytes() but it seemed more efficient to take the hit only once in the constructor.

I will add a comment here to explain the reason for the deep copy. Alternately, we could probably roll our own version of KinesisClientRecord::fromRecord that doesn't convert the data to a read-only buffer if you'd rather do that.

.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes()))
.apply(
"Write to Kinesis",
org.apache.beam.sdk.io.kinesis.KinesisIO.write()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not recommend to depend on previous version of KinesisIO.Write. Just write messages using standard Kinesis client until we don't have KinesisIO.Write SDK V2 implemented.

}

/** Read test dataset from Kinesis stream. */
private void runRead() {
Copy link
Contributor

@aromanenko-dev aromanenko-dev Nov 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can read messages using two different modes - pull and subscribe - then I think we need to have two different IT Read tests for that.

@jfarr
Copy link
Contributor Author

jfarr commented Nov 20, 2019

@aromanenko-dev thank you for the feedback. I’ll work on getting those changes in. I’m sorry, the last week or so has been very busy for me. I will be on vacation from my day job all of next week so I hope to wrap up this one and #9765 as much as I can. I have been testing this against our production data stream for a while now and functionally it’s been working very well but I am still seeing some unexpected latencies at higher throughputs and working to get to the bottom of that. I’ll try to post some info about that as well.

Copy link
Contributor

@cmachgodaddy cmachgodaddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one minor question. I couldn't open the test file, is it required permission?


@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also new. any reason, @jfarr ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmachgodaddy I found it useful for debugging, so I left it in.

@aromanenko-dev
Copy link
Contributor

@cmachgodaddy Which test file?

@cmachgodaddy
Copy link
Contributor

diff-test.txt attached in this PR

@aromanenko-dev
Copy link
Contributor

@cmachgodaddy Sorry, no idea about permission. I just copied that to gist: https://gist.github.com/aromanenko-dev/b99ad6eb9d7b6de31b8e9607b37aaefb

@aromanenko-dev
Copy link
Contributor

Hi @jfarr, do you have any updates on this PR?

@jfarr
Copy link
Contributor Author

jfarr commented Dec 17, 2019

Hi @aromanenko-dev yes I'm sorry I haven't gotten back to this yet but I would like to finish it up. It looks like I have some tests to write and update, some Javadocs to update and add some examples, and to move the code to the new aws2 package, and also I need to look into shard closing and splitting as you pointed out because I think right now it does not handle that. This Christmas break coming up next week is looking pretty good for that.

@aromanenko-dev
Copy link
Contributor

@jfarr No pressure, please, take your time and let me know if you need any help. Thanks!

@stale
Copy link

stale bot commented Mar 3, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Mar 3, 2020
@aromanenko-dev
Copy link
Contributor

@jfarr Just wanted to ping you about your plans for this PR.

@stale stale bot removed the stale label Mar 3, 2020
@jfarr
Copy link
Contributor Author

jfarr commented Mar 3, 2020

@aromanenko-dev Hi, I still have plans to finish it. I've made a small amount of progress since the last round of comments. I got the code moved into the amazon-web-services2 module and found some incompatibilities in the unit test frameworks being used, so I worked through that and got all the tests passing again. I've also done some research on how to implement shard splitting and merging and I think I know what needs to be done but haven't had time to implement it yet. And of course I still need to finish the unit tests and javadocs, and it looks like the PR needs a rebase now. I've been more focused on getting BEAM-8382 finished so now that that one seems to be wrapping up I can pay more attention to this PR in the coming weeks.

@aromanenko-dev
Copy link
Contributor

aromanenko-dev commented Mar 3, 2020

@jfarr Thank you for update!

@jfarr jfarr force-pushed the kinesis-enhanced-fanout branch from 4ae4739 to 4bc0e51 Compare April 5, 2020 18:15
@stale
Copy link

stale bot commented Jun 5, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jun 5, 2020
@stale
Copy link

stale bot commented Jun 12, 2020

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@stale stale bot closed this Jun 12, 2020
@aromanenko-dev
Copy link
Contributor

@jfarr Kind ping on this PR. Would you have a time to finish this one?

@jfarr
Copy link
Contributor Author

jfarr commented Jun 12, 2020

@jfarr Kind ping on this PR. Would you have a time to finish this one?

@aromanenko-dev Sure, I'm just waiting for a resolution on BEAM-9702. Did you still want to merge that first?

@aromanenko-dev
Copy link
Contributor

@jfarr I added my thoughts there. I think we should finish moving KinesisIO into AWS SDK V2 and afterwards add new features there. Please, let me know wdyt.

@aromanenko-dev
Copy link
Contributor

aromanenko-dev commented Aug 4, 2020

@jfarr Since BEAM-9702 was resolved, are you going to continue to work on adding Kinesis enhanced fanout support?

@jfarr
Copy link
Contributor Author

jfarr commented Aug 5, 2020

@jfarr Since BEAM-9702 was resolved, are you going to continue to work on adding Kinesis enhanced fanout support?

Hi @aromanenko-dev, yes that's my plan.

@psolomin
Copy link
Contributor

Hello @aromanenko-dev @jfarr

What's the status of this feature? I came across a use-case where enhanced fan-out actually became necessary. Do you know if anyone is working on supporting this?

Thanks.

@aromanenko-dev
Copy link
Contributor

@jfarr Do you have any updates on this?

@psolomin The work on this PR is still in progress. I did a first round of review (see my comments above) and it waited for BEAM-9702 to be resolved (which actually is for now). Would you like to work on this (if @jfarr can't for some reasons)?

@jfarr
Copy link
Contributor Author

jfarr commented Aug 30, 2021

I am no longer working with Beam so it’s unlikely that I will ever finish this. Please feel free to pick up where I left off.

@psolomin
Copy link
Contributor

psolomin commented Sep 4, 2021

first round of review

Thanks

Would you like to work on this?

I will have some time to take a look next week, thanks

@aromanenko-dev
Copy link
Contributor

@psolomin It would be great! Don't hesitate to ask any questions here or on related Jira if you have.

@biancaberdugo
Copy link

Hey !

The work on this PR is still in progress. I did a first round of review (see my comments above) and it waited for BEAM-9702 to be resolved (which actually is for now). Would you like to work on this (if @jfarr can't for some reasons)?

As the ticket BEAM-9702 is already solved, it have plans to continue the work here?

@aromanenko-dev
Copy link
Contributor

aromanenko-dev commented Oct 4, 2022

cc: @mosche

@biancaberdugo I'm not aware of any plans on this but it would be great feature to add.

@psolomin
Copy link
Contributor

psolomin commented Oct 5, 2022

Since the request got some activity, I had plans to give it a try within this week @biancaberdugo

@mosche
Copy link
Member

mosche commented Oct 5, 2022

Great to see activity on this 🎉
I'm happy to support or assist a bit in case there's any problems @psolomin.

Just for reference, the respective Github issue is #19967.

@mosche
Copy link
Member

mosche commented Oct 5, 2022

@psolomin Be careful if you intend to resume work from this stale branch. The number of conflicts with the main branch is huge. I would recommend starting from scratch and following the PR description here (if helpful)

@aromanenko-dev
Copy link
Contributor

Yes, this PR contains a lot of code and logic that either already implemented or won't be needed, so definitely this feature (enhanced fanout) should be developed from scratch but it will worth to take into account the ideas from here.

And of course, it should be implemented only for KinesisIO based on AWS SDK v2.

@psolomin
Copy link
Contributor

psolomin commented Oct 7, 2022

@mosche @aromanenko-dev I've prepared a draft PR which passes most basic smoke tests I did manually using my own AWS account. Mind giving a quick glance to validate the approach overall? Once I know I'm on the right direction, I will add more tests and clean my code a bit. It is still very raw now #23540

Thank you!

@psolomin psolomin mentioned this pull request Oct 11, 2022
4 tasks
@psolomin
Copy link
Contributor

psolomin commented Apr 13, 2023

Hi @biancaberdugo

#23540 was merged today and I expect it will be released with Beam 2.48.0

If you want to try out a pre-release version - in a couple of days it will be available in Maven snapshots repo, and you will be able to link it via:

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

...

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
            <version>2.48.0-SNAPSHOT</version>
        </dependency>

        < other beam sdk dependencies >

Please, note that KinesisIO parts of KDA snapshots created with previous versions of Beam will be not compatible with new KinesisIO - you will need to start apps with AT_TIMESTAMP / TRIM_HORIZON and set AllowNonRestoredState in the KDA configs.

@psolomin
Copy link
Contributor

I've also updated examples here: aws-samples/amazon-kinesis-data-analytics-examples#60

Just in case you still use legacy KinesisIO connector.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants