Skip to content

Commit

Permalink
[improve][broker] Improve naming for delete topic error (#16965)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan authored Aug 14, 2022
1 parent 6fc48d1 commit d3dd143
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void deleteTopic(
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
"Topic has active producers/subscriptions");
t.getMessage());
}
if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ public void deleteTopic(
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
"Topic has active producers/subscriptions");
t.getMessage());
}
if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
deleteFuture.completeExceptionally(
new TopicBusyException("Topic has subscriptions:" + subscriptions.keys()));
return;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,9 +1130,15 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs && hasBacklogs()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
List<String> backlogSubs =
subscriptions.values().stream()
.filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ public void testDeleteTopicAndSchemaForV1() throws Exception {
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
.startsWith("Topic has active producers/subscriptions");
.startsWith("Topic has 2 connected producers/consumers");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
Expand Down Expand Up @@ -936,7 +936,7 @@ public void testDeleteTopicAndSchemaForV2() throws Exception {
admin.topics().delete(topicOne, false);
fail();
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
assertTrue(e.getMessage().startsWith("Topic has 2 connected producers/consumers"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2);
Expand Down

0 comments on commit d3dd143

Please sign in to comment.