diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c9cf4a12b7541..7656f144acac0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2778,6 +2778,10 @@ public Map>> getPossibleSendToDeadLetterTopic return possibleSendToDeadLetterTopicMessages; } + protected boolean isDuringSeek() { + return duringSeek.get(); + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 986a13e446a8c..ad3c0fe5a46dd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchR messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList); } messagesFuture.thenAcceptAsync(messages -> { + if (consumer.isDuringSeek()) { + receiveMessageFromConsumer(consumer, batchReceive); + return; + } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic());