From ddc7def5e9519ca87fc4230d758dd5ec064caea6 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Wed, 20 Nov 2024 17:28:38 +0800 Subject: [PATCH] fix some test --- .../PersistentDispatcherSingleActiveConsumer.java | 2 +- .../apache/pulsar/client/impl/MessageRedeliveryTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index f34f91220ec59..f62c96c4d1feb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -279,7 +279,7 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { - // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request + // broker side epoch is bigger than consumer epoch, so don't need to handle this redeliver request if (consumerEpoch < consumer.getConsumerEpoch()) { log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than " + "consumer epoch [{}]", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index c0953e19b68c0..be38f0e7bbb9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -345,12 +345,13 @@ public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{ "persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get(); Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch() == 1); - consumer.setConsumerEpoch(0); + consumer.setConsumerEpoch(-1); producer.send("Hello Pulsar!"); // ignore this redeliver request consumer.redeliverUnacknowledgedMessages(); consumer.receive(); + assertEquals(consumer.getConsumerEpoch(), 0); assertEquals(persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch(), 1); } @@ -377,8 +378,7 @@ public void testRedeliveryCommandDontCheckClientConnectionState() throws Excepti producer.send("Hello Pulsar!"); consumer.receive(); consumer.redeliverUnacknowledgedMessages(); - PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic( - "persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState", + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get(); Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch() == 1);