Skip to content

Commit

Permalink
[fix][broker] Avoid execute prepareInitPoliciesCacheAsync if namespac…
Browse files Browse the repository at this point in the history
…e is deleted (#22268)
  • Loading branch information
hanmz authored Mar 17, 2024
1 parent 4e0c145 commit 96d77f7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,34 +324,46 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
}
}

private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
@VisibleForTesting
@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);
return stageFuture
// Read policies in background
.thenAccept(__ -> readMorePoliciesAsync(reader));
});
initFuture.exceptionally(ex -> {
try {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break callback chain
log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
}
return null;
});
// let caller know we've got an exception.
return initFuture;
});
return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(namespacePolicies -> {
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
namespace);
return CompletableFuture.completedFuture(null);
}

return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);
return stageFuture
// Read policies in background
.thenAccept(__ -> readMorePoliciesAsync(reader));
});
initFuture.exceptionally(ex -> {
try {
log.error("[{}] Failed to create reader on __change_events topic",
namespace, ex);
cleanCacheAndCloseReader(namespace, false);
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break callback chain
log.error("[{}] Failed to cleanup reader on __change_events topic",
namespace, cleanupEx);
}
return null;
});
// let caller know we've got an exception.
return initFuture;
});
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic

private static final String NAMESPACE4 = "system-topic/namespace-4";

private static final String NAMESPACE5 = "system-topic/namespace-5";

private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
Expand Down Expand Up @@ -465,4 +467,21 @@ public void testWriterCache() throws Exception {
admin.namespaces().deleteNamespace(NAMESPACE4);
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
}

@Test
public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws Exception {
SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
admin.namespaces().createNamespace(NAMESPACE5);

NamespaceName namespaceName = NamespaceName.get(NAMESPACE5);
pulsar.getPulsarResources().getNamespaceResources().setPolicies(namespaceName,
old -> {
old.deleted = true;
return old;
});

assertNull(service.getPoliciesCacheInit(namespaceName));
service.prepareInitPoliciesCacheAsync(namespaceName).get();
admin.namespaces().deleteNamespace(NAMESPACE5);
}
}

0 comments on commit 96d77f7

Please sign in to comment.