diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java index c0259ae375c47..a3d319ca0c591 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java @@ -26,7 +26,7 @@ /** * DispatcherBlockConsumerTest with {@link StreamingDispatcher} */ -@Test(groups = "flaky") +@Test(groups = "broker") public class PersistentStreamingDispatcherBlockConsumerTest extends DispatcherBlockConsumerTest { @BeforeMethod diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index 1a56d689799be..7696afdeb2885 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -64,7 +65,7 @@ import org.testng.annotations.Test; import org.testng.collections.Lists; -@Test(groups = "flaky") +@Test(groups = "broker") public class DispatcherBlockConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class); @@ -998,6 +999,11 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { * maxUnAckPerDispatcher limit **/ consumer1Sub2 = (ConsumerImpl) pulsarClient.newConsumer().topic(topicName) + // in the scope of this test is important to not group acks because + // the ack for messages happens after receiving a new message from the topic. + // If the acks are sent in batch it may happen that the dispatcher is still blocked until + // it receives the batched acks and the Consumer#receive will return null. + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize) .subscriptionType(SubscriptionType.Shared).subscribe(); Set messages2 = Sets.newHashSet(); @@ -1014,15 +1020,15 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { assertEquals(blockedDispatchers.size(), 2); // (2.c) Now subscriber-2 is blocked: so acking back should unblock dispatcher + // Ack a sublist of unacked messages. The acked messages must unblock the dispatcher and they must be + // more than maxUnAckPerDispatcher's half Iterator itrMsgs = messages2.iterator(); int additionalMsgConsumedAfterBlocked = messages2.size() - maxUnAckPerDispatcher + 1; // eg. 25 -20 = 5 for (int i = 0; i < (additionalMsgConsumedAfterBlocked + (maxUnAckPerDispatcher / 2)); i++) { consumer1Sub2.acknowledge(itrMsgs.next()); } - // let ack completed - Thread.sleep(1000); // verify subscriber2 is unblocked and ready to consume more messages - assertEquals(blockedDispatchers.size(), 1); + Awaitility.await().untilAsserted(() -> assertEquals(blockedDispatchers.size(), 1)); for (int j = 0; j < totalProducedMsgs; j++) { msg = consumer1Sub2.receive(200, TimeUnit.MILLISECONDS); if (msg != null) {