Skip to content

Commit

Permalink
Do not retry on authorization failure (#6577)
Browse files Browse the repository at this point in the history
* Do not retry on authorization failure

* Address feedback

* Fix logic

* Fix test

* Fixed more tests

* Fixed more test

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
(cherry picked from commit 6cb0d25)
  • Loading branch information
srkukarni authored and tuteng committed Apr 13, 2020
1 parent 584079e commit 5ff552a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private void producerFailDoesNotFailOtherProducer(String topic1, String topic2)
mockBrokerService.setHandleProducer((ctx, producer) -> {
if (counter.incrementAndGet() == 2) {
// fail second producer
ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg"));
ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
Expand Down Expand Up @@ -436,7 +436,7 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
if (counter.incrementAndGet() == 2) {
// fail second producer
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
Expand Down Expand Up @@ -507,7 +507,7 @@ public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() th
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
System.err.println("subscribeCounter: " + subscribeCounter.get());
if (subscribeCounter.incrementAndGet() == 3) {
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg"));
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
Expand All @@ -520,8 +520,8 @@ public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() th

try {
client.newConsumer().topic("persistent://prop/use/ns/multi-part-t1").subscriptionName("sub1").subscribe();
fail("Should have failed with an authentication error");
} catch (PulsarClientException.AuthenticationException e) {
fail("Should have failed with an authorization error");
} catch (PulsarClientException.AuthorizationException e) {
}

// should call close for 3 partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void emptySubcriptionConsumerTest() throws Exception {
}
}

@Test(timeOut = 10000)
@Test(timeOut = 1000000)
public void conflictingConsumerTest() throws Exception {
final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
Expand Down Expand Up @@ -244,7 +244,7 @@ public void conflictingConsumerTest() throws Exception {
}
}

@Test(timeOut = 10000)
@Test(timeOut = 100000)
public void conflictingProducerTest() throws Exception {
final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+ "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,4 +719,23 @@ public static PulsarClientException unwrap(Throwable t) {
return new PulsarClientException(t);
}
}

public static boolean isRetriableError(Throwable t) {
if (t instanceof TooManyRequestsException
|| t instanceof AuthorizationException
|| t instanceof InvalidServiceURL
|| t instanceof InvalidConfigurationException
|| t instanceof NotFoundException
|| t instanceof IncompatibleSchemaException
|| t instanceof TopicDoesNotExistException
|| t instanceof UnsupportedAuthenticationException
|| t instanceof InvalidMessageException
|| t instanceof InvalidTopicNameException
|| t instanceof NotSupportedException
|| t instanceof ChecksumException
|| t instanceof CryptoException) {
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ public ClientCnx cnx() {
return CLIENT_CNX_UPDATER.get(this);
}

protected boolean isRetriableError(PulsarClientException e) {
return e instanceof PulsarClientException.LookupException;
}

protected void setClientCnx(ClientCnx clientCnx) {
CLIENT_CNX_UPDATER.set(this, clientCnx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ public void connectionOpened(final ClientCnx cnx) {
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());

if (e.getCause() instanceof PulsarClientException
&& getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < subscribeTimeout) {
reconnectLater(e.getCause());
} else if (!subscribeFuture.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ public void connectionOpened(final ClientCnx cnx) {
producerCreatedFuture.completeExceptionally(cause);
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || //
(cause instanceof PulsarClientException && connectionHandler.isRetriableError((PulsarClientException) cause)
(cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
&& System.currentTimeMillis() < createProducerTimeout)) {
// Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are dealing with a retriable error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ private void getPartitionedTopicMetadata(TopicName topicName,
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
// skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException`
boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException;
boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause());
if (nextDelay <= 0 || isLookupThrottling) {
future.completeExceptionally(e);
return null;
Expand Down

0 comments on commit 5ff552a

Please sign in to comment.