-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pass isAtShardEnd
correctly to processRecords
call
#935
Conversation
The default is false otherwise, i.e., the processor is always getting isAtShardEnd=false.
Hello @zengyu714, could you please make a review on this one? (I've seen you active on other PRs) |
Hi @shanmsac or @namanksrivastava, could you take a look? |
Hi @shanmsac and @zengyu714, I've seen you recently reviewed a few other PRs and issues. The bug described is annoying, and any code that relies on this flag being set misbehaves, e.g., Alpakka uses it (https://doc.akka.io/docs/alpakka/current/). |
@shanmsac @namanksrivastava @zengyu714 Review please! |
@shanmsac @namanksrivastava @zengyu714 again, please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sochi @ktulinger @velmont Thanks for your unmatched patience! It's unfortunate that this PR has lingered since 2022/03/30.
LGTM.
final ProcessRecordsInput processRecordsInput = ProcessRecordsInput.builder().records(records).cacheExitTime(input.cacheExitTime()).cacheEntryTime(input.cacheEntryTime()) | ||
.checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); | ||
.isAtShardEnd(input.isAtShardEnd()).checkpointer(recordProcessorCheckpointer).millisBehindLatest(input.millisBehindLatest()).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: ProcessRecordsInput
is annotated by @Builder(toBuilder = true)
. Leveraging this functionality would simplify the code, affect this desired change, and avoid future issues if a new variable is ever added to ProcessRecordsInput
. Example:
final ProcessRecordsInput processRecordsInput = input.toBuilder()
.records(records).checkpointer(recordProcessorCheckpointer).build();
The processor's
processRecords
is always called withisAtShardEnd=false
. The flag is not being set correctly.Issue #, if available:
#934
The client never gets that it reached the end of shard. The information is only available via side channel lifecycle
TaskExecutionListener
, however the shard is immediately checkpointed (regardless the rest of the processing). If the code using KCL exists after this point, it may effectively lose the data between the previous checkpoint and the shard end.Description of changes:
Propagate the
isAtShardEnd
from sourceProcessRecordsInput
to the newly builtProcessRecordsInput
that is then passed toprocessRecords
.By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.