Skip to content

Commit

Permalink
Fix #91 - Intermittent test failure on SimpleProducerConsumerTest.tes…
Browse files Browse the repository at this point in the history
…tSharedConsumerAckDifferentConsumer (#92)
  • Loading branch information
merlimat authored Oct 29, 2016
1 parent 551d42d commit 340dc8d
Showing 1 changed file with 50 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -568,12 +568,12 @@ public void testSendBigMessageSize() throws Exception {
/**
* Usecase 1: Only 1 Active Subscription - 1 subscriber - Produce Messages - EntryCache should cache messages -
* EntryCache should be cleaned : Once active subscription consumes messages
*
*
* Usecase 2: 2 Active Subscriptions (faster and slower) and slower gets closed - 2 subscribers - Produce Messages -
* 1 faster-subscriber consumes all messages and another slower-subscriber none - EntryCache should have cached
* messages as slower-subscriber has not consumed messages yet - close slower-subscriber - EntryCache should be
* cleared
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -864,19 +864,19 @@ public void testSendCallBack() throws Exception {
assertEquals(message.getBytes().length, msgLength.get());
}
}

/**
* consume message from consumer1 and send acknowledgement from different consumer subscribed under same
* subscription-name
*
*
* @throws Exception
*/
@Test
@Test(timeOut = 30000)
public void testSharedConsumerAckDifferentConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(5);
conf.setReceiverQueueSize(1);
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
Expand All @@ -895,15 +895,13 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
Set<Message> consumerMsgSet1 = Sets.newHashSet();
Set<Message> consumerMsgSet2 = Sets.newHashSet();
for (int i = 0; i < 5; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
msg = consumer1.receive();
consumerMsgSet1.add(msg);

}
for (int i = 0; i < 5; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
msg = consumer2.receive();
consumerMsgSet2.add(msg);

}

consumerMsgSet1.stream().forEach(m -> {
try {
consumer2.acknowledge(m);
Expand All @@ -923,7 +921,8 @@ public void testSharedConsumerAckDifferentConsumer() throws Exception {
consumer2.redeliverUnacknowledgedMessages();

try {
if (consumer1.receive(1, TimeUnit.SECONDS) != null || consumer2.receive(1, TimeUnit.SECONDS) != null) {
if (consumer1.receive(100, TimeUnit.MILLISECONDS) != null
|| consumer2.receive(100, TimeUnit.MILLISECONDS) != null) {
fail();
}
} finally {
Expand Down Expand Up @@ -961,16 +960,16 @@ private void receiveAsync(Consumer consumer, int totalMessage, int currentMessag
});
}
}

/**
* Verify: Consumer stops receiving msg when reach unack-msg limit and
* Verify: Consumer stops receiving msg when reach unack-msg limit and
* starts receiving once acks messages
* 1. Produce X (600) messages
* 1. Produce X (600) messages
* 2. Consumer has receive size (10) and receive message without acknowledging
* 3. Consumer will stop receiving message after unAckThreshold = 500
* 4. Consumer acks messages and starts consuming remanining messages
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -1048,19 +1047,19 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
}

/**
* Verify: iteration of
* a. message receive w/o acking
* Verify: iteration of
* a. message receive w/o acking
* b. stop receiving msg
* c. ack msgs
* d. started receiving msgs
*
* d. started receiving msgs
*
* 1. Produce total X (1500) messages
* 2. Consumer consumes messages without acking until stop receiving
* 2. Consumer consumes messages without acking until stop receiving
* from broker due to reaching ack-threshold (500)
* 3. Consumer acks messages after stop getting messages
* 3. Consumer acks messages after stop getting messages
* 4. Consumer again tries to consume messages
* 5. Consumer should be able to complete consuming all 1500 messages in 3 iteration (1500/500)
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -1133,12 +1132,12 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration() throws Ex
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}


/**
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
*
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
Expand All @@ -1163,7 +1162,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
"subscriber-1", conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
producerConf);

Expand All @@ -1173,7 +1172,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
producer.send(message.getBytes());
}


// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
Expand Down Expand Up @@ -1239,7 +1238,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

@Test
public void testUnackBlockRedeliverMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down Expand Up @@ -1310,7 +1309,7 @@ public void testUnackBlockRedeliverMessages() throws Exception {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

@Test(dataProvider = "batch")
public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
Expand All @@ -1330,7 +1329,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
"subscriber-1", conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
Expand All @@ -1348,7 +1347,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
}

FutureUtil.waitForAll(futures).get();

// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
Expand Down Expand Up @@ -1398,11 +1397,11 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

/**
* Verify: Consumer2 sends ack of Consumer1 and consumer1 should be unblock if it is blocked due to unack-messages
*
*
*
*
* @param batchMessageDelayMs
* @throws Exception
*/
Expand All @@ -1427,7 +1426,7 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
"subscriber-1", conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic",
producerConf);

Expand All @@ -1437,7 +1436,7 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
producer.send(message.getBytes());
}


// (2) Consumer1: consume without ack:
// try to consume messages: but will be able to consume number of messages = maxUnackedMessages
Message msg = null;
Expand All @@ -1452,9 +1451,9 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
break;
}
}

assertEquals(messages.size(), maxUnackedMessages); //consumer1

// (3) ack for all UnackedMessages from consumer2
messages.forEach(m -> {
try {
Expand Down Expand Up @@ -1485,7 +1484,7 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
break;
}
}

// verify total-consumer messages = total-produce messages
assertEquals(totalProducedMsgs, totalReceiveMessages);
producer.close();
Expand Down Expand Up @@ -1537,13 +1536,13 @@ public void testEnabledChecksumClient() throws Exception {
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
*
* Usecase: produce message with 10ms interval: so, consumer can consume only 10 messages without acking
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -1622,14 +1621,14 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
* Usecase: Consumer starts consuming only after all messages have been produced.
*
* Usecase: Consumer starts consuming only after all messages have been produced.
* So, consumer consumes total receiver-queue-size number messages => ask for redelivery and receives all messages again.
*
*
* @throws Exception
*/
@Test(invocationCount=10)
Expand Down Expand Up @@ -1714,5 +1713,5 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}

}

0 comments on commit 340dc8d

Please sign in to comment.