-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client]Duplicate messages when use MultiTopicsConsumerImpl #17443
[fix][client]Duplicate messages when use MultiTopicsConsumerImpl #17443
Conversation
This PR should merge into these branches (because this test was appended in branch-2.11):
Note: new features have been added in |
/pulsarbot rerun-failure-checks |
bcfde76
to
f26dd8a
Compare
@poorbarcode Could you please provide more context about how the race condition will happen? |
Already added the description. I'm sorry I didn't finish |
|
||
private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) { | ||
pause(); | ||
CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to wait for completion of these ongoing receives as the messages will be discarded anyway, can we just cancel them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion, but we don't have the operability of cancel
, and the waiting-event only happens on this scenario: call seek
multi times quickly, along with read
, this is very rare.
@poorbarcode Thanks. I understand the problem for now. There is ML discussion https://lists.apache.org/thread/97o9t4ltkds5pfq41l9xbbd31t41qm8w And this one https://lists.apache.org/thread/gnqwxo7w6n6g72ochvgpgv4s6r8mnwb7 is also a duplicated message These are not exactly the same problem, but tight related. For this PR. we will introduce many queue operations to avoid duplicated messages. |
PIP-194 will fix this problem |
Fixes
Motivation
There has a race condition between
add messages to incoming queue
andclean incoming queue
.When we call
MultiTopicsConsumerImpl.seek
, we expect the process executed like this:seek
seek
finishedBut the real process might be executed like this:
receiveMessageFromConsumer
is being executed at this time, many messages being added to the incoming queueseek
receiveMessageFromConsumer
is being executed at step 2, and many messages are being added to the incoming queue at this stepseek
finishedthen the same message will be consumed twice:
How to reproduce?
execute
TopicReaderTest.testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic
100 ~400 timesModifications
Make
seek
to two parts action:part-1:
pause
pause
, all actionreceiveMessageFromConsumer
will not execute immediately, just into thepending queue
part-2:
resume
pending queue
Documentation
doc-required
doc-not-needed
doc
doc-complete