Skip to content

Commit

Permalink
close client connectionon tooManyRequest and internal-server error (#274
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rdhabalia authored and merlimat committed Mar 11, 2017
1 parent 146738c commit 6d4bd9d
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1030,11 +1030,11 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// unknown error marked as internal server error
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
dn.toString(), ex.getMessage(), ex);
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), ex.getMessage()));
throw ex;
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());

this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar().getLocalZkCache()) {
@Override
public Map<String, String> deserialize(String key, byte[] content) throws Exception {
Expand Down Expand Up @@ -853,7 +852,7 @@ public Map<String, PersistentTopicStats> getTopicStats() {
public AuthenticationService getAuthenticationService() {
return authenticationService;
}

public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,17 @@ protected void validateSuperUserAccess() {
* if not authorized
*/
protected void validateAdminAccessOnProperty(String property) {
validateAdminAccessOnProperty(pulsar(), clientAppId(), property);
try {
validateAdminAccessOnProperty(pulsar(), clientAppId(), property);
} catch (RestException e) {
throw e;
} catch (Exception e) {
log.error("Failed to get property admin data for property");
throw new RestException(e);
}
}

protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) {
protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
(isClientAuthenticated(clientAppId)), clientAppId);
Expand All @@ -178,9 +185,6 @@ protected static void validateAdminAccessOnProperty(PulsarService pulsar, String
} catch (KeeperException.NoNodeException e) {
log.warn("Failed to get property admin data for non existing property {}", property);
throw new RestException(Status.UNAUTHORIZED, "Property does not exist");
} catch (Exception e) {
log.error("Failed to get property admin data for property");
throw new RestException(e);
}

if (!propertyAdmin.getAdminRoles().contains(clientAppId)) {
Expand Down Expand Up @@ -565,20 +569,16 @@ protected void checkConnect(DestinationName destination) throws RestException, E
checkAuthorization(pulsar(), destination, clientAppId());
}

protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role) throws RestException, Exception{
protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role)
throws RestException, Exception {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
// No enforcing of authorization policies
return;
}
try {
// get zk policy manager
if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
} catch (RestException e) {
// Let it through
throw e;
// get zk policy manager
if (!pulsarService.getBrokerService().getAuthorizationManager().canLookup(destination, role)) {
log.warn("[{}] Role {} is not allowed to lookup topic", destination, role);
throw new RestException(Status.UNAUTHORIZED, "Don't have permission to connect to this namespace");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,5 +728,4 @@ public void testLookupThrottlingForClientByClient() throws Exception {
// ok as throttling set to 0
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand All @@ -43,6 +48,7 @@
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.namespace.OwnershipCache;
import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
Expand Down Expand Up @@ -75,23 +81,22 @@ protected void setup() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider
public Object[][] subType() {
return new Object[][] {{SubscriptionType.Shared}, {SubscriptionType.Failover}};
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
}


/**
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
*
* <pre>
* 1. after disabling broker fron loadbalancer
* 2. unload namespace-bundle "my-ns1" which disconnects client (producer/consumer) connected on that namespacebundle
* 3. but doesn't close the connection for namesapce-bundle "my-ns2" and clients are still connected
* 4. verifies unloaded "my-ns1" should not connected again with the broker as broker is disabled
* 5. unload "my-ns2" which closes the connection as broker doesn't have any more client connected on that connection
* 6. all namespace-bundles are in "connecting" state and waiting for available broker
*
* </pre>
*
* @throws Exception
*/
Expand All @@ -105,7 +110,8 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {

final String dn1 = "persistent://" + ns1 + "/my-topic";
final String dn2 = "persistent://" + ns2 + "/my-topic";
ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration());
ConsumerImpl cons1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name",
new ConsumerConfiguration());
ProducerImpl prod1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration());
ProducerImpl prod2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration());
ConsumerImpl consumer1 = spy(cons1);
Expand Down Expand Up @@ -182,7 +188,6 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {
assertTrue(prod2.getClientCnx() != null);
assertTrue(prod2.getState().equals(State.Ready));


// unload ns-bundle2 as well
pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2);
verify(producer2, atLeastOnce()).connectionClosed(anyObject());
Expand All @@ -208,10 +213,9 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {

}


/**
* Verifies: 1. Closing of Broker service unloads all bundle gracefully and there must not be any connected bundles
* after closing broker service
* after closing broker service
*
* @throws Exception
*/
Expand All @@ -225,18 +229,19 @@ public void testCloseBrokerService() throws Exception {

final String dn1 = "persistent://" + ns1 + "/my-topic";
final String dn2 = "persistent://" + ns2 + "/my-topic";

ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name", new ConsumerConfiguration());

ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.subscribe(dn1, "my-subscriber-name",
new ConsumerConfiguration());
ProducerImpl producer1 = (ProducerImpl) pulsarClient.createProducer(dn1, new ProducerConfiguration());
ProducerImpl producer2 = (ProducerImpl) pulsarClient.createProducer(dn2, new ProducerConfiguration());

//unload all other namespace
// unload all other namespace
pulsar.getBrokerService().close();

// [1] OwnershipCache should not contain any more namespaces
OwnershipCache ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
assertTrue(ownershipCache.getOwnedBundles().keySet().isEmpty());

// [2] All clients must be disconnected and in connecting state
// producer1 must not be able to connect again
assertTrue(producer1.getClientCnx() == null);
Expand All @@ -247,11 +252,11 @@ public void testCloseBrokerService() throws Exception {
// producer2 must not be able to connect again
assertTrue(producer2.getClientCnx() == null);
assertTrue(producer2.getState().equals(State.Connecting));

producer1.close();
producer2.close();
consumer1.close();

}

/**
Expand Down Expand Up @@ -449,6 +454,105 @@ public void testResetCursor(SubscriptionType subType) throws Exception {
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

/**
* <pre>
* Verifies: that client-cnx gets closed when server gives TooManyRequestException in certain time frame
* 1. Client1: which has set MaxNumberOfRejectedRequestPerConnection=0
* 2. Client2: which has set MaxNumberOfRejectedRequestPerConnection=100
* 3. create multiple producer and make lookup-requests simultaneously
* 4. Client1 receives TooManyLookupException and should close connection
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testCloseConnectionOnBrokerRejectedRequest() throws Exception {

final PulsarClient pulsarClient;
final PulsarClient pulsarClient2;

final String topicName = "persistent://prop/usw/my-ns/newTopic";

final int concurrentLookupRequests = 20;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
stopBroker();
pulsar.getConfiguration().setMaxConcurrentLookupRequest(1);
startBroker();
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);

ClientConfiguration clientConf2 = new ClientConfiguration();
clientConf2.setStatsInterval(0, TimeUnit.SECONDS);
clientConf2.setIoThreads(concurrentLookupRequests);
clientConf2.setConnectionsPerBroker(20);
pulsarClient2 = PulsarClient.create(lookupUrl, clientConf2);

ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests);
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
pulsarClient2.createProducerAsync(topicName).handle((ok, e) -> {
return null;
});
pulsarClient.createProducerAsync(topicName).handle((ok, e) -> {
return null;
});

});
if (!cnx.channel().isActive()) {
break;
}
if (i % 10 == 0) {
Thread.sleep(100);
}
}
// connection must be closed
assertFalse(cnx.channel().isActive());
pulsarClient.close();
pulsarClient2.close();
}

/**
* It verifies that client closes the connection on internalSerevrError which is "ServiceNotReady" from Broker-side
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testCloseConnectionOnInternalServerError() throws Exception {

try {
final PulsarClient pulsarClient;

final String topicName = "persistent://prop/usw/my-ns/newTopic";

ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);

ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
ClientCnx cnx = producer.cnx();
assertTrue(cnx.channel().isActive());
// this will throw NPE at broker while authorizing and it will throw InternalServerError
pulsar.getConfiguration().setAuthorizationEnabled(true);
try {
pulsarClient.createProducer(topicName);
fail("it should have fail with lookup-exception:");
} catch (Exception e) {
// ok
}
// connection must be closed
assertFalse(cnx.channel().isActive());
pulsarClient.close();
} finally {
pulsar.getConfiguration().setAuthorizationEnabled(false);
}
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ClientConfiguration implements Serializable {
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private int concurrentLookupRequest = 5000;
private int maxNumberOfRejectedRequestPerConnection = 50;

/**
* @return the authentication provider to be used
Expand Down Expand Up @@ -330,4 +331,25 @@ public int getConcurrentLookupRequest() {
public void setConcurrentLookupRequest(int concurrentLookupRequest) {
this.concurrentLookupRequest = concurrentLookupRequest;
}

/**
* Get configured max number of reject-request in a time-frame (30 seconds) after which connection will be closed
*
* @return
*/
public int getMaxNumberOfRejectedRequestPerConnection() {
return maxNumberOfRejectedRequestPerConnection;
}

/**
* Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
* will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
* 50)</i>
*
* @param maxNumberOfRejectedRequestPerConnection
*/
public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
}
}).exceptionally((e) -> {
log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(), e.getMessage(), e);
log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(),
e.getCause().getMessage(), e);
partitionFuture.completeExceptionally(e);
return null;
});
Expand Down
Loading

0 comments on commit 6d4bd9d

Please sign in to comment.