Skip to content

Commit

Permalink
[fix][broker] Inconsistent behaviour for topic auto_creation (apache#…
Browse files Browse the repository at this point in the history
…20843)

(cherry picked from commit 9b6a123)
  • Loading branch information
mattisonchao authored and shibd committed Oct 21, 2023
1 parent edbb6bb commit 90aac85
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2894,44 +2894,44 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
&& !topicExists
&& !topicName.isPartitioned()
&& pulsar.getBrokerService()
.isAllowAutoTopicCreation(topicName, policies)
&& pulsar.getBrokerService()
.isDefaultTopicTypePartitioned(topicName, policies)) {

pulsar.getBrokerService()
.createDefaultPartitionedTopicAsync(topicName, policies)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
if (ex.getCause()
instanceof MetadataStoreException.AlreadyExistsException) {
log.info("[{}] The partitioned topic is already"
+ " created, try to refresh the cache and read"
+ " again.", topicName);
// The partitioned topic might be created concurrently
fetchPartitionedTopicMetadataAsync(topicName, true)
.whenComplete((metadata2, ex2) -> {
if (ex2 == null) {
future.complete(metadata2);
} else {
future.completeExceptionally(ex2);
}
});
} else {
log.error("[{}] operation of creating partitioned"
+ " topic metadata failed",
topicName, ex);
future.completeExceptionally(ex);
}
return null;
});
.isDefaultTopicTypePartitioned(topicName, policies)) {
isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> {
if (allowed) {
pulsar.getBrokerService()
.createDefaultPartitionedTopicAsync(topicName, policies)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
if (ex.getCause()
instanceof MetadataStoreException
.AlreadyExistsException) {
// The partitioned topic might be created concurrently
fetchPartitionedTopicMetadataAsync(topicName)
.whenComplete((metadata2, ex2) -> {
if (ex2 == null) {
future.complete(metadata2);
} else {
future.completeExceptionally(ex2);
}
});
} else {
future.completeExceptionally(ex);
}
return null;
});
} else {
future.complete(metadata);
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
} else {
future.complete(metadata);
}
});

return future;
}))
);
})));
}

@SuppressWarnings("deprecation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ public void testPartitionedTopicAutoCreation() {

t1.interrupt();
}
}
}

0 comments on commit 90aac85

Please sign in to comment.