Skip to content

Commit

Permalink
[fix][broker]Create v1/namespace with Policies (apache#21171)
Browse files Browse the repository at this point in the history
Co-authored-by: vraulji <vishwadeepsinh.raulji@yahooinc.com>
  • Loading branch information
vraulji567 and vraulji authored Oct 26, 2023
1 parent 5e16ecf commit 2b5c199
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2656,4 +2656,15 @@ protected void internalEnableMigration(boolean migrated) {
throw new RestException(e);
}
}

protected Policies getDefaultPolicesIfNull(Policies policies) {
if (policies == null) {
policies = new Policies();
}
int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles();
if (policies.bundles == null) {
policies.bundles = getBundles(defaultNumberOfBundles);
}
return policies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1722,5 +1722,41 @@ public void enableMigration(@PathParam("property") String property,
internalEnableMigration(migrated);
}

@PUT
@Path("/{property}/{cluster}/{namespace}/policy")
@ApiOperation(value = "Creates a new namespace with the specified policies")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace already exists"),
@ApiResponse(code = 412, message = "Namespace name is not valid") })
public void createNamespace(@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@ApiParam(value = "Policies for the namespace") Policies policies) {
validateNamespaceName(property, cluster, namespace);
CompletableFuture<Void> ret;
if (!namespaceName.isGlobal()) {
// If the namespace is non global, make sure property has the access on the cluster. For global namespace,
// same check is made at the time of setting replication.
ret = validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster());
} else {
ret = CompletableFuture.completedFuture(null);
}

ret.thenApply(__ -> getDefaultPolicesIfNull(policies)).thenCompose(this::internalCreateNamespace)
.thenAccept(__ -> response.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable root = FutureUtil.unwrapCompletionException(ex);
if (root instanceof MetadataStoreException.AlreadyExistsException) {
response.resume(new RestException(Status.CONFLICT, "Namespace already exists"));
} else {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(response, ex);
}
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.v2;

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
Expand Down Expand Up @@ -1966,20 +1965,6 @@ public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String clust
return internalGetAntiAffinityNamespaces(cluster, antiAffinityGroup, tenant);
}

private Policies getDefaultPolicesIfNull(Policies policies) {
if (policies == null) {
policies = new Policies();
}

int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles();

if (policies.bundles == null) {
policies.bundles = getBundles(defaultNumberOfBundles);
}

return policies;
}

@GET
@Path("/{tenant}/{namespace}/compactionThreshold")
@ApiOperation(value = "Maximum number of uncompacted bytes in topics before compaction is triggered.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2106,4 +2106,68 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
assertNull(topicPolicies);
}

@Test
public void testCreateNamespacesWithPolicy() throws Exception {
try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "other-colo", "my-namespace",
new Policies()));
fail("should have failed");
} catch (RestException e) {
// Ok, cluster doesn't exist
assertEquals(e.getResponse().getStatus(), Status.FORBIDDEN.getStatusCode());
}

List<NamespaceName> nsnames = new ArrayList<>();
nsnames.add(NamespaceName.get(this.testTenant, "use", "create-namespace-1"));
nsnames.add(NamespaceName.get(this.testTenant, "use", "create-namespace-2"));
nsnames.add(NamespaceName.get(this.testTenant, "usc", "create-other-namespace-1"));
createTestNamespaces(nsnames, BundlesData.builder().build());

try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-1",
new Policies()));
fail("should have failed");
} catch (RestException e) {
// Ok, namespace already exists
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());

}

try {
asyncRequests(response -> namespaces.createNamespace(response,"non-existing-tenant", "use", "create-namespace-1",
new Policies()));
fail("should have failed");
} catch (RestException e) {
// Ok, tenant doesn't exist
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "create-namespace-#",
new Policies()));
fail("should have failed");
} catch (RestException e) {
// Ok, invalid namespace name
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/policies/my-tenant/use/my-namespace-3");
});
try {
asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, "use", "my-namespace-3", new Policies()));
fail("should have failed");
} catch (RestException e) {
// Ok
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
}

private void createTestNamespaces(List<NamespaceName> nsnames, Policies policies) throws Exception {
for (NamespaceName nsName : nsnames) {
asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(), nsName.getCluster(), nsName.getLocalName(), policies));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -182,9 +181,7 @@ public void createNamespace(String namespace, Policies policies) throws PulsarAd
@Override
public CompletableFuture<Void> createNamespaceAsync(String namespace, Policies policies) {
NamespaceName ns = NamespaceName.get(namespace);
checkArgument(ns.isV2(), "Create namespace with policies is only supported on newer namespaces");
WebTarget path = namespacePath(ns);
// For V2 API we pass full Policy class instance
WebTarget path = ns.isV2() ? namespacePath(ns) : namespacePath(ns, "policy");
return asyncPutRequest(path, Entity.entity(policies, MediaType.APPLICATION_JSON));
}

Expand Down

0 comments on commit 2b5c199

Please sign in to comment.