Skip to content

Commit 4958f45

Browse files
[improve][broker] Make get list from bundle Admin API async (#20652)
1 parent 43dc123 commit 4958f45

File tree

3 files changed

+45
-43
lines changed

3 files changed

+45
-43
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

+29-30
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.pulsar.broker.PulsarServerException;
5252
import org.apache.pulsar.broker.service.Topic;
5353
import org.apache.pulsar.broker.web.RestException;
54-
import org.apache.pulsar.common.naming.NamespaceBundle;
5554
import org.apache.pulsar.common.naming.TopicName;
5655
import org.apache.pulsar.common.policies.data.EntryFilters;
5756
import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -449,35 +448,35 @@ public void getListFromBundle(
449448
bundleRange);
450449
asyncResponse.resume(Response.noContent().build());
451450
} else {
452-
NamespaceBundle nsBundle;
453-
try {
454-
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
455-
bundleRange, true, true);
456-
} catch (WebApplicationException wae) {
457-
asyncResponse.resume(wae);
458-
return;
459-
}
460-
try {
461-
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
462-
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
463-
if (bundleTopics == null || bundleTopics.isEmpty()) {
464-
asyncResponse.resume(Collections.emptyList());
465-
return;
466-
}
467-
final List<String> topicList = new ArrayList<>();
468-
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
469-
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
470-
if (topicMap != null) {
471-
topicList.addAll(topicMap.keys().stream()
472-
.filter(name -> !TopicName.get(name).isPersistent())
473-
.collect(Collectors.toList()));
474-
}
475-
asyncResponse.resume(topicList);
476-
} catch (Exception e) {
477-
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
478-
namespaceName, bundleRange, e);
479-
asyncResponse.resume(new RestException(e));
480-
}
451+
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
452+
.thenAccept(nsBundle -> {
453+
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
454+
pulsar().getBrokerService()
455+
.getMultiLayerTopicsMap().get(namespaceName.toString());
456+
if (bundleTopics == null || bundleTopics.isEmpty()) {
457+
asyncResponse.resume(Collections.emptyList());
458+
return;
459+
}
460+
final List<String> topicList = new ArrayList<>();
461+
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
462+
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
463+
if (topicMap != null) {
464+
topicList.addAll(topicMap.keys().stream()
465+
.filter(name -> !TopicName.get(name).isPersistent())
466+
.collect(Collectors.toList()));
467+
}
468+
asyncResponse.resume(topicList);
469+
}).exceptionally(ex -> {
470+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
471+
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
472+
namespaceName, bundleRange, realCause);
473+
if (realCause instanceof WebApplicationException) {
474+
asyncResponse.resume(realCause);
475+
} else {
476+
asyncResponse.resume(new RestException(realCause));
477+
}
478+
return null;
479+
});
481480
}
482481
}).exceptionally(ex -> {
483482
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -717,19 +717,19 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
717717
throw new RestException(Status.PRECONDITION_FAILED,
718718
"Failed to find ownership for ServiceUnit:" + bundle.toString());
719719
}
720-
// If the load manager is extensible load manager, we don't need check the authoritative.
721-
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
722-
return CompletableFuture.completedFuture(null);
723-
}
724720
return nsService.isServiceUnitOwnedAsync(bundle)
725721
.thenAccept(owned -> {
726722
if (!owned) {
727723
boolean newAuthoritative = this.isLeaderBroker();
728724
// Replace the host and port of the current request and redirect
729-
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
730-
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
731-
newAuthoritative).replaceQueryParam("destinationBroker",
732-
null).build();
725+
UriBuilder uriBuilder = UriBuilder.fromUri(uri.getRequestUri())
726+
.host(webUrl.get().getHost())
727+
.port(webUrl.get().getPort())
728+
.replaceQueryParam("authoritative", newAuthoritative);
729+
if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
730+
uriBuilder.replaceQueryParam("destinationBroker", null);
731+
}
732+
URI redirect = uriBuilder.build();
733733
log.debug("{} is not a service unit owned", bundle);
734734
// Redirect
735735
log.debug("Redirecting the rest call to {}", redirect);

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -964,7 +964,10 @@ public void testDisableBroker() throws Exception {
964964
defaultConf.setAllowAutoTopicCreation(true);
965965
defaultConf.setForceDeleteNamespaceAllowed(true);
966966
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
967+
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
967968
defaultConf.setLoadBalancerSheddingEnabled(false);
969+
defaultConf.setLoadBalancerDebugModeEnabled(true);
970+
defaultConf.setTopicLevelPoliciesEnabled(false);
968971
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
969972
var pulsar3 = additionalPulsarTestContext.getPulsarService();
970973
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
@@ -1005,7 +1008,7 @@ public void testDisableBroker() throws Exception {
10051008
@Test(timeOut = 30 * 1000)
10061009
public void testListTopic() throws Exception {
10071010
final String namespace = "public/testListTopic";
1008-
admin.namespaces().createNamespace(namespace, 3);
1011+
admin.namespaces().createNamespace(namespace, 9);
10091012

10101013
final String persistentTopicName = TopicName.get(
10111014
"persistent", NamespaceName.get(namespace),
@@ -1014,8 +1017,8 @@ public void testListTopic() throws Exception {
10141017
final String nonPersistentTopicName = TopicName.get(
10151018
"non-persistent", NamespaceName.get(namespace),
10161019
"get_topics_mode_" + UUID.randomUUID()).toString();
1017-
admin.topics().createPartitionedTopic(persistentTopicName, 3);
1018-
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
1020+
admin.topics().createPartitionedTopic(persistentTopicName, 9);
1021+
admin.topics().createPartitionedTopic(nonPersistentTopicName, 9);
10191022
pulsarClient.newProducer().topic(persistentTopicName).create().close();
10201023
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();
10211024

@@ -1033,10 +1036,10 @@ public void testListTopic() throws Exception {
10331036
assertFalse(TopicName.get(s).isPersistent());
10341037
}
10351038
}
1036-
assertEquals(topicNum, 3);
1039+
assertEquals(topicNum, 9);
10371040

10381041
List<String> list = admin.topics().getList(namespace);
1039-
assertEquals(list.size(), 6);
1042+
assertEquals(list.size(), 18);
10401043
admin.namespaces().deleteNamespace(namespace, true);
10411044
}
10421045

0 commit comments

Comments
 (0)