Skip to content

Commit 138ea35

Browse files
authored
[improve][broker] Make some methods in NamespacesBase async. (#15518)
### Motivation See PIP #14365 and change tracker #15043. Make `NamespacesBase` `getTenantNamespaces / createNamespace / getTopics / getPolicies / getPermissions` methods to pure async.
1 parent e6996b8 commit 138ea35

File tree

10 files changed

+453
-310
lines changed

10 files changed

+453
-310
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java

+4
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
9393
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
9494
}
9595

96+
public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
97+
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
98+
}
99+
96100
public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
97101
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
98102
return super.exists(path) && super.getChildren(path).isEmpty();

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java

+37-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.resources;
2020

2121
import java.util.ArrayList;
22+
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.Optional;
2425
import java.util.concurrent.CompletableFuture;
@@ -78,7 +79,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
7879
}
7980

8081
public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
81-
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
82+
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
8283
}
8384

8485
public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
@@ -110,6 +111,41 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
110111
return namespaces;
111112
}
112113

114+
public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
115+
// this will return a cluster in v1 and a namespace in v2
116+
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
117+
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
118+
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
119+
.thenCompose(children -> {
120+
if (children == null || children.isEmpty()) {
121+
String namespace = NamespaceName.get(tenant, key).toString();
122+
// if the length is 0 then this is probably a leftover cluster from namespace
123+
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
124+
// need to add it to the list
125+
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
126+
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
127+
: new ArrayList<String>())
128+
.exceptionally(ex -> {
129+
Throwable cause = FutureUtil.unwrapCompletionException(ex);
130+
if (cause instanceof MetadataStoreException
131+
.ContentDeserializationException) {
132+
return new ArrayList<>();
133+
}
134+
throw FutureUtil.wrapToCompletionException(ex);
135+
});
136+
} else {
137+
CompletableFuture<List<String>> ret = new CompletableFuture();
138+
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
139+
.toString()).collect(Collectors.toList()));
140+
return ret;
141+
}
142+
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
143+
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
144+
namespaces.addAll(m);
145+
return namespaces;
146+
}))));
147+
}
148+
113149
public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
114150
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
115151
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

+10
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,11 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
323323
return FutureUtil.failedFuture(new RestException(e));
324324
}
325325
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
326+
if (policies.get().is_allow_auto_update_schema == null) {
327+
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
328+
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
329+
.isAllowAutoUpdateSchemaEnabled();
330+
}
326331
return CompletableFuture.completedFuture(policies.get());
327332
});
328333
} else {
@@ -534,6 +539,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
534539
}
535540
}
536541

542+
protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
543+
return namespaceResources().getPartitionedTopicResources()
544+
.listPartitionedTopicsAsync(namespaceName, topicDomain);
545+
}
546+
537547
protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
538548
try {
539549
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

+34-40
Original file line numberDiff line numberDiff line change
@@ -103,60 +103,54 @@
103103
import org.apache.pulsar.common.util.Codec;
104104
import org.apache.pulsar.common.util.FutureUtil;
105105
import org.apache.pulsar.metadata.api.MetadataStoreException;
106-
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
107106
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
108107
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
109108
import org.slf4j.Logger;
110109
import org.slf4j.LoggerFactory;
111110

112111
public abstract class NamespacesBase extends AdminResource {
113112

114-
protected List<String> internalGetTenantNamespaces(String tenant) {
115-
checkNotNull(tenant, "Tenant should not be null");
113+
protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
114+
if (tenant == null) {
115+
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
116+
}
116117
try {
117118
NamedEntity.checkName(tenant);
118119
} catch (IllegalArgumentException e) {
119120
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
120-
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
121-
}
122-
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
123-
124-
try {
125-
if (!tenantResources().tenantExists(tenant)) {
126-
throw new RestException(Status.NOT_FOUND, "Tenant not found");
127-
}
128-
129-
return tenantResources().getListOfNamespaces(tenant);
130-
} catch (Exception e) {
131-
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
132-
throw new RestException(e);
121+
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
133122
}
123+
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
124+
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
125+
.thenCompose(existed -> {
126+
if (!existed) {
127+
throw new RestException(Status.NOT_FOUND, "Tenant not found");
128+
}
129+
return tenantResources().getListOfNamespacesAsync(tenant);
130+
});
134131
}
135132

136-
protected void internalCreateNamespace(Policies policies) {
137-
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
138-
validatePoliciesReadOnlyAccess();
139-
validatePolicies(namespaceName, policies);
140-
141-
try {
142-
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
143-
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
144-
if (maxNamespacesPerTenant > 0) {
145-
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
146-
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
147-
throw new RestException(Status.PRECONDITION_FAILED,
148-
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
149-
}
150-
}
151-
namespaceResources().createPolicies(namespaceName, policies);
152-
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
153-
} catch (AlreadyExistsException e) {
154-
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
155-
throw new RestException(Status.CONFLICT, "Namespace already exists");
156-
} catch (Exception e) {
157-
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
158-
throw new RestException(e);
159-
}
133+
protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
134+
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
135+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
136+
.thenAccept(__ -> validatePolicies(namespaceName, policies))
137+
.thenCompose(__ -> {
138+
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
139+
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
140+
if (maxNamespacesPerTenant > 0) {
141+
return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
142+
.thenAccept(namespaces -> {
143+
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
144+
throw new RestException(Status.PRECONDITION_FAILED,
145+
"Exceed the maximum number of namespace in tenant :"
146+
+ namespaceName.getTenant());
147+
}
148+
});
149+
}
150+
return CompletableFuture.completedFuture(null);
151+
})
152+
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
153+
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
160154
}
161155

162156
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {

0 commit comments

Comments
 (0)