diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 4861117ef6ff5..1b63aa14dfe42 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -20,18 +20,26 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BinaryProtoLookupService; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.Assert; @@ -112,4 +120,32 @@ public void testLookup() throws Exception { Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d); } + + @Test + public void testLookupThrottling() throws Exception { + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(proxyService.getServiceUrl()).build(); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + LookupService lookupService = client.getLookup(); + assertTrue(lookupService instanceof BinaryProtoLookupService); + ClientCnx lookupConnection = client.getCnxPool().getConnection(lookupService.resolveHost()).join(); + + // Make no permits to lookup. + Semaphore lookupSemaphore = proxyService.getLookupRequestSemaphore(); + int availablePermits = lookupSemaphore.availablePermits(); + lookupSemaphore.acquire(availablePermits); + + // Verify will receive too many request exception, and the socket will not be closed. + try { + lookupService.getBroker(TopicName.get(tpName)).get(); + fail("Expected too many request error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Too many")); + } + assertTrue(lookupConnection.ctx().channel().isActive()); + + // cleanup. + lookupSemaphore.release(availablePermits); + client.close(); + } }