Skip to content

Commit

Permalink
[fix] [client] Do no retrying for error subscription not found when d…
Browse files Browse the repository at this point in the history
…isabled allowAutoSubscriptionCreation (#22164)

Co-authored-by: zifengmo <38554710+zifengmo@users.noreply.github.com>
  • Loading branch information
Technoboy- and zifengmo committed Mar 13, 2024
1 parent da98b8d commit 97c639e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU
return ServerError.ServiceNotReady;
} else if (t instanceof TopicNotFoundException) {
return ServerError.TopicNotFound;
} else if (t instanceof SubscriptionNotFoundException) {
return ServerError.SubscriptionNotFound;
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,36 @@ public void testMultipleIOThreads() throws PulsarAdminException, PulsarClientExc
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
assertTrue(consumer.isConnected());
}

@Test(timeOut = 30000)
public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException {
final var topic1 = newTopicName();
final var topic2 = newTopicName();

pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

try {
final var singleTopicConsumer = pulsarClient.newConsumer()
.topic(topic1)
.subscriptionName("sub-1")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(singleTopicConsumer instanceof ConsumerImpl);
} catch (Throwable t) {
assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
}

try {
final var multiTopicsConsumer = pulsarClient.newConsumer()
.topics(List.of(topic1, topic2))
.subscriptionName("sub-2")
.isAckReceiptEnabled(true)
.subscribe();
assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);
} catch (Throwable t) {
assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
}

pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,22 @@ public TopicDoesNotExistException(String msg) {
}
}

/**
* Not found subscription that cannot be created.
*/
public static class SubscriptionNotFoundException extends PulsarClientException {
/**
* Constructs an {@code SubscriptionNotFoundException} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public SubscriptionNotFoundException(String msg) {
super(msg);
}
}

/**
* Lookup exception thrown by Pulsar client.
*/
Expand Down Expand Up @@ -1163,6 +1179,7 @@ public static boolean isRetriableError(Throwable t) {
|| t instanceof NotFoundException
|| t instanceof IncompatibleSchemaException
|| t instanceof TopicDoesNotExistException
|| t instanceof SubscriptionNotFoundException
|| t instanceof UnsupportedAuthenticationException
|| t instanceof InvalidMessageException
|| t instanceof InvalidTopicNameException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,8 @@ public static PulsarClientException getPulsarClientException(ServerError error,
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case TopicNotFound:
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case SubscriptionNotFound:
return new PulsarClientException.SubscriptionNotFoundException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
case NotAllowedError:
Expand Down

0 comments on commit 97c639e

Please sign in to comment.