Skip to content

Commit

Permalink
fix some test
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 committed Nov 20, 2024
1 parent e6f4e9a commit ddc7def
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
}

private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
// broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request
// broker side epoch is bigger than consumer epoch, so don't need to handle this redeliver request
if (consumerEpoch < consumer.getConsumerEpoch()) {
log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than "
+ "consumer epoch [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,13 @@ public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{
"persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);
consumer.setConsumerEpoch(0);
consumer.setConsumerEpoch(-1);
producer.send("Hello Pulsar!");

// ignore this redeliver request
consumer.redeliverUnacknowledgedMessages();
consumer.receive();
assertEquals(consumer.getConsumerEpoch(), 0);
assertEquals(persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch(), 1);
}
Expand All @@ -377,8 +378,7 @@ public void testRedeliveryCommandDontCheckClientConnectionState() throws Excepti
producer.send("Hello Pulsar!");
consumer.receive();
consumer.redeliverUnacknowledgedMessages();
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(
"persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState",
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).get().get();
Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher()
.getConsumers().get(0).getConsumerEpoch() == 1);
Expand Down

0 comments on commit ddc7def

Please sign in to comment.