From 90aac85ab123d40a6dfcc9c0349d2c0d7d79ed21 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Fri, 21 Jul 2023 13:04:30 +0800 Subject: [PATCH] [fix][broker] Inconsistent behaviour for topic auto_creation (#20843) (cherry picked from commit 9b6a1232cf35d15b9bf492f60f5e52534d879df2) --- .../pulsar/broker/service/BrokerService.java | 64 +++++++++---------- .../impl/HierarchyTopicAutoCreationTest.java | 2 +- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8783b319da5343..0db66193435b8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2894,44 +2894,44 @@ public CompletableFuture fetchPartitionedTopicMetadata && !topicExists && !topicName.isPartitioned() && pulsar.getBrokerService() - .isAllowAutoTopicCreation(topicName, policies) - && pulsar.getBrokerService() - .isDefaultTopicTypePartitioned(topicName, policies)) { - - pulsar.getBrokerService() - .createDefaultPartitionedTopicAsync(topicName, policies) - .thenAccept(md -> future.complete(md)) - .exceptionally(ex -> { - if (ex.getCause() - instanceof MetadataStoreException.AlreadyExistsException) { - log.info("[{}] The partitioned topic is already" - + " created, try to refresh the cache and read" - + " again.", topicName); - // The partitioned topic might be created concurrently - fetchPartitionedTopicMetadataAsync(topicName, true) - .whenComplete((metadata2, ex2) -> { - if (ex2 == null) { - future.complete(metadata2); - } else { - future.completeExceptionally(ex2); - } - }); - } else { - log.error("[{}] operation of creating partitioned" - + " topic metadata failed", - topicName, ex); - future.completeExceptionally(ex); - } - return null; - }); + .isDefaultTopicTypePartitioned(topicName, policies)) { + isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> { + if (allowed) { + pulsar.getBrokerService() + .createDefaultPartitionedTopicAsync(topicName, policies) + .thenAccept(md -> future.complete(md)) + .exceptionally(ex -> { + if (ex.getCause() + instanceof MetadataStoreException + .AlreadyExistsException) { + // The partitioned topic might be created concurrently + fetchPartitionedTopicMetadataAsync(topicName) + .whenComplete((metadata2, ex2) -> { + if (ex2 == null) { + future.complete(metadata2); + } else { + future.completeExceptionally(ex2); + } + }); + } else { + future.completeExceptionally(ex); + } + return null; + }); + } else { + future.complete(metadata); + } + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); } else { future.complete(metadata); } }); return future; - })) - ); + }))); } @SuppressWarnings("deprecation") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java index 5068a8ccb6b0b7..4eece21d670039 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java @@ -93,4 +93,4 @@ public void testPartitionedTopicAutoCreation() { t1.interrupt(); } -} \ No newline at end of file +}