Skip to content

Commit

Permalink
[improve][broker] Make get list from bundle Admin API async (#20652)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored and Technoboy- committed Jul 3, 2023
1 parent 4d8051c commit dcc44d8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -445,35 +444,35 @@ public void getListFromBundle(
bundleRange);
asyncResponse.resume(Response.noContent().build());
} else {
NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, true, true);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
}
try {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
asyncResponse.resume(topicList);
} catch (Exception e) {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, e);
asyncResponse.resume(new RestException(e));
}
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true)
.thenAccept(nsBundle -> {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundleTopics =
pulsar().getBrokerService()
.getMultiLayerTopicsMap().get(namespaceName.toString());
if (bundleTopics == null || bundleTopics.isEmpty()) {
asyncResponse.resume(Collections.emptyList());
return;
}
final List<String> topicList = new ArrayList<>();
String bundleKey = namespaceName.toString() + "/" + nsBundle.getBundleRange();
ConcurrentOpenHashMap<String, Topic> topicMap = bundleTopics.get(bundleKey);
if (topicMap != null) {
topicList.addAll(topicMap.keys().stream()
.filter(name -> !TopicName.get(name).isPersistent())
.collect(Collectors.toList()));
}
asyncResponse.resume(topicList);
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, realCause);
if (realCause instanceof WebApplicationException) {
asyncResponse.resume(realCause);
} else {
asyncResponse.resume(new RestException(realCause));
}
return null;
});
}
}).exceptionally(ex -> {
log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,19 +717,19 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
throw new RestException(Status.PRECONDITION_FAILED,
"Failed to find ownership for ServiceUnit:" + bundle.toString());
}
// If the load manager is extensible load manager, we don't need check the authoritative.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
return CompletableFuture.completedFuture(null);
}
return nsService.isServiceUnitOwnedAsync(bundle)
.thenAccept(owned -> {
if (!owned) {
boolean newAuthoritative = this.isLeaderBroker();
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
newAuthoritative).replaceQueryParam("destinationBroker",
null).build();
UriBuilder uriBuilder = UriBuilder.fromUri(uri.getRequestUri())
.host(webUrl.get().getHost())
.port(webUrl.get().getPort())
.replaceQueryParam("authoritative", newAuthoritative);
if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
uriBuilder.replaceQueryParam("destinationBroker", null);
}
URI redirect = uriBuilder.build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,10 @@ public void testDisableBroker() throws Exception {
defaultConf.setAllowAutoTopicCreation(true);
defaultConf.setForceDeleteNamespaceAllowed(true);
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
defaultConf.setLoadBalancerSheddingEnabled(false);
defaultConf.setLoadBalancerDebugModeEnabled(true);
defaultConf.setTopicLevelPoliciesEnabled(false);
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
Expand Down Expand Up @@ -1005,7 +1008,7 @@ public void testDisableBroker() throws Exception {
@Test(timeOut = 30 * 1000)
public void testListTopic() throws Exception {
final String namespace = "public/testListTopic";
admin.namespaces().createNamespace(namespace, 3);
admin.namespaces().createNamespace(namespace, 9);

final String persistentTopicName = TopicName.get(
"persistent", NamespaceName.get(namespace),
Expand All @@ -1014,8 +1017,8 @@ public void testListTopic() throws Exception {
final String nonPersistentTopicName = TopicName.get(
"non-persistent", NamespaceName.get(namespace),
"get_topics_mode_" + UUID.randomUUID()).toString();
admin.topics().createPartitionedTopic(persistentTopicName, 3);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 3);
admin.topics().createPartitionedTopic(persistentTopicName, 9);
admin.topics().createPartitionedTopic(nonPersistentTopicName, 9);
pulsarClient.newProducer().topic(persistentTopicName).create().close();
pulsarClient.newProducer().topic(nonPersistentTopicName).create().close();

Expand All @@ -1033,10 +1036,10 @@ public void testListTopic() throws Exception {
assertFalse(TopicName.get(s).isPersistent());
}
}
assertEquals(topicNum, 3);
assertEquals(topicNum, 9);

List<String> list = admin.topics().getList(namespace);
assertEquals(list.size(), 6);
assertEquals(list.size(), 18);
admin.namespaces().deleteNamespace(namespace, true);
}

Expand Down

0 comments on commit dcc44d8

Please sign in to comment.