Skip to content

Commit c35afe5

Browse files
committed
Make some methods in NamespacesBase async.
1 parent c49a977 commit c35afe5

File tree

9 files changed

+405
-289
lines changed

9 files changed

+405
-289
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java

+4
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
9393
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
9494
}
9595

96+
public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
97+
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
98+
}
99+
96100
public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
97101
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
98102
return super.exists(path) && super.getChildren(path).isEmpty();

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
7878
}
7979

8080
public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
81-
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
81+
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
8282
}
8383

8484
public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
@@ -110,6 +110,40 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
110110
return namespaces;
111111
}
112112

113+
public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
114+
List<String> namespaces = new ArrayList<>();
115+
// this will return a cluster in v1 and a namespace in v2
116+
CompletableFuture<List<CompletableFuture<Void>>> result = getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
117+
.thenApply(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
118+
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
119+
.thenCompose(children -> {
120+
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
121+
if (children == null || children.isEmpty()) {
122+
String namespace = NamespaceName.get(tenant, key).toString();
123+
// if the length is 0 then this is probably a leftover cluster from namespace
124+
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
125+
// need to add it to the list
126+
ret = ret.thenCompose(__ -> getAsync(joinPath(BASE_POLICIES_PATH, namespace))
127+
.thenAccept(opt -> opt.map(k -> namespaces.add(namespace)))
128+
.exceptionally(ex -> {
129+
Throwable cause = FutureUtil.unwrapCompletionException(ex);
130+
if (cause instanceof MetadataStoreException
131+
.ContentDeserializationException) {
132+
return null;
133+
}
134+
throw FutureUtil.wrapToCompletionException(ex);
135+
}));
136+
} else {
137+
children.forEach(ns -> {
138+
namespaces.add(NamespaceName.get(tenant, key, ns).toString());
139+
});
140+
}
141+
return ret;
142+
})).collect(Collectors.toList())
143+
);
144+
return result.thenCompose(futures -> FutureUtil.waitForAll(futures)).thenApply(__ -> namespaces);
145+
}
146+
113147
public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
114148
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
115149
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

+5
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,11 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
534534
}
535535
}
536536

537+
protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
538+
return namespaceResources().getPartitionedTopicResources()
539+
.listPartitionedTopicsAsync(namespaceName, topicDomain);
540+
}
541+
537542
protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
538543
try {
539544
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

+35-40
Original file line numberDiff line numberDiff line change
@@ -103,60 +103,55 @@
103103
import org.apache.pulsar.common.util.Codec;
104104
import org.apache.pulsar.common.util.FutureUtil;
105105
import org.apache.pulsar.metadata.api.MetadataStoreException;
106-
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
107106
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
108107
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
109108
import org.slf4j.Logger;
110109
import org.slf4j.LoggerFactory;
111110

112111
public abstract class NamespacesBase extends AdminResource {
113112

114-
protected List<String> internalGetTenantNamespaces(String tenant) {
115-
checkNotNull(tenant, "Tenant should not be null");
113+
protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
114+
if (tenant == null) {
115+
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
116+
}
116117
try {
117118
NamedEntity.checkName(tenant);
118119
} catch (IllegalArgumentException e) {
119120
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
120-
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
121-
}
122-
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
123-
124-
try {
125-
if (!tenantResources().tenantExists(tenant)) {
126-
throw new RestException(Status.NOT_FOUND, "Tenant not found");
127-
}
128-
129-
return tenantResources().getListOfNamespaces(tenant);
130-
} catch (Exception e) {
131-
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
132-
throw new RestException(e);
121+
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
133122
}
123+
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
124+
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
125+
.thenCompose(existed -> {
126+
if (!existed) {
127+
throw new RestException(Status.NOT_FOUND, "Tenant not found");
128+
}
129+
return tenantResources().getListOfNamespacesAsync(tenant);
130+
});
134131
}
135132

136-
protected void internalCreateNamespace(Policies policies) {
137-
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
138-
validatePoliciesReadOnlyAccess();
139-
validatePolicies(namespaceName, policies);
140-
141-
try {
142-
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
143-
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
144-
if (maxNamespacesPerTenant > 0) {
145-
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
146-
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
147-
throw new RestException(Status.PRECONDITION_FAILED,
148-
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
149-
}
150-
}
151-
namespaceResources().createPolicies(namespaceName, policies);
152-
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
153-
} catch (AlreadyExistsException e) {
154-
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
155-
throw new RestException(Status.CONFLICT, "Namespace already exists");
156-
} catch (Exception e) {
157-
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
158-
throw new RestException(e);
159-
}
133+
protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
134+
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
135+
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
136+
.thenAccept(__ -> validatePolicies(namespaceName, policies))
137+
.thenCompose(__ -> {
138+
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
139+
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
140+
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
141+
if (maxNamespacesPerTenant > 0) {
142+
ret = tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
143+
.thenAccept(namespaces -> {
144+
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
145+
throw new RestException(Status.PRECONDITION_FAILED,
146+
"Exceed the maximum number of namespace in tenant :"
147+
+ namespaceName.getTenant());
148+
}
149+
});
150+
}
151+
return ret;
152+
})
153+
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
154+
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
160155
}
161156

162157
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {

0 commit comments

Comments
 (0)