From eba5d11ca6e58f45cdb8e902e1fd3b44e8ede13d Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 23 Jan 2021 03:47:09 +0800 Subject: [PATCH] fix flaky unit test (#9262) * fix unit test * set timeout to 10 sec --- .../broker/admin/AdminApiOffloadTest.java | 46 +++++-------- .../broker/admin/MaxUnackedMessagesTest.java | 69 +++++-------------- 2 files changed, 35 insertions(+), 80 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 6644d8c957ce5..92ade3769c643 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -32,6 +32,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -46,6 +48,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -180,43 +183,32 @@ public void testOffloadPolicies() throws Exception { public void testOffloadPoliciesApi() throws Exception { final String topicName = testTopic + UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(topicName, 3); - //wait for server init - Thread.sleep(1000); + pulsarClient.newProducer().topic(topicName).create().close(); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName); assertNull(offloadPolicies); OffloadPolicies offload = new OffloadPolicies(); String path = "fileSystemPath"; offload.setFileSystemProfilePath(path); admin.topics().setOffloadPolicies(topicName, offload); - for (int i = 0; i < 50; i++) { - if (admin.topics().getOffloadPolicies(topicName) != null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getOffloadPolicies(topicName))); assertEquals(admin.topics().getOffloadPolicies(topicName), offload); assertEquals(admin.topics().getOffloadPolicies(topicName).getFileSystemProfilePath(), path); admin.topics().removeOffloadPolicies(topicName); - for (int i = 0; i < 50; i++) { - if (admin.topics().getOffloadPolicies(topicName) == null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNull(admin.topics().getOffloadPolicies(topicName))); assertNull(admin.topics().getOffloadPolicies(topicName)); } @Test public void testTopicLevelOffloadPartitioned() throws Exception { - //wait for cache init - Thread.sleep(2000); testOffload(true); } @Test public void testTopicLevelOffloadNonPartitioned() throws Exception { - //wait for cache init - Thread.sleep(2000); testOffload(false); } @@ -230,6 +222,8 @@ public void testOffload(boolean isPartitioned) throws Exception { admin.topics().createNonPartitionedTopic(topicName); } pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close(); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName))); //2 namespace level policy should use NullLedgerOffloader by default if (isPartitioned) { for (int i = 0; i < partitionNum; i++) { @@ -259,12 +253,8 @@ public void testOffload(boolean isPartitioned) throws Exception { //4 set topic level offload policies admin.topics().setOffloadPolicies(topicName, offloadPolicies); - for (int i = 0; i < 50; i++) { - if (admin.topics().getOffloadPolicies(topicName) != null) { - break; - } - Thread.sleep(500); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getOffloadPolicies(topicName))); //5 name of offload should become "mock" if (isPartitioned) { for (int i = 0; i < partitionNum; i++) { @@ -289,12 +279,8 @@ public void testOffload(boolean isPartitioned) throws Exception { doReturn(map).when(pulsar).getLedgerOffloaderMap(); admin.topics().removeOffloadPolicies(topicName); - for (int i = 0; i < 50; i++) { - if (admin.topics().getOffloadPolicies(topicName) == null) { - break; - } - Thread.sleep(500); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNull(admin.topics().getOffloadPolicies(topicName))); // topic level offloader should be closed if (isPartitioned) { verify(topicOffloader, times(partitionNum)).close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java index 77a9ffd088377..61892c500e3cf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java @@ -39,6 +39,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; public class MaxUnackedMessagesTest extends ProducerConsumerBase { @@ -79,20 +82,12 @@ public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception { assertNull(max); admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 2048); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName))); assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName).intValue(), 2048); admin.topics().removeMaxUnackedMessagesOnSubscription(topicName); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName))); assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)); } @@ -115,13 +110,8 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception { List> consumers = Lists.newArrayList(consumer1, consumer2, consumer3); waitCacheInit(topicName); admin.topics().setMaxUnackedMessagesOnSubscription(topicName, unackMsgAllowed); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) { - break; - } - Thread.sleep(100); - } - + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName))); Producer producer = pulsarClient.newProducer().topic(topicName).create(); // (1) Produced Messages @@ -201,20 +191,12 @@ public void testMaxUnackedMessagesOnConsumerApi() throws Exception { assertNull(max); admin.topics().setMaxUnackedMessagesOnConsumer(topicName, 2048); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName))); assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), 2048); admin.topics().removeMaxUnackedMessagesOnConsumer(topicName); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) == null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName))); assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)); } @@ -260,12 +242,8 @@ public void testMaxUnackedMessagesOnConsumer() throws Exception { // 3) Set restrictions, so only part of the data can be consumed waitCacheInit(topicName); admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed); - for (int i = 0; i < 50; i++) { - if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) { - break; - } - Thread.sleep(100); - } + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() + -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName))); assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed); // 4) Start 2 consumer, each consumer can only consume 100 messages @Cleanup @@ -304,18 +282,9 @@ private void startConsumer(Consumer consumer, AtomicInteger consumerCoun } private void waitCacheInit(String topicName) throws Exception { - for (int i = 0; i < 50; i++) { - //wait for server init - Thread.sleep(1000); - try { - admin.topics().getMaxUnackedMessagesOnSubscription(topicName); - break; - } catch (Exception e) { - //ignore - } - if (i == 49) { - throw new RuntimeException("Waiting for cache initialization has timed out"); - } - } + pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close(); + TopicName topic = TopicName.get(topicName); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topic)); } }