Skip to content

Commit

Permalink
[Proxy] Prevent leak of unreleased lookupRequestSemaphore permits (ap…
Browse files Browse the repository at this point in the history
…ache#13812)

* [Proxy] Prevent leak of unreleased lookupRequestSemaphore permits

- should release permit in try-finally block

* Cleanup code in LookupProxyHandler

(cherry picked from commit 85b62e0)
  • Loading branch information
lhotari committed May 4, 2022
1 parent 85f0767 commit 50be626
Showing 1 changed file with 39 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
*/
package org.apache.pulsar.proxy.server;

import static org.apache.commons.lang3.StringUtils.isBlank;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
Expand All @@ -47,8 +46,8 @@

public class LookupProxyHandler {
private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
private final ProxyService service;
private final ProxyConnection proxyConnection;
private final BrokerDiscoveryProvider discoveryProvider;
private final boolean connectWithTLS;

private SocketAddress clientAddress;
Expand Down Expand Up @@ -83,9 +82,11 @@ public class LookupProxyHandler {
.build("pulsar_proxy_rejected_get_topics_of_namespace_requests",
"Counter of getTopicsOfNamespace requests rejected due to throttling")
.create().register();
private final Semaphore lookupRequestSemaphore;

public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
this.service = proxy;
this.discoveryProvider = proxy.getDiscoveryProvider();
this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore();
this.proxyConnection = proxyConnection;
this.clientAddress = proxyConnection.clientAddress();
this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
Expand All @@ -98,28 +99,16 @@ public void handleLookup(CommandLookupTopic lookup) {
log.debug("Received Lookup from {}", clientAddress);
}
long clientRequestId = lookup.getRequestId();
if (this.service.getLookupRequestSemaphore().tryAcquire()) {
lookupRequests.inc();
String topic = lookup.getTopic();
String serviceUrl;
if (isBlank(brokerServiceURL)) {
ServiceLookupData availableBroker = null;
try {
availableBroker = service.getDiscoveryProvider().nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
e.getMessage(), clientRequestId));
return;
if (lookupRequestSemaphore.tryAcquire()) {
try {
lookupRequests.inc();
String serviceUrl = getBrokerServiceUrl(clientRequestId);
if (serviceUrl != null) {
performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10);
}
serviceUrl = this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls()
: availableBroker.getPulsarServiceUrl();
} else {
serviceUrl = this.connectWithTLS ? service.getConfiguration().getBrokerServiceURLTLS()
: service.getConfiguration().getBrokerServiceURL();
} finally {
lookupRequestSemaphore.release();
}
performLookup(clientRequestId, topic, serviceUrl, false, 10);
this.service.getLookupRequestSemaphore().release();
} else {
rejectedLookupRequests.inc();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -203,9 +192,12 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
log.debug("[{}] Received PartitionMetadataLookup", clientAddress);
}
final long clientRequestId = partitionMetadata.getRequestId();
if (this.service.getLookupRequestSemaphore().tryAcquire()) {
handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
this.service.getLookupRequestSemaphore().release();
if (lookupRequestSemaphore.tryAcquire()) {
try {
handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
} finally {
lookupRequestSemaphore.release();
}
} else {
rejectedPartitionsMetadataRequests.inc();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -273,9 +265,12 @@ public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTop

final long requestId = commandGetTopicsOfNamespace.getRequestId();

if (this.service.getLookupRequestSemaphore().tryAcquire()) {
handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
this.service.getLookupRequestSemaphore().release();
if (lookupRequestSemaphore.tryAcquire()) {
try {
handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
} finally {
lookupRequestSemaphore.release();
}
} else {
rejectedGetTopicsOfNamespaceRequests.inc();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -401,27 +396,23 @@ public void handleGetSchema(CommandGetSchema commandGetSchema) {
}

/**
* Get default broker service url or discovery an available broker
* Get default broker service url or discovery an available broker.
**/
private String getBrokerServiceUrl(long clientRequestId) {
if (isBlank(brokerServiceURL)) {
ServiceLookupData availableBroker;
try {
availableBroker = service.getDiscoveryProvider().nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
proxyConnection.ctx().writeAndFlush(Commands.newError(
clientRequestId, ServerError.ServiceNotReady, e.getMessage()
));
return null;
}
return this.connectWithTLS ?
availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
} else {
return this.connectWithTLS ?
service.getConfiguration().getBrokerServiceURLTLS() : service.getConfiguration().getBrokerServiceURL();
if (StringUtils.isNotBlank(brokerServiceURL)) {
return brokerServiceURL;
}

ServiceLookupData availableBroker;
try {
availableBroker = discoveryProvider.nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}", clientAddress, e.getMessage(), e);
proxyConnection.ctx().writeAndFlush(Commands.newError(
clientRequestId, ServerError.ServiceNotReady, e.getMessage()
));
return null;
}
return this.connectWithTLS ? availableBroker.getPulsarServiceUrlTls() : availableBroker.getPulsarServiceUrl();
}

private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
Expand Down

0 comments on commit 50be626

Please sign in to comment.