Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13249: Always update changelog offsets before writing the checkpoint file #11283

Merged
merged 3 commits into from
Sep 13, 2021

Conversation

hutchiko
Copy link
Contributor

When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.

@hutchiko hutchiko force-pushed the KAFKA-13249 branch 2 times, most recently from 516f9e5 to cbb8c99 Compare August 30, 2021 06:22
@ableegoldman
Copy link
Contributor

Hey @hutchiko , thanks for digging into this bug so thoroughly and providing a patch! One quick question before I review -- does this only affect version 2.8 or below specifically, or could this be present on trunk/3.0 as well? Unless you already checked this, my guess would be the latter. If you can verify that is true, then can you please retarget this PR against the trunk branch? Once it gets merged then we can cherrypick back to 2.8 from there. Thanks!

@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
// commitNeeded indicates we may have processed some records since last commit
// and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
if (commitNeeded) {
if (commitNeeded || enforceCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just removed the check altogether? It's not like updating the changelog offsets is a particularly "heavy" call, we may as well future-proof things even more by just updating the offsets any time.

In fact, why do we even have this weird split brain logic to begin with...it would make more sense to just update the offsets inside the StreamTask#maybeWriteCheckpoint and stateMgr.checkpoint() methods, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same thing but not knowing enough about all the streams internals I thought I'd just go with the most minimal change possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason that I added this check is that checkpointableOffsets() can potentially be expensive. I think the fix to have commitNeeded || enforceCheckpoint is actually elegant as we did not introduce extra unnecessary overhead much, since it is only true when closing the task.

@hutchiko
Copy link
Contributor Author

hutchiko commented Aug 31, 2021

@ableegoldman I did not test against 3.0 just 2.7 and 2.8. I'll rebase onto trunk as see how that goes.

…with change log after clean shutdown

Adds test to verify that state store and checkpoint file are in sync with change log after clean shutdown
@hutchiko hutchiko changed the base branch from 2.8 to trunk August 31, 2021 01:37
@hutchiko
Copy link
Contributor Author

Yeah I've rebased and verified the issue is there in trunk too.

Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch! Thanks for the quick fix @hutchiko !

@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
// commitNeeded indicates we may have processed some records since last commit
// and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
if (commitNeeded) {
if (commitNeeded || enforceCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason that I added this check is that checkpointableOffsets() can potentially be expensive. I think the fix to have commitNeeded || enforceCheckpoint is actually elegant as we did not introduce extra unnecessary overhead much, since it is only true when closing the task.

@guozhangwang
Copy link
Contributor

but there was a relevant failure in the build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11283/5/testReport/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldWriteLatestOffsetsToCheckpointOnShutdown_at_least_once_/

Guessing it's just some flakiness in the test, can you check that out before I merge?

@hutchiko I looked at the test code, and it seems to me there's indeed a timing-related flakiness. Could you try to fix it before we merge (you can first try to reproduce it, e.g. on IDE with repeated runs and see how often it could fail; and after you fix it usually we would try to verify that after say 1000 runs, there's no more failure).

@mjsax mjsax added the streams label Sep 10, 2021
@hutchiko hutchiko force-pushed the KAFKA-13249 branch 3 times, most recently from 4a3df41 to 39f2634 Compare September 12, 2021 21:56
@hutchiko
Copy link
Contributor Author

@guozhangwang @ableegoldman unfortunately I could never reproduce the CI failures however I have pushed up a refactor of the method which I think was responsible for the flakiness.

The original version of the the method was scanning backwards through the changelog topic searching for the top record so I could cross check that record's offset with the checkpointed offset. It had an implicit assumption that the consumer it was driving backwards would always get some records after a 50ms poll - thinking this through it's obviously a false assumption.

I switched the logic around so it just consumes forwards until it finds the end of the topic there are no assumptions about timing in the new logic so I'm hoping that will fix the flakiness.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hutchiko for the updated tests. I agree that with 10 recs written to partition, reading forward is easier, and not polling for 50ms only makes sense (since with EOS some offsets would just be txn markers).

@guozhangwang
Copy link
Contributor

I checked the failures of the new run and they are not related to the new tests. Merging to trunk now.

@guozhangwang guozhangwang merged commit a03bda6 into apache:trunk Sep 13, 2021
guozhangwang pushed a commit that referenced this pull request Sep 13, 2021
…point file (#11283)

When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Contributor

cc @kkonstantine , this is a critical bug fix hence I'm cherry-picking to 3.0 as well. If RC2 is voted through then it will fall on 3.0.1, otherwise if we vote for another RC3 I think it'd be a great-to-have in 3.0.0

@ableegoldman
Copy link
Contributor

@guozhangwang can you cherrypick this back to 2.8 at least? Maybe also 2.7 if there aren't any conflicts (seems like it should be a smooth merge but 🤷‍♀️ )

guozhangwang pushed a commit that referenced this pull request Sep 14, 2021
…point file (#11283)

When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Contributor

I resolved some conflicts and cherry-picked to 2.8; there are too many conflicts in 2.7 though.

@mjsax
Copy link
Member

mjsax commented Sep 29, 2021

@guozhangwang Now that 3.0.0 is released, should this fix be cherry-picked to 3.0 branch (and the jira be updated with fixed version 3.0.1?)

@guozhangwang
Copy link
Contributor

SG, will cherry-pick.

mjsax pushed a commit to confluentinc/kafka that referenced this pull request Oct 1, 2021
…point file (apache#11283)

When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…point file (apache#11283)

When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .

I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants