From 25abae8f9fb04e9e57d4951bc88c4fe6b6d591ad Mon Sep 17 00:00:00 2001 From: Zixuan Liu <nodeces@gmail.com> Date: Tue, 12 Jul 2022 23:24:57 +0800 Subject: [PATCH] [fix][client] Fix duplicate messages caused by seek Signed-off-by: Zixuan Liu <nodeces@gmail.com> --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++++ .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c9cf4a12b7541..7656f144acac0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2778,6 +2778,10 @@ public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopic return possibleSendToDeadLetterTopicMessages; } + protected boolean isDuringSeek() { + return duringSeek.get(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 986a13e446a8c..ad3c0fe5a46dd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList); } messagesFuture.thenAcceptAsync(messages -> { + if (consumer.isDuringSeek()) { + receiveMessageFromConsumer(consumer, batchReceive); + return; + } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic());