Skip to content

Commit

Permalink
[test]Fix Flaky-test: BrokerServiceTest.testLookupThrottlingForClient…
Browse files Browse the repository at this point in the history
…ByClient (#16540)
  • Loading branch information
AnonHxy authored Aug 10, 2022
1 parent e5d5fbe commit e826d84
Showing 1 changed file with 59 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -88,10 +89,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -966,8 +970,6 @@ public void testTlsAuthUseTrustCert() throws Exception {
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
// This test looks like it could be flakey, if the broker responds
// quickly enough, there may never be concurrency in requests
final String topicName = "persistent://prop/ns-abc/newTopic";

PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
Expand All @@ -979,7 +981,30 @@ public void testLookupThrottlingForClientByClient() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
long reqId = 0xdeadbeef;
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
// Using an AtomicReference in order to reset a new CountDownLatch
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
latchRef.set(new CountDownLatch(1));
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> new ClientCnx(conf, eventLoop) {
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
try {
latchRef.get().await();
} catch (InterruptedException e) {
// ignore
}
super.handleLookupResponse(lookupResult);
}

@Override
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse lookupResult) {
try {
latchRef.get().await();
} catch (InterruptedException e) {
// ignore
}
super.handlePartitionResponse(lookupResult);
}
})) {
// for PMR
// 2 lookup will succeed
long reqId1 = reqId++;
Expand All @@ -990,12 +1015,18 @@ public void testLookupThrottlingForClientByClient() throws Exception {
long reqId2 = reqId++;
ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2);
CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2));
.thenCompose(clientCnx -> {
CompletableFuture<?> future = clientCnx.newLookup(request2, reqId2);
// pending other responses in `ClientCnx` until now
latchRef.get().countDown();
return future;
});

f1.get();
f2.get();

// 3 lookup will fail
latchRef.set(new CountDownLatch(1));
long reqId3 = reqId++;
ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3);
f1 = pool.getConnection(resolver.resolveHost())
Expand All @@ -1009,11 +1040,16 @@ public void testLookupThrottlingForClientByClient() throws Exception {
long reqId5 = reqId++;
ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5);
CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5));
.thenCompose(clientCnx -> {
CompletableFuture<?> future = clientCnx.newLookup(request5, reqId5);
// pending other responses in `ClientCnx` until now
latchRef.get().countDown();
return future;
});

f1.get();
f2.get();
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Expand All @@ -1029,6 +1065,7 @@ public void testLookupThrottlingForClientByClient() throws Exception {

// for Lookup
// 2 lookup will succeed
latchRef.set(new CountDownLatch(1));
long reqId6 = reqId++;
ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
f1 = pool.getConnection(resolver.resolveHost())
Expand All @@ -1037,12 +1074,18 @@ public void testLookupThrottlingForClientByClient() throws Exception {
long reqId7 = reqId++;
ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7));
.thenCompose(clientCnx -> {
CompletableFuture<?> future = clientCnx.newLookup(request7, reqId7);
// pending other responses in `ClientCnx` until now
latchRef.get().countDown();
return future;
});

f1.get();
f2.get();

// 3 lookup will fail
latchRef.set(new CountDownLatch(1));
long reqId8 = reqId++;
ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
f1 = pool.getConnection(resolver.resolveHost())
Expand All @@ -1056,11 +1099,16 @@ public void testLookupThrottlingForClientByClient() throws Exception {
long reqId10 = reqId++;
ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10));
.thenCompose(clientCnx -> {
CompletableFuture<?> future = clientCnx.newLookup(request10, reqId10);
// pending other responses in `ClientCnx` until now
latchRef.get().countDown();
return future;
});

f1.get();
f2.get();
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Expand All @@ -1073,7 +1121,6 @@ public void testLookupThrottlingForClientByClient() throws Exception {
throw e;
}
}

}
}

Expand Down

0 comments on commit e826d84

Please sign in to comment.