Skip to content

Commit 80400ef

Browse files
RobertIndienodece
authored andcommitted
[improve][broker] Make Namespaces.deleteNamespaceBundle async (apache#16287)
Master Issue: apache#14365 ### Motivation Please see apache#14365 ### Modifications * Make Namespaces.deleteNamespaceBundle async * Combine internalDeleteNamespaceBundle * Make removeOwnedServiceUnit async (cherry picked from commit 147672f) Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 5152554 commit 80400ef

File tree

6 files changed

+242
-238
lines changed

6 files changed

+242
-238
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

+125-179
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.collect.Lists;
2626
import com.google.common.collect.Sets;
2727
import java.lang.reflect.Field;
28+
import java.net.MalformedURLException;
2829
import java.net.URI;
2930
import java.net.URL;
3031
import java.util.ArrayList;
@@ -413,6 +414,38 @@ protected CompletableFuture<Void> internalClearZkSources() {
413414

414415
}
415416

417+
private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
418+
if (CollectionUtils.isEmpty(topicNames)) {
419+
return CompletableFuture.completedFuture(null);
420+
}
421+
List<CompletableFuture<Void>> futures = new ArrayList<>();
422+
for (String topicName : topicNames) {
423+
TopicName tn = TopicName.get(topicName);
424+
futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
425+
.runWithMarkDeleteAsync(tn,
426+
() -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn)));
427+
}
428+
return FutureUtil.waitForAll(futures);
429+
}
430+
431+
private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
432+
if (CollectionUtils.isEmpty(topicNames)) {
433+
return CompletableFuture.completedFuture(null);
434+
}
435+
PulsarAdmin admin;
436+
try {
437+
admin = pulsar().getAdminClient();
438+
} catch (Exception ex) {
439+
log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
440+
return FutureUtil.failedFuture(ex);
441+
}
442+
List<CompletableFuture<Void>> futures = new ArrayList<>();
443+
for (String topicName : topicNames) {
444+
futures.add(admin.topics().deleteAsync(topicName, true));
445+
}
446+
return FutureUtil.waitForAll(futures);
447+
}
448+
416449
@SuppressWarnings("deprecation")
417450
protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, boolean authoritative) {
418451
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
@@ -632,189 +665,102 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
632665
});
633666
}
634667

635-
private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
636-
log.info("internalDeletePartitionedTopicsAsync");
637-
if (CollectionUtils.isEmpty(topicNames)) {
638-
return CompletableFuture.completedFuture(null);
639-
}
640-
List<CompletableFuture<Void>> futures = new ArrayList<>();
641-
for (String topicName : topicNames) {
642-
TopicName tn = TopicName.get(topicName);
643-
futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
644-
.runWithMarkDeleteAsync(tn,
645-
() -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn)));
646-
}
647-
return FutureUtil.waitForAll(futures);
648-
}
649-
private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
650-
log.info("internalDeleteTopicsAsync");
651-
if (CollectionUtils.isEmpty(topicNames)) {
652-
return CompletableFuture.completedFuture(null);
653-
}
654-
PulsarAdmin admin;
655-
try {
656-
admin = pulsar().getAdminClient();
657-
} catch (Exception ex) {
658-
log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
659-
return FutureUtil.failedFuture(ex);
660-
}
661-
List<CompletableFuture<Void>> futures = new ArrayList<>();
662-
for (String topicName : topicNames) {
663-
futures.add(admin.topics().deleteAsync(topicName, true, true));
664-
}
665-
return FutureUtil.waitForAll(futures);
666-
}
667-
668-
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
669-
if (force) {
670-
internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
671-
} else {
672-
internalDeleteNamespaceBundle(bundleRange, authoritative);
673-
}
674-
}
675-
676668
@SuppressWarnings("deprecation")
677-
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
678-
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
679-
validatePoliciesReadOnlyAccess();
680-
681-
// ensure that non-global namespace is directed to the correct cluster
682-
if (!namespaceName.isGlobal()) {
683-
validateClusterOwnership(namespaceName.getCluster());
684-
}
685-
686-
Policies policies = getNamespacePolicies(namespaceName);
687-
// ensure the local cluster is the only cluster for the global namespace configuration
688-
try {
689-
if (namespaceName.isGlobal()) {
690-
if (policies.replication_clusters.size() > 1) {
691-
// There are still more than one clusters configured for the global namespace
692-
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
693-
+ namespaceName + ". There are still more than one replication clusters configured.");
694-
}
695-
if (policies.replication_clusters.size() == 1
696-
&& !policies.replication_clusters.contains(config().getClusterName())) {
697-
// the only replication cluster is other cluster, redirect
698-
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
699-
ClusterData replClusterData =
700-
clusterResources().getCluster(replCluster)
701-
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
702-
"Cluster " + replCluster + " does not exist"));
703-
URL replClusterUrl;
704-
if (!config().isTlsEnabled() || !isRequestHttps()) {
705-
replClusterUrl = new URL(replClusterData.getServiceUrl());
706-
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
707-
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
708-
} else {
709-
throw new RestException(Status.PRECONDITION_FAILED,
710-
"The replication cluster does not provide TLS encrypted service");
711-
}
712-
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
713-
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
714-
if (log.isDebugEnabled()) {
715-
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
716-
clientAppId(), redirect, replCluster);
669+
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
670+
boolean force) {
671+
log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} force:{}",
672+
clientAppId(), namespaceName, bundleRange, authoritative, force);
673+
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
674+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
675+
.thenCompose(__ -> {
676+
if (!namespaceName.isGlobal()) {
677+
return validateClusterOwnershipAsync(namespaceName.getCluster());
717678
}
718-
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
719-
}
720-
}
721-
} catch (WebApplicationException wae) {
722-
throw wae;
723-
} catch (Exception e) {
724-
throw new RestException(e);
725-
}
726-
727-
try {
728-
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
729-
authoritative, true);
730-
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
731-
.get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
732-
for (String topic : topics) {
733-
NamespaceBundle topicBundle = pulsar().getNamespaceService()
734-
.getBundle(TopicName.get(topic));
735-
if (bundle.equals(topicBundle)) {
736-
throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
737-
}
738-
}
739-
740-
// remove from owned namespace map and ephemeral node from ZK
741-
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
742-
pulsar().getBrokerService().getBundleStats().remove(bundle.toString());
743-
} catch (WebApplicationException wae) {
744-
throw wae;
745-
} catch (Exception e) {
746-
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
747-
bundleRange, e);
748-
throw new RestException(e);
749-
}
750-
}
751-
752-
@SuppressWarnings("deprecation")
753-
protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
754-
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
755-
validatePoliciesReadOnlyAccess();
756-
757-
// ensure that non-global namespace is directed to the correct cluster
758-
if (!namespaceName.isGlobal()) {
759-
validateClusterOwnership(namespaceName.getCluster());
760-
}
679+
return CompletableFuture.completedFuture(null);
680+
})
681+
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
682+
.thenCompose(policies -> {
683+
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
684+
if (namespaceName.isGlobal()) {
685+
686+
if (policies.replication_clusters.size() > 1) {
687+
// There are still more than one clusters configured for the global namespace
688+
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
689+
+ namespaceName
690+
+ ". There are still more than one replication clusters configured.");
691+
}
692+
if (policies.replication_clusters.size() == 1
693+
&& !policies.replication_clusters.contains(config().getClusterName())) {
694+
// the only replication cluster is other cluster, redirect
695+
String replCluster = new ArrayList<>(policies.replication_clusters).get(0);
696+
future = clusterResources().getClusterAsync(replCluster)
697+
.thenCompose(clusterData -> {
698+
if (!clusterData.isPresent()) {
699+
throw new RestException(Status.NOT_FOUND,
700+
"Cluster " + replCluster + " does not exist");
701+
}
702+
ClusterData replClusterData = clusterData.get();
703+
URL replClusterUrl;
704+
try {
705+
if (!config().isTlsEnabled() || !isRequestHttps()) {
706+
replClusterUrl = new URL(replClusterData.getServiceUrl());
707+
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
708+
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
709+
} else {
710+
throw new RestException(Status.PRECONDITION_FAILED,
711+
"The replication cluster does not provide TLS encrypted "
712+
+ "service");
713+
}
714+
} catch (MalformedURLException malformedURLException) {
715+
throw new RestException(malformedURLException);
716+
}
761717

762-
Policies policies = getNamespacePolicies(namespaceName);
763-
// ensure the local cluster is the only cluster for the global namespace configuration
764-
try {
765-
if (namespaceName.isGlobal()) {
766-
if (policies.replication_clusters.size() > 1) {
767-
// There are still more than one clusters configured for the global namespace
768-
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
769-
+ namespaceName + ". There are still more than one replication clusters configured.");
770-
}
771-
if (policies.replication_clusters.size() == 1
772-
&& !policies.replication_clusters.contains(config().getClusterName())) {
773-
// the only replication cluster is other cluster, redirect
774-
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
775-
ClusterData replClusterData =
776-
clusterResources().getCluster(replCluster)
777-
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
778-
"Cluster " + replCluster + " does not exist"));
779-
URL replClusterUrl;
780-
if (!config().isTlsEnabled() || !isRequestHttps()) {
781-
replClusterUrl = new URL(replClusterData.getServiceUrl());
782-
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
783-
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
784-
} else {
785-
throw new RestException(Status.PRECONDITION_FAILED,
786-
"The replication cluster does not provide TLS encrypted service");
787-
}
788-
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
789-
.port(replClusterUrl.getPort())
790-
.replaceQueryParam("authoritative", false).build();
791-
if (log.isDebugEnabled()) {
792-
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
793-
clientAppId(), redirect, replCluster);
718+
URI redirect =
719+
UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
720+
.port(replClusterUrl.getPort())
721+
.replaceQueryParam("authoritative", false).build();
722+
if (log.isDebugEnabled()) {
723+
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
724+
clientAppId(), redirect, replCluster);
725+
}
726+
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
727+
});
728+
}
794729
}
795-
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
796-
}
797-
}
798-
} catch (WebApplicationException wae) {
799-
throw wae;
800-
} catch (Exception e) {
801-
throw new RestException(e);
802-
}
803-
804-
try {
805-
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
806-
authoritative, true);
807-
// directly remove from owned namespace map and ephemeral node from ZK
808-
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
809-
pulsar().getBrokerService().getBundleStats().remove(bundle.toString());
810-
} catch (WebApplicationException wae) {
811-
log.error("validateNamespaceBundleOwnership failed with exception", wae);
812-
throw wae;
813-
} catch (Exception e) {
814-
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
815-
bundleRange, e);
816-
throw new RestException(e);
817-
}
730+
return future
731+
.thenCompose(__ ->
732+
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles,
733+
bundleRange,
734+
authoritative, true))
735+
.thenCompose(bundle -> {
736+
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
737+
.thenCompose(topics -> {
738+
CompletableFuture<Void> deleteTopicsFuture =
739+
CompletableFuture.completedFuture(null);
740+
if (!force) {
741+
List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
742+
for (String topic : topics) {
743+
futures.add(pulsar().getNamespaceService()
744+
.getBundleAsync(TopicName.get(topic))
745+
.thenCompose(topicBundle -> {
746+
if (bundle.equals(topicBundle)) {
747+
throw new RestException(Status.CONFLICT,
748+
"Cannot delete non empty bundle");
749+
}
750+
return CompletableFuture.completedFuture(null);
751+
}));
752+
753+
}
754+
deleteTopicsFuture = FutureUtil.waitForAll(futures);
755+
}
756+
return deleteTopicsFuture.thenCompose(
757+
___ -> pulsar().getNamespaceService()
758+
.removeOwnedServiceUnitAsync(bundle))
759+
.thenRun(() -> pulsar().getBrokerService().getBundleStats()
760+
.remove(bundle.toString()));
761+
});
762+
});
763+
});
818764
}
819765

820766
protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {

0 commit comments

Comments
 (0)