Skip to content

Commit

Permalink
[fix] [broker] Fix race-condition causing repeated delete topic (apac…
Browse files Browse the repository at this point in the history
…he#23522)

(cherry picked from commit 7b80f01)
(cherry picked from commit eddf395)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Nov 7, 2024
1 parent 17a32fd commit 2142929
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,13 @@ private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
}

private static MetaStoreException getException(Throwable t) {
if (t.getCause() instanceof MetadataStoreException.BadVersionException) {
return new ManagedLedgerException.BadVersionException(t.getMessage());
Throwable actEx = FutureUtil.unwrapCompletionException(t);
if (actEx instanceof MetadataStoreException.BadVersionException badVersionException) {
return new ManagedLedgerException.BadVersionException(badVersionException);
} else if (actEx instanceof MetaStoreException metaStoreException){
return metaStoreException;
} else {
return new MetaStoreException(t);
return new MetaStoreException(actEx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
// Mark the progress of close to prevent close calling concurrently.
this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture());

AtomicBoolean alreadyUnFenced = new AtomicBoolean();
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
Expand All @@ -1453,6 +1454,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
closeClientFuture.complete(null);
}, getOrderedExecutor()).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
alreadyUnFenced.set(true);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
Expand All @@ -1475,6 +1477,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
Expand All @@ -1484,6 +1487,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
Expand Down Expand Up @@ -1514,6 +1518,7 @@ public void deleteLedgerComplete(Object ctx) {
} else {
log.error("[{}] Error deleting topic",
topic, exception);
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new PersistenceException(exception));
Expand All @@ -1526,6 +1531,7 @@ public void deleteLedgerComplete(Object ctx) {
}
});
}).exceptionally(ex->{
alreadyUnFenced.set(true);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic.",
Expand All @@ -1537,7 +1543,9 @@ public void deleteLedgerComplete(Object ctx) {
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
if (!alreadyUnFenced.get()) {
unfenceTopicToResume();
}
}
});

Expand Down

0 comments on commit 2142929

Please sign in to comment.