From b3750a9848594d14d23196e1ba8b554599200f4c Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Fri, 20 Mar 2020 13:03:55 -0700 Subject: [PATCH 1/6] Do not retry on authorization failure --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 27a158d4813fd..bc03ff913b4ec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -679,7 +679,8 @@ private void getPartitionedTopicMetadata(TopicName topicName, lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException` - boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException; + boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException || + e.getCause() instanceof PulsarClientException.AuthorizationException; if (nextDelay <= 0 || isLookupThrottling) { future.completeExceptionally(e); return null; From 882622babec15386ff9282e8fddb7f5b7664f007 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Fri, 20 Mar 2020 14:52:58 -0700 Subject: [PATCH 2/6] Address feedback --- .../client/api/PulsarClientException.java | 19 +++++++++++++++++++ .../pulsar/client/impl/ConnectionHandler.java | 4 ---- .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../pulsar/client/impl/ProducerImpl.java | 2 +- .../pulsar/client/impl/PulsarClientImpl.java | 3 +-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index f751cc234a997..8703caa45abd3 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -719,4 +719,23 @@ public static PulsarClientException unwrap(Throwable t) { return new PulsarClientException(t); } } + + public static boolean isRetriableError(Throwable t) { + if (t instanceof TooManyRequestsException + || t instanceof AuthorizationException + || t instanceof InvalidServiceURL + || t instanceof InvalidConfigurationException + || t instanceof NotFoundException + || t instanceof IncompatibleSchemaException + || t instanceof TopicDoesNotExistException + || t instanceof UnsupportedAuthenticationException + || t instanceof InvalidMessageException + || t instanceof InvalidTopicNameException + || t instanceof NotSupportedException + || t instanceof ChecksumException + || t instanceof CryptoException) { + return false; + } + return true; + } } \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 8eab9ab469e30..0df0aaeab3867 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -130,10 +130,6 @@ public ClientCnx cnx() { return CLIENT_CNX_UPDATER.get(this); } - protected boolean isRetriableError(PulsarClientException e) { - return e instanceof PulsarClientException.LookupException; - } - protected void setClientCnx(ClientCnx clientCnx) { CLIENT_CNX_UPDATER.set(this, clientCnx); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 14d0aeb14daee..5312675972d62 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -628,7 +628,7 @@ public void connectionOpened(final ClientCnx cnx) { log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress()); if (e.getCause() instanceof PulsarClientException - && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause()) + && PulsarClientException.isRetriableError(e.getCause()) && System.currentTimeMillis() < subscribeTimeout) { reconnectLater(e.getCause()); } else if (!subscribeFuture.isDone()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index fc16fe869415b..989a284b8423a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1178,7 +1178,7 @@ public void connectionOpened(final ClientCnx cnx) { producerCreatedFuture.completeExceptionally(cause); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || // - (cause instanceof PulsarClientException && connectionHandler.isRetriableError((PulsarClientException) cause) + (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) && System.currentTimeMillis() < createProducerTimeout)) { // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are // still within the initial timeout budget and we are dealing with a retriable error diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index bc03ff913b4ec..c9e46509aea39 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -679,8 +679,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException` - boolean isLookupThrottling = e.getCause() instanceof PulsarClientException.TooManyRequestsException || - e.getCause() instanceof PulsarClientException.AuthorizationException; + boolean isLookupThrottling = PulsarClientException.isRetriableError(e.getCause()); if (nextDelay <= 0 || isLookupThrottling) { future.completeExceptionally(e); return null; From 5ba2079555556d789565fd6b41fc5382ff2da1ea Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 21 Mar 2020 09:29:54 -0700 Subject: [PATCH 3/6] Fix logic --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index c9e46509aea39..31a4ed0be4e4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -679,7 +679,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); // skip retry scheduler when set lookup throttle in client or server side which will lead to `TooManyRequestsException` - boolean isLookupThrottling = PulsarClientException.isRetriableError(e.getCause()); + boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause()); if (nextDelay <= 0 || isLookupThrottling) { future.completeExceptionally(e); return null; From cda0e6c65e8f91408f3d9edde8a14b3d50b1ed88 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 21 Mar 2020 12:28:56 -0700 Subject: [PATCH 4/6] Fix test --- .../java/org/apache/pulsar/client/api/ClientErrorsTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index cfb7c02d47490..74ed608b5a1c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -507,7 +507,7 @@ public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() th mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { System.err.println("subscribeCounter: " + subscribeCounter.get()); if (subscribeCounter.incrementAndGet() == 3) { - ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg")); + ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); @@ -520,8 +520,8 @@ public void testOneConsumerFailShouldCloseAllConsumersInPartitionedConsumer() th try { client.newConsumer().topic("persistent://prop/use/ns/multi-part-t1").subscriptionName("sub1").subscribe(); - fail("Should have failed with an authentication error"); - } catch (PulsarClientException.AuthenticationException e) { + fail("Should have failed with an authorization error"); + } catch (PulsarClientException.AuthorizationException e) { } // should call close for 3 partitions From ae1c5b6a0936bf80b15ea2b57e276ff05c92b14d Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 21 Mar 2020 12:34:31 -0700 Subject: [PATCH 5/6] Fixed more tests --- .../java/org/apache/pulsar/client/api/ClientErrorsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index 74ed608b5a1c9..7751342dfbb6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -208,7 +208,7 @@ private void producerFailDoesNotFailOtherProducer(String topic1, String topic2) mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { // fail second producer - ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg")); + ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); @@ -436,7 +436,7 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2) mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { if (counter.incrementAndGet() == 2) { // fail second producer - ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthenticationError, "msg")); + ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, "msg")); return; } ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); From 05aadf5f8b2f53a62dd1c445ba04f454229ded71 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 21 Mar 2020 13:22:39 -0700 Subject: [PATCH 6/6] Fixed more test --- .../pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 009596f074628..d33ed58c831d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -208,7 +208,7 @@ public void emptySubcriptionConsumerTest() throws Exception { } } - @Test(timeOut = 10000) + @Test(timeOut = 1000000) public void conflictingConsumerTest() throws Exception { final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/consumer/persistent/my-property/my-ns/my-topic3/sub1?subscriptionType=Exclusive"; @@ -244,7 +244,7 @@ public void conflictingConsumerTest() throws Exception { } } - @Test(timeOut = 10000) + @Test(timeOut = 100000) public void conflictingProducerTest() throws Exception { final String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + "/ws/v2/producer/persistent/my-property/my-ns/my-topic4?producerName=my-producer";