Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ProcessRecordsInput.isAtShardEnd value is not passed to processRecords #934

Closed
ktulinger opened this issue Mar 30, 2022 · 2 comments
Closed

Comments

@ktulinger
Copy link

ktulinger commented Mar 30, 2022

Hello,

we identified a possible bug in handling the isAtShardEnd value for a record processor. When receiving the records in processRecords, the value of isAtShardEnd is always false.

After looking at the code base, it is never set in the builder object:

final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime())
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build();

We use alpakka kinesis library, which, we observed, relies on the shardEnd value to be correctly set in order to properly checkpoint the shards.

@sochi
Copy link
Contributor

sochi commented Mar 30, 2022

Akka's Alpakka Kinesis library relies on this functionality, see their ShardProcessor: https://github.com/akka/alpakka/blob/master/kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/ShardProcessor.scala

The library is packaged as akka-stream-alpakka-kinesis.

@stair-aws
Copy link
Contributor

Thanks for your contribution! PR merged; closing issue.

This should be included in our next release which will likely be version 2.4.6.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants