[fix][test] Fix flaky test: PersistentStreamingDispatcherBlockConsumerTest.testBrokerDispatchBlockAndSubAckBackRequiredMsgs (#17161)

nicoloboschi authored Aug 31, 2022
Parent: dab0d1f, commit: d4beae7
Showing 2 changed files with 11 additions and 5 deletions.
PersistentStreamingDispatcherBlockConsumerTest.java
@@ -26,7 +26,7 @@
 /**
  * DispatcherBlockConsumerTest with {@link StreamingDispatcher}
  */
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class PersistentStreamingDispatcherBlockConsumerTest extends DispatcherBlockConsumerTest {
 
     @BeforeMethod
DispatcherBlockConsumerTest.java
@@ -56,6 +56,7 @@
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -64,7 +65,7 @@
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
-@Test(groups = "flaky")
+@Test(groups = "broker")
 public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
@@ -998,6 +999,11 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
          * maxUnAckPerDispatcher limit
          **/
         consumer1Sub2 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
+                // in the scope of this test it is important to not group acks, because
+                // the ack for messages happens after receiving a new message from the topic.
+                // If the acks are sent in a batch, it may happen that the dispatcher is still blocked until
+                // it receives the batched acks and Consumer#receive will return null.
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                 .subscriptionName(subscriberName2).receiverQueueSize(receiverQueueSize)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
         Set<MessageId> messages2 = Sets.newHashSet();
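For context: disabling ack grouping means every acknowledgment is sent to the broker immediately, so the dispatcher's unacked-message count drops as soon as the test acks a message rather than when a batched ack request is flushed. Below is a minimal, self-contained sketch of that consumer configuration, assuming a local broker; the service URL, topic, and subscription names are placeholders, not the values used in this test.

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class ImmediateAckConsumerSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")                 // placeholder broker URL
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/unacked-topic")    // placeholder topic
                .subscriptionName("subscriber-2")                       // placeholder subscription name
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(10)
                // 0 == send each ack right away; grouped acks can leave the broker-side
                // dispatcher blocked until the grouped ack request reaches the broker
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                .subscribe();
        consumer.close();
        client.close();
    }
}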
@@ -1014,15 +1020,15 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
         assertEquals(blockedDispatchers.size(), 2);
 
         // (2.c) Now subscriber-2 is blocked: so acking back should unblock dispatcher
+        // Ack a sublist of unacked messages. The acked messages must unblock the dispatcher and they must be
+        // more than maxUnAckPerDispatcher's half
         Iterator<MessageId> itrMsgs = messages2.iterator();
         int additionalMsgConsumedAfterBlocked = messages2.size() - maxUnAckPerDispatcher + 1; // eg. 25 - 20 = 5
         for (int i = 0; i < (additionalMsgConsumedAfterBlocked + (maxUnAckPerDispatcher / 2)); i++) {
             consumer1Sub2.acknowledge(itrMsgs.next());
         }
-        // let ack completed
-        Thread.sleep(1000);
         // verify subscriber2 is unblocked and ready to consume more messages
-        assertEquals(blockedDispatchers.size(), 1);
+        Awaitility.await().untilAsserted(() -> assertEquals(blockedDispatchers.size(), 1));
         for (int j = 0; j < totalProducedMsgs; j++) {
             msg = consumer1Sub2.receive(200, TimeUnit.MILLISECONDS);
             if (msg != null) {
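The other change above replaces a fixed Thread.sleep(1000) with an Awaitility poll, so the assertion retries until the broker has actually processed the acks instead of assuming one second is always enough. A minimal, self-contained sketch of that pattern follows; the AtomicInteger and the background thread are stand-ins for the broker-side dispatcher state, not code from this test.

import static org.testng.Assert.assertEquals;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;

public class AwaitilitySketch {
    public static void main(String[] args) {
        // Stand-in for the test's blocked-dispatcher count: starts at 2, drops to 1 later.
        AtomicInteger blockedDispatchers = new AtomicInteger(2);
        new Thread(() -> {
            try {
                Thread.sleep(300);                       // simulate the broker processing the acks
            } catch (InterruptedException ignored) {
            }
            blockedDispatchers.decrementAndGet();
        }).start();

        // Retry the assertion until it passes (Awaitility defaults to a 10 s timeout),
        // instead of sleeping for a fixed interval and hoping the acks completed.
        Awaitility.await()
                .atMost(Duration.ofSeconds(5))
                .untilAsserted(() -> assertEquals(blockedDispatchers.get(), 1));
    }
}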
