From 7717b0f01433e23766d7a73f42442b5867591d8b Mon Sep 17 00:00:00 2001 From: mattison chao Date: Fri, 27 May 2022 14:02:05 +0800 Subject: [PATCH 1/3] [fix][broker] Fix async methods that are not awaited. --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 30de90ac1a7e1..73c5a930c2c28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1179,7 +1179,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) - .thenAccept(__ -> deleteTopicPolicies()) + .thenCompose(__ -> deleteTopicPolicies()) .thenCompose(__ -> transactionBufferCleanupAndClose()) .whenComplete((v, ex) -> { if (ex != null) { From a8bf10c1c837fe7a6736f16a31d68386468aed73 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 17 Aug 2022 20:30:05 +0800 Subject: [PATCH 2/3] Fix test --- .../pulsar/broker/service/BrokerService.java | 15 ++++++++++++--- .../api/AuthenticatedProducerConsumerTest.java | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 532e41cfbd3c3..d1e7018ede9c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2928,11 +2928,20 @@ public Optional getTopicPolicies(TopicName topicName) { } public CompletableFuture deleteTopicPolicies(TopicName topicName) { - if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) { + final PulsarService pulsarService = pulsar(); + if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) { return CompletableFuture.completedFuture(null); } - TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName); + return pulsarService.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenComposeAsync(optPolicies -> { + if (optPolicies.isPresent() && optPolicies.get().deleted) { + // If the namespace is deleted, we can return directly. + return CompletableFuture.completedFuture(null); + } + TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName); + }); } private CompletableFuture checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 046b26846e2d3..e0cc980991ef7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -80,6 +80,7 @@ protected void setup() throws Exception { conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); conf.setTlsAllowInsecureConnection(true); + conf.setTopicLevelPoliciesEnabled(false); Set superUserRoles = new HashSet<>(); superUserRoles.add("localhost"); From c46ef8e64e702e6fcb35653fc69d6ce2a0ef8af6 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 17 Aug 2022 20:33:26 +0800 Subject: [PATCH 3/3] Fix typo --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d1e7018ede9c0..385164195cd26 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2936,7 +2936,7 @@ public CompletableFuture deleteTopicPolicies(TopicName topicName) { .getPoliciesAsync(topicName.getNamespaceObject()) .thenComposeAsync(optPolicies -> { if (optPolicies.isPresent() && optPolicies.get().deleted) { - // If the namespace is deleted, we can return directly. + // We can return the completed future directly if the namespace is already deleted. return CompletableFuture.completedFuture(null); } TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());