Skip to content

Commit 147672f

Browse files
authored
[improve][broker] Make Namespaces.deleteNamespaceBundle async (apache#16287)
Master Issue: apache#14365 ### Motivation Please see apache#14365 ### Modifications * Make Namespaces.deleteNamespaceBundle async * Combine internalDeleteNamespaceBundle * Make removeOwnedServiceUnit async
1 parent 9f70219 commit 147672f

File tree

5 files changed

+131
-175
lines changed

5 files changed

+131
-175
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

+85-141
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import com.google.common.collect.Lists;
2626
import com.google.common.collect.Sets;
2727
import java.lang.reflect.Field;
28+
import java.net.MalformedURLException;
2829
import java.net.URI;
2930
import java.net.URL;
31+
import java.util.ArrayList;
3032
import java.util.Collections;
3133
import java.util.HashMap;
3234
import java.util.HashSet;
@@ -551,153 +553,95 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
551553
});
552554
}
553555

554-
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
555-
if (force) {
556-
internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
557-
} else {
558-
internalDeleteNamespaceBundle(bundleRange, authoritative);
559-
}
560-
}
561-
562556
@SuppressWarnings("deprecation")
563-
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
564-
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
565-
validatePoliciesReadOnlyAccess();
566-
567-
// ensure that non-global namespace is directed to the correct cluster
568-
if (!namespaceName.isGlobal()) {
569-
validateClusterOwnership(namespaceName.getCluster());
570-
}
571-
572-
Policies policies = getNamespacePolicies(namespaceName);
573-
// ensure the local cluster is the only cluster for the global namespace configuration
574-
try {
575-
if (namespaceName.isGlobal()) {
576-
if (policies.replication_clusters.size() > 1) {
577-
// There are still more than one clusters configured for the global namespace
578-
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
579-
+ namespaceName + ". There are still more than one replication clusters configured.");
580-
}
581-
if (policies.replication_clusters.size() == 1
582-
&& !policies.replication_clusters.contains(config().getClusterName())) {
583-
// the only replication cluster is other cluster, redirect
584-
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
585-
ClusterData replClusterData =
586-
clusterResources().getCluster(replCluster)
587-
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
588-
"Cluster " + replCluster + " does not exist"));
589-
URL replClusterUrl;
590-
if (!config().isTlsEnabled() || !isRequestHttps()) {
591-
replClusterUrl = new URL(replClusterData.getServiceUrl());
592-
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
593-
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
594-
} else {
595-
throw new RestException(Status.PRECONDITION_FAILED,
596-
"The replication cluster does not provide TLS encrypted service");
597-
}
598-
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
599-
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
600-
if (log.isDebugEnabled()) {
601-
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
602-
clientAppId(), redirect, replCluster);
557+
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
558+
boolean force) {
559+
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
560+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
561+
.thenCompose(__ -> {
562+
if (!namespaceName.isGlobal()) {
563+
return validateClusterOwnershipAsync(namespaceName.getCluster());
603564
}
604-
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
605-
}
606-
}
607-
} catch (WebApplicationException wae) {
608-
throw wae;
609-
} catch (Exception e) {
610-
throw new RestException(e);
611-
}
612-
613-
try {
614-
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
615-
authoritative, true);
616-
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
617-
.get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
618-
for (String topic : topics) {
619-
NamespaceBundle topicBundle = pulsar().getNamespaceService()
620-
.getBundle(TopicName.get(topic));
621-
if (bundle.equals(topicBundle)) {
622-
throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
623-
}
624-
}
625-
626-
// remove from owned namespace map and ephemeral node from ZK
627-
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
628-
} catch (WebApplicationException wae) {
629-
throw wae;
630-
} catch (Exception e) {
631-
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
632-
bundleRange, e);
633-
throw new RestException(e);
634-
}
635-
}
636-
637-
@SuppressWarnings("deprecation")
638-
protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
639-
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
640-
validatePoliciesReadOnlyAccess();
565+
return CompletableFuture.completedFuture(null);
566+
})
567+
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
568+
.thenCompose(policies -> {
569+
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
570+
if (namespaceName.isGlobal()) {
641571

642-
// ensure that non-global namespace is directed to the correct cluster
643-
if (!namespaceName.isGlobal()) {
644-
validateClusterOwnership(namespaceName.getCluster());
645-
}
572+
if (policies.replication_clusters.size() > 1) {
573+
// There are still more than one clusters configured for the global namespace
574+
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
575+
+ namespaceName
576+
+ ". There are still more than one replication clusters configured.");
577+
}
578+
if (policies.replication_clusters.size() == 1
579+
&& !policies.replication_clusters.contains(config().getClusterName())) {
580+
// the only replication cluster is other cluster, redirect
581+
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
582+
future = clusterResources().getClusterAsync(replCluster)
583+
.thenCompose(clusterData -> {
584+
if (clusterData.isEmpty()) {
585+
throw new RestException(Status.NOT_FOUND,
586+
"Cluster " + replCluster + " does not exist");
587+
}
588+
ClusterData replClusterData = clusterData.get();
589+
URL replClusterUrl;
590+
try {
591+
if (!config().isTlsEnabled() || !isRequestHttps()) {
592+
replClusterUrl = new URL(replClusterData.getServiceUrl());
593+
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
594+
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
595+
} else {
596+
throw new RestException(Status.PRECONDITION_FAILED,
597+
"The replication cluster does not provide TLS encrypted "
598+
+ "service");
599+
}
600+
} catch (MalformedURLException malformedURLException) {
601+
throw new RestException(malformedURLException);
602+
}
646603

647-
Policies policies = getNamespacePolicies(namespaceName);
648-
// ensure the local cluster is the only cluster for the global namespace configuration
649-
try {
650-
if (namespaceName.isGlobal()) {
651-
if (policies.replication_clusters.size() > 1) {
652-
// There are still more than one clusters configured for the global namespace
653-
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
654-
+ namespaceName + ". There are still more than one replication clusters configured.");
655-
}
656-
if (policies.replication_clusters.size() == 1
657-
&& !policies.replication_clusters.contains(config().getClusterName())) {
658-
// the only replication cluster is other cluster, redirect
659-
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
660-
ClusterData replClusterData =
661-
clusterResources().getCluster(replCluster)
662-
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
663-
"Cluster " + replCluster + " does not exist"));
664-
URL replClusterUrl;
665-
if (!config().isTlsEnabled() || !isRequestHttps()) {
666-
replClusterUrl = new URL(replClusterData.getServiceUrl());
667-
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
668-
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
669-
} else {
670-
throw new RestException(Status.PRECONDITION_FAILED,
671-
"The replication cluster does not provide TLS encrypted service");
672-
}
673-
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
674-
.port(replClusterUrl.getPort())
675-
.replaceQueryParam("authoritative", false).build();
676-
if (log.isDebugEnabled()) {
677-
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
678-
clientAppId(), redirect, replCluster);
604+
URI redirect =
605+
UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
606+
.port(replClusterUrl.getPort())
607+
.replaceQueryParam("authoritative", false).build();
608+
if (log.isDebugEnabled()) {
609+
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
610+
clientAppId(), redirect, replCluster);
611+
}
612+
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
613+
});
614+
}
679615
}
680-
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
681-
}
682-
}
683-
} catch (WebApplicationException wae) {
684-
throw wae;
685-
} catch (Exception e) {
686-
throw new RestException(e);
687-
}
616+
return future.thenCompose(__ -> {
617+
NamespaceBundle bundle =
618+
validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
619+
authoritative, true);
620+
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
621+
.thenCompose(topics -> {
622+
CompletableFuture<Void> deleteTopicsFuture =
623+
CompletableFuture.completedFuture(null);
624+
if (!force) {
625+
List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
626+
for (String topic : topics) {
627+
futures.add(pulsar().getNamespaceService()
628+
.getBundleAsync(TopicName.get(topic))
629+
.thenCompose(topicBundle -> {
630+
if (bundle.equals(topicBundle)) {
631+
throw new RestException(Status.CONFLICT,
632+
"Cannot delete non empty bundle");
633+
}
634+
return CompletableFuture.completedFuture(null);
635+
}));
688636

689-
try {
690-
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
691-
authoritative, true);
692-
// directly remove from owned namespace map and ephemeral node from ZK
693-
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
694-
} catch (WebApplicationException wae) {
695-
throw wae;
696-
} catch (Exception e) {
697-
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
698-
bundleRange, e);
699-
throw new RestException(e);
700-
}
637+
}
638+
deleteTopicsFuture = FutureUtil.waitForAll(futures);
639+
}
640+
return deleteTopicsFuture.thenCompose(
641+
___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
642+
});
643+
});
644+
});
701645
}
702646

703647
protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -259,14 +259,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
259259
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
260260
@ApiResponse(code = 403, message = "Don't have admin permission"),
261261
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
262-
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
263-
public void deleteNamespaceBundle(@PathParam("property") String property,
264-
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
265-
@PathParam("bundle") String bundleRange,
266-
@QueryParam("force") @DefaultValue("false") boolean force,
267-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
268-
validateNamespaceName(property, cluster, namespace);
269-
internalDeleteNamespaceBundle(bundleRange, authoritative, force);
262+
@ApiResponse(code = 409, message = "Namespace bundle is not empty")})
263+
public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("property") String property,
264+
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
265+
@PathParam("bundle") String bundleRange,
266+
@QueryParam("force") @DefaultValue("false") boolean force,
267+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
268+
validateNamespaceName(property, cluster, namespace);
269+
internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
270+
.thenRun(() -> response.resume(Response.noContent().build()))
271+
.exceptionally(ex -> {
272+
if (!isRedirectException(ex)) {
273+
log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
274+
}
275+
resumeAsyncResponseExceptionally(response, ex);
276+
return null;
277+
});
270278
}
271279

272280
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
207207
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
208208
@ApiResponse(code = 403, message = "Don't have admin permission"),
209209
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
210-
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
211-
public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
212-
@PathParam("bundle") String bundleRange,
213-
@QueryParam("force") @DefaultValue("false") boolean force,
214-
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
210+
@ApiResponse(code = 409, message = "Namespace bundle is not empty")})
211+
public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("tenant") String tenant,
212+
@PathParam("namespace") String namespace,
213+
@PathParam("bundle") String bundleRange,
214+
@QueryParam("force") @DefaultValue("false") boolean force,
215+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
215216
validateNamespaceName(tenant, namespace);
216-
internalDeleteNamespaceBundle(bundleRange, authoritative, force);
217+
internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
218+
.thenRun(() -> response.resume(Response.noContent().build()))
219+
.exceptionally(ex -> {
220+
if (!isRedirectException(ex)) {
221+
log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
222+
}
223+
resumeAsyncResponseExceptionally(response, ex);
224+
return null;
225+
});
217226
}
218227

219228
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -1047,10 +1047,9 @@ public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
10471047
.thenCompose(ownershipCache::checkOwnershipAsync);
10481048
}
10491049

1050-
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
1051-
ownershipCache.removeOwnership(nsBundle).get(
1052-
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
1053-
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
1050+
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
1051+
return ownershipCache.removeOwnership(nsBundle)
1052+
.thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
10541053
}
10551054

10561055
protected void onNamespaceBundleOwned(NamespaceBundle bundle) {

0 commit comments

Comments
 (0)