From c691c75f85972611c734f634b5b6df9fa7eef405 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 21 Sep 2023 23:18:45 +0800 Subject: [PATCH] [improve] [proxy] Not close the socket if lookup failed caused by too many requests (#21216) Motivation: The Pulsar client will close the socket if it receives a `ServiceNotReady` error when doing a lookup. The Broker will respond to the client with a `TooManyRequests` error if there are too many lookup requests in progress, but the Pulsar Proxy responds to the client with a `ServiceNotReady` error in the same scenario. Modifications: Make Pulsar Proxy respond to the client with a `TooManyRequests` error if there are too many lookup requests in progress. (cherry picked from commit d6c3fa42059d96b04b4132ccc9256c3e76d26959) (cherry picked from commit 9a7c4bbce69502d91f6ca5249a617471decd501f) --- .../server/ProxyLookupThrottlingTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 4861117ef6ff5..1b63aa14dfe42 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -20,18 +20,26 @@ import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BinaryProtoLookupService; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.Assert; @@ -112,4 +120,32 @@ public void testLookup() throws Exception { Assert.assertEquals(LookupProxyHandler.REJECTED_PARTITIONS_METADATA_REQUESTS.get(), 5.0d); } + + @Test + public void testLookupThrottling() throws Exception { + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(proxyService.getServiceUrl()).build(); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + LookupService lookupService = client.getLookup(); + assertTrue(lookupService instanceof BinaryProtoLookupService); + ClientCnx lookupConnection = client.getCnxPool().getConnection(lookupService.resolveHost()).join(); + + // Make no permits to lookup. + Semaphore lookupSemaphore = proxyService.getLookupRequestSemaphore(); + int availablePermits = lookupSemaphore.availablePermits(); + lookupSemaphore.acquire(availablePermits); + + // Verify will receive too many request exception, and the socket will not be closed. + try { + lookupService.getBroker(TopicName.get(tpName)).get(); + fail("Expected too many request error."); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Too many")); + } + assertTrue(lookupConnection.ctx().channel().isActive()); + + // cleanup. + lookupSemaphore.release(availablePermits); + client.close(); + } }