Skip to content

Commit 25abae8

Browse files
committed
[fix][client] Fix duplicate messages caused by seek
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 3752a11 commit 25abae8

File tree

2 files changed

+8
-0
lines changed

2 files changed

+8
-0
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -2778,6 +2778,10 @@ public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopic
27782778
return possibleSendToDeadLetterTopicMessages;
27792779
}
27802780

2781+
protected boolean isDuringSeek() {
2782+
return duringSeek.get();
2783+
}
2784+
27812785
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
27822786

27832787
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
252252
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
253253
}
254254
messagesFuture.thenAcceptAsync(messages -> {
255+
if (consumer.isDuringSeek()) {
256+
receiveMessageFromConsumer(consumer, batchReceive);
257+
return;
258+
}
255259
if (log.isDebugEnabled()) {
256260
log.debug("[{}] [{}] Receive message from sub consumer:{}",
257261
topic, subscription, consumer.getTopic());

0 commit comments

Comments
 (0)