Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix duplicate messages caused by seek #16171

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2778,6 +2778,10 @@ public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopic
return possibleSendToDeadLetterTopicMessages;
}

protected boolean isDuringSeek() {
return duringSeek.get();
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
}
messagesFuture.thenAcceptAsync(messages -> {
if (consumer.isDuringSeek()) {
receiveMessageFromConsumer(consumer, batchReceive);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When incomingMessages is not empty, there are two possibilities:

  1. loop until there is no message in the incomingMessages
  2. If ConsumerImpl.seek fails, the messages have popped from incomingMessages cannot be consumed until redeliver executes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have a question about this part.

It looks like not only from the MultiTopicConsumer. If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2. If ConsumerImpl.seek fails, the messages have popped from incomingMessages cannot be consumed until redeliver executes

Good catch! We need to consider this.

It looks like not only from the MultiTopicConsumer.

Yes.

If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?

For MultiTopicConsumer, this is right, because the MultiTopicConsumer has a loop to pulling the messsage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer, but it can't fix the issue that the user facing. After the message polled from the internal consumer to the queue of the MultiTopicConsumer, the user will still have chance to get duplicated messages during the seek operation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer.

Right.

the user will still have chance to get duplicated messages during the seek operation?

I think we need to figure out the details of this seek.

  1. When to clean incomingMessages, after or before seek?
  2. During the seek, can the client continue to consume the incomingMessages?

return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
Expand Down