Skip to content

Commit

Permalink
fix flaky unit test (#9262)
Browse files Browse the repository at this point in the history
* fix unit test

* set timeout to 10 sec
  • Loading branch information
315157973 authored and eolivelli committed May 25, 2021
1 parent 4c369c9 commit eba5d11
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -46,6 +48,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -180,43 +183,32 @@ public void testOffloadPolicies() throws Exception {
public void testOffloadPoliciesApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
//wait for server init
Thread.sleep(1000);
pulsarClient.newProducer().topic(topicName).create().close();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName);
assertNull(offloadPolicies);
OffloadPolicies offload = new OffloadPolicies();
String path = "fileSystemPath";
offload.setFileSystemProfilePath(path);
admin.topics().setOffloadPolicies(topicName, offload);
for (int i = 0; i < 50; i++) {
if (admin.topics().getOffloadPolicies(topicName) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getOffloadPolicies(topicName)));
assertEquals(admin.topics().getOffloadPolicies(topicName), offload);
assertEquals(admin.topics().getOffloadPolicies(topicName).getFileSystemProfilePath(), path);
admin.topics().removeOffloadPolicies(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getOffloadPolicies(topicName) == null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
assertNull(admin.topics().getOffloadPolicies(topicName));
}

@Test
public void testTopicLevelOffloadPartitioned() throws Exception {
//wait for cache init
Thread.sleep(2000);
testOffload(true);
}

@Test
public void testTopicLevelOffloadNonPartitioned() throws Exception {
//wait for cache init
Thread.sleep(2000);
testOffload(false);
}

Expand All @@ -230,6 +222,8 @@ public void testOffload(boolean isPartitioned) throws Exception {
admin.topics().createNonPartitionedTopic(topicName);
}
pulsarClient.newProducer().topic(topicName).enableBatching(false).create().close();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
//2 namespace level policy should use NullLedgerOffloader by default
if (isPartitioned) {
for (int i = 0; i < partitionNum; i++) {
Expand Down Expand Up @@ -259,12 +253,8 @@ public void testOffload(boolean isPartitioned) throws Exception {

//4 set topic level offload policies
admin.topics().setOffloadPolicies(topicName, offloadPolicies);
for (int i = 0; i < 50; i++) {
if (admin.topics().getOffloadPolicies(topicName) != null) {
break;
}
Thread.sleep(500);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getOffloadPolicies(topicName)));
//5 name of offload should become "mock"
if (isPartitioned) {
for (int i = 0; i < partitionNum; i++) {
Expand All @@ -289,12 +279,8 @@ public void testOffload(boolean isPartitioned) throws Exception {
doReturn(map).when(pulsar).getLedgerOffloaderMap();

admin.topics().removeOffloadPolicies(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getOffloadPolicies(topicName) == null) {
break;
}
Thread.sleep(500);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getOffloadPolicies(topicName)));
// topic level offloader should be closed
if (isPartitioned) {
verify(topicOffloader, times(partitionNum)).close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -47,6 +49,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;

public class MaxUnackedMessagesTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -79,20 +82,12 @@ public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
assertNull(max);

admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 2048);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
assertEquals(admin.topics().getMaxUnackedMessagesOnSubscription(topicName).intValue(), 2048);
admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
}

Expand All @@ -115,13 +110,8 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception {
List<Consumer<?>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnSubscription(topicName, unackMsgAllowed);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
break;
}
Thread.sleep(100);
}

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName)));
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

// (1) Produced Messages
Expand Down Expand Up @@ -201,20 +191,12 @@ public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
assertNull(max);

admin.topics().setMaxUnackedMessagesOnConsumer(topicName, 2048);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), 2048);
admin.topics().removeMaxUnackedMessagesOnConsumer(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) == null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName));
}

Expand Down Expand Up @@ -260,12 +242,8 @@ public void testMaxUnackedMessagesOnConsumer() throws Exception {
// 3) Set restrictions, so only part of the data can be consumed
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(()
-> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
// 4) Start 2 consumer, each consumer can only consume 100 messages
@Cleanup
Expand Down Expand Up @@ -304,18 +282,9 @@ private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCoun
}

private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
//wait for server init
Thread.sleep(1000);
try {
admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
break;
} catch (Exception e) {
//ignore
}
if (i == 49) {
throw new RuntimeException("Waiting for cache initialization has timed out");
}
}
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
}
}

0 comments on commit eba5d11

Please sign in to comment.