Fix messages being processed twice by the same consumer instance after rebalancing #591
Conversation
…ting a new stream
Saw that after a Revoked followed by an Assigned, only the non-assigned partitions' streams are ended.
These changes were copied over from zio#591.
…treams" test relating to shutdown. This was also fixed in #591
I was asked for a review so I'll try. This change is too large to give a qualified opinion that is correct in the details. This PR proposes to ignore the last committed offset and instead 'seek' to the last known consumed offset.

My first worry is that this only solves part of the problem. When a partition is removed from one consumer and assigned to another, the other consumer does not know what has been consumed and has no choice but to look at what has been committed. In other words, when a partition moves to another consumer, we will continue to have duplicates.

Another worry is that consumption is no guarantee of processing; only commits give this guarantee. This makes the strategy of this PR not suitable for conservative use cases where every message is important.

My final worry is that the current approach is highly dependent on knowing when a rebalance is happening. Unfortunately, using the rebalance listener for that is not reliable. It could be that the only change is that a partition is revoked. In this case we do not detect the end of a rebalance because only onRevoke will be called, and not onAssigned.

IMHO the best solution is to make sure a revoked stream gets the chance to finish its work and commit everything before the rebalance starts. This is the purpose of the onRevoked callback of the rebalance listener. This means quite an overhaul of how the runloop works, and I am happy to assist with doing this.
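As a rough illustration of that suggestion (my own sketch, not code from this PR), the plain Kafka consumer API allows committing whatever a revoked stream managed to process inside `onPartitionsRevoked`, which blocks the rebalance until the commit completes. The `ProcessedOffsets` tracker and the class names below are hypothetical:

```scala
import java.util.{Collection => JCollection}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Hypothetical tracker of the highest offset that was fully processed per partition.
final class ProcessedOffsets {
  @volatile private var offsets = Map.empty[TopicPartition, Long]
  def record(tp: TopicPartition, offset: Long): Unit = offsets += tp -> offset
  def snapshot: Map[TopicPartition, Long]            = offsets
}

// Commit what was processed for the revoked partitions before the rebalance proceeds,
// so the next owner of a partition starts from a fresh committed offset.
final class CommitOnRevoke(
  consumer: KafkaConsumer[String, String],
  processed: ProcessedOffsets
) extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    val toCommit = partitions.asScala.flatMap { tp =>
      // A Kafka commit points at the *next* offset to read, hence the +1.
      processed.snapshot.get(tp).map(offset => tp -> new OffsetAndMetadata(offset + 1))
    }.toMap
    if (toCommit.nonEmpty) consumer.commitSync(toCommit.asJava) // blocks until the broker confirms
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}
```

Registered via `consumer.subscribe(topics, listener)`, the `commitSync` runs on the polling thread during the rebalance, before the revoked partitions are handed to their new owners.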
…treams" test relating to shutdown. This was also fixed in #591
It appears I was mistaken about a few things. Let me correct that here:

- I understand now that this PR will seek to the last known committed offset. This sounds safe to me!
- This worry stands. However, this PR still makes the situation better.
- This worry now falls away.
- This worry was taken away by another PR; the Runloop no longer depends on this.

In other words, this PR looks much better than I thought before!
#788 will implement a significant portion of this PR. We should probably harvest some things from this PR into separate PRs.
Fix for issue #590
After rebalancing, partitions that were revoked and then reassigned to the consumer are resumed from the last committed offset, which is lower than the last fetched offset that may still be in the partition stream's buffer. This leads to the same consumer instance processing messages for the revoked topic-partition twice. The issue is exacerbated for higher values of `perPartitionChunkPrefetch` and the `bufferSize` of `Consumer.plainStream`.
The fix to minimize this issue is to close the old partition streams, wait until they are fully drained so that the last offset can be committed by the user code (depending on stream topology, e.g. offset batching over multiple partitions negatively affects this issue), `seek` to the last committed offset, and only then start fetching for the partition.
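A bare-bones sketch of that seek step in terms of the plain Kafka consumer API; `seekToCommitted` is an illustrative helper name, not something from this PR:

```scala
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Rewind revoked-and-reassigned partitions to their committed offsets once the old
// streams are drained, so fetching resumes exactly where the commits ended.
def seekToCommitted(
  consumer: KafkaConsumer[Array[Byte], Array[Byte]],
  partitions: Set[TopicPartition]
): Unit = {
  val committed = consumer.committed(partitions.asJava) // values are null when nothing was committed
  partitions.foreach { tp =>
    Option(committed.get(tp)).foreach(meta => consumer.seek(tp, meta.offset()))
    // With no committed offset, leave the position to the auto.offset.reset policy.
  }
}
```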
When using `plainStream`, there is a race condition between finalizing the partition stream and committing the last offsets, so some messages may still be processed twice. When using `partitionedStream` and committing offsets as part of each partition's stream, the number of messages processed twice can be reduced to zero.
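For instance, roughly like this with the zio-kafka 2.x accessors (the topic name and `processRecord` are placeholders; offsets are batched and committed within each partition's own stream):

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Placeholder for whatever the application does with a record.
def processRecord(record: CommittableRecord[String, String]): UIO[Unit] =
  ZIO.logInfo(s"processed ${record.value}")

// Each partition commits its own offsets, so a revoked stream that is allowed to drain
// will have committed everything it processed before the partition moves on.
val consumeAndCommitPerPartition: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .partitionedStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
    .flatMapPar(Int.MaxValue) { case (_, partitionStream) =>
      partitionStream
        .mapZIO(record => processRecord(record).as(record.offset))
        .aggregateAsync(Consumer.offsetBatches) // batches offsets of this partition only
        .mapZIO(_.commit)
    }
    .runDrain
```

Keeping the commit stage inside each partition's stream avoids mixing offsets of different partitions into one batch, which is what lets the duplicate count drop to zero when a drained stream is reassigned.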
The `restartStreamsOnRebalancing` setting is removed and the former behavior of the value `true` is now the default. As a consequence, this improves the semantics of the `partitionedAssignmentStream` method, where each emitted chunk of partition streams represents a generation of the consumer group.
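Sketched usage under those semantics, assuming the zio-kafka 2.x accessor `Consumer.partitionedAssignmentStream` (the topic name is a placeholder):

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Each emitted chunk is a full assignment: one entry per partition owned by this
// consumer after a rebalance, i.e. one generation of the consumer group.
val logGenerations: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .partitionedAssignmentStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
    .zipWithIndex
    .mapZIO { case (assignment, generation) =>
      ZIO.logInfo(s"Generation $generation owns: ${assignment.map(_._1).mkString(", ")}")
    }
    .runDrain
```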
Also in this PR:

- `topic` and `partition`.
- The `currentState` Ref is updated outside of the `runFoldZIO`, by moving more logic from the rebalance handler into `handlePoll`.
- Support for the `CooperativeStickyAssignor`, by examining the consumer group generation ID after rebalance events, and adding more tests.

TODO:

- When do buffered records (= unrequested partitions in the poll result) appear?