From efbdf1a7942372ed4967ad7a42ef31b1e920e3c3 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Thu, 24 Nov 2022 11:14:46 +0800 Subject: [PATCH] [improve][test] Optimize TransactionEndToEndTest (#18522) ## Motivation 1. fix flaky test https://github.com/apache/pulsar/issues/18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by https://github.com/apache/pulsar/pull/17836 and https://github.com/apache/pulsar/pull/18486 later. 2. decrease run time by optimizing receive method * modify ` Message message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change. --- .../client/impl/TransactionEndToEndTest.java | 75 +++++++++---------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index af18c2ed5c1736..663c1c50ce79c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -146,15 +146,15 @@ private void produceCommitTest(boolean enableBatch) throws Exception { int messageCnt = 1000; for (int i = 0; i < messageCnt; i++) { if (i % 5 == 0) { - producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync(); + producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send(); } else { - producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync(); + producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).send(); } txnMessageCnt++; } // Can't receive transaction messages before commit. - Message message = consumer.receive(5, TimeUnit.SECONDS); + Message message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); txn1.commit().get(); @@ -162,16 +162,13 @@ private void produceCommitTest(boolean enableBatch) throws Exception { int receiveCnt = 0; for (int i = 0; i < txnMessageCnt; i++) { - message = consumer.receive(); + message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(message); receiveCnt ++; } Assert.assertEquals(txnMessageCnt, receiveCnt); - message = consumer.receive(5, TimeUnit.SECONDS); - Assert.assertNull(message); - - message = consumer.receive(5, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); // cleanup. @@ -208,13 +205,13 @@ public void produceAbortTest() throws Exception { Awaitility.await().until(consumer::isConnected); // Can't receive transaction messages before abort. - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); txn.abort().get(); // Cant't receive transaction messages after abort. - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); Awaitility.await().until(() -> { boolean flag = true; @@ -302,7 +299,7 @@ private void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) Transaction txn = getTxn(); for (int i = 0; i < messageCount / 2; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), txn).get(); } @@ -382,14 +379,14 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // consume and ack messages with txn for (int i = 0; i < messageCnt; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(message); log.info("receive msgId: {}, count : {}", message.getMessageId(), i); consumer.acknowledgeAsync(message.getMessageId(), txn).get(); } // the messages are pending ack state and can't be received - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); // 1) txn abort @@ -408,7 +405,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, commitTxn.commit().get(); // after transaction commit, the messages can't be received - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); Field field = TransactionImpl.class.getDeclaredField("state"); @@ -445,7 +442,7 @@ public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception { .topic(topicTwo).subscriptionName(sub).subscribe(); String content = "test"; producer.send(content); - assertEquals(consumer.receive().getValue(), content); + assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(), content); // cleanup. producer.close(); @@ -484,7 +481,7 @@ public void txnMessageAckTest() throws Exception { log.info("produce transaction messages finished"); // Can't receive transaction messages before commit. - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); log.info("transaction messages can't be received before transaction committed"); @@ -493,7 +490,7 @@ public void txnMessageAckTest() throws Exception { int ackedMessageCount = 0; int receiveCnt = 0; for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(); + message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(message); receiveCnt ++; if (i % 2 == 0) { @@ -503,7 +500,7 @@ public void txnMessageAckTest() throws Exception { } Assert.assertEquals(messageCnt, receiveCnt); - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); String checkTopic = TopicName.get(topic).getPartition(0).toString(); @@ -522,7 +519,7 @@ public void txnMessageAckTest() throws Exception { } Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt); - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); topic = TopicName.get(topic).getPartition(0).toString(); @@ -638,7 +635,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri } // the messages are pending ack state and can't be received - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); abortTxn.abort().get(); @@ -667,7 +664,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException); } - message = consumer.receive(1, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message); } @@ -729,7 +726,7 @@ public void txnMetadataHandlerRecoverTest() throws Exception { Awaitility.await().until(consumer::isConnected); for (int i = 0; i < txnCnt * messageCnt; i++) { - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(message); } @@ -938,14 +935,14 @@ public void transactionTimeoutTest() throws Exception { .withTransactionTimeout(3, TimeUnit.SECONDS) .build().get(); - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), consumeTimeoutTxn).get(); - Message reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS); + Message reReceiveMessage = consumer.receive(300, TimeUnit.MILLISECONDS); assertNull(reReceiveMessage); - reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS); + reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS); assertEquals(reReceiveMessage.getValue(), message.getValue()); @@ -992,9 +989,9 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th } Transaction txn = getTxn(); if (ackType == CommandAck.AckType.Individual) { - consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn); + consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn); } else { - consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn); + consumer.acknowledgeCumulativeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txn); } topic = TopicName.get(topic).toString(); boolean exist = false; @@ -1117,7 +1114,7 @@ public void testTxnTimeOutInClient() throws Exception{ .InvalidTxnStatusException); } try { - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); Assert.fail(); } catch (Exception e) { @@ -1159,7 +1156,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { Message message = null; for (int i = 0; i < transactionCumulativeAck; i++) { - message = consumer.receive(); + message = consumer.receive(5, TimeUnit.SECONDS); } // receive transaction in order @@ -1182,7 +1179,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { // receive the rest of the message for (int i = 0; i < count; i++) { - message = consumer.receive(); + message = consumer.receive(5, TimeUnit.SECONDS); } Transaction commitTransaction = getTxn(); @@ -1195,7 +1192,7 @@ public void testCumulativeAckRedeliverMessages() throws Exception { commitTransaction.commit().get(); // then redeliver will not receive any message - message = consumer.receive(3, TimeUnit.SECONDS); + message = consumer.receive(300, TimeUnit.MILLISECONDS); assertNull(message); // cleanup. @@ -1270,7 +1267,7 @@ public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exce // receive the batch messages add to a list for (int i = 0; i < 5; i++) { - messageIds.add(consumer.receive().getMessageId()); + messageIds.add(consumer.receive(5, TimeUnit.SECONDS).getMessageId()); } MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); @@ -1330,7 +1327,7 @@ public void testSendTxnAckMessageToDLQ() throws Exception { .build().get(); // consumer receive the message the first time, redeliverCount = 0 - consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); + consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get(); transaction.abort().get(); @@ -1338,13 +1335,13 @@ public void testSendTxnAckMessageToDLQ() throws Exception { .build().get(); // consumer receive the message the second time, redeliverCount = 1, also can be received - consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); + consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction).get(); transaction.abort().get(); // consumer receive the message the third time, redeliverCount = 2, // the message will be sent to DLQ, can't receive - assertNull(consumer.receive(3, TimeUnit.SECONDS)); + assertNull(consumer.receive(300, TimeUnit.MILLISECONDS)); assertEquals(((ConsumerImpl) consumer).getAvailablePermits(), 3); @@ -1394,7 +1391,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES) .build().get(); - Message message = consumer.receive(); + Message message = consumer.receive(5, TimeUnit.SECONDS); assertEquals(value1, new String(message.getValue())); // consumer receive the batch message one the first time, redeliverCount = 0 consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); @@ -1404,7 +1401,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { // consumer will receive the batch message two and then receive // the message one and message two again, redeliverCount = 1 for (int i = 0; i < 3; i ++) { - message = consumer.receive(); + message = consumer.receive(5, TimeUnit.SECONDS); } transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES) @@ -1418,7 +1415,7 @@ public void testSendTxnAckBatchMessageToDLQ() throws Exception { // consumer receive the batch message the third time, redeliverCount = 2, // the message will be sent to DLQ, can't receive - assertNull(consumer.receive(3, TimeUnit.SECONDS)); + assertNull(consumer.receive(300, TimeUnit.MILLISECONDS)); assertEquals(((ConsumerImpl) consumer).getAvailablePermits(), 6); @@ -1473,7 +1470,7 @@ public void testDelayedTransactionMessages() throws Exception { // Failover consumer will receive the messages immediately while // the shared consumer will get them after the delay - Message msg = sharedConsumer.receive(1, TimeUnit.SECONDS); + Message msg = sharedConsumer.receive(300, TimeUnit.MILLISECONDS); assertNull(msg); for (int i = 0; i < 10; i++) {