Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] make split namespaces bundle async #15886

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand Down Expand Up @@ -1103,67 +1104,67 @@ public void internalUnloadNamespaceBundle(AsyncResponse asyncResponse, String bu
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
validateSuperUserAccess();
checkNotNull(bundleName, "BundleRange should not be null");
protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName, boolean authoritative,
boolean unload, String splitAlgorithmName,
List<Long> splitBoundaries) {
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);

String bundleRange = getBundleRange(bundleName);

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}
CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);

validatePoliciesReadOnlyAccess();

List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
}
}

NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, false);
} catch (Exception e) {
asyncResponse.resume(e);
return;
}
ret.thenRun(() -> {
checkNotNull(bundleName, "BundleRange should not be null");
List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms);
}
if (splitAlgorithmName.equalsIgnoreCase(
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
throw new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be "
+ "emtpy");
}
}
}).thenCompose(__ -> validateSuperUserAccessAsync())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return validateClusterOwnershipAsync(namespaceName.getCluster()).thenCompose(
___ -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
}
})
.thenCompose(__ -> getBundleRangeAsync(bundleName))
.thenCompose(bundleRange -> getNamespacePoliciesAsync(namespaceName).thenApply(
policies -> Pair.of(bundleRange, policies)))
.thenCompose(pair -> {
String bundleRange = pair.getLeft();
Policies policies = pair.getRight();
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false).thenApply(__ -> {
NamespaceBundle nsBundle =
validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange);
return nsBundle;
});
}).thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
).exceptionally(ex -> {
if (ex.getCause() instanceof IllegalArgumentException) {
throw new RestException(Status.PRECONDITION_FAILED,
"Split bundle failed due to invalid request");
} else {
throw new RestException(ex.getCause());
}
});

pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
if (ex.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, ex.getMessage());
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Split bundle failed due to invalid request"));
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
asyncResponse.resume(new RestException(ex.getCause()));
}
return null;
});
return ret;
}

protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
Expand Down Expand Up @@ -1217,21 +1218,35 @@ protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String
}

private String getBundleRange(String bundleName) {
return sync(()->getBundleRangeAsync(bundleName));
}

private CompletableFuture<String> getBundleRangeAsync(String bundleName) {
if (BundleType.LARGEST.toString().equals(bundleName)) {
return findLargestBundleWithTopics(namespaceName).getBundleRange();
return findLargestBundleWithTopicsAsync(namespaceName).thenApply(NamespaceBundle::getBundleRange);
} else if (BundleType.HOT.toString().equals(bundleName)) {
return findHotBundle(namespaceName).getBundleRange();
return findHotBundleAsync(namespaceName).thenApply(NamespaceBundle::getBundleRange);
} else {
return bundleName;
return CompletableFuture.completedFuture(bundleName);
}
}

private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestTopics(namespaceName);
return sync(()->findLargestBundleWithTopicsAsync(namespaceName));
}

private CompletableFuture<NamespaceBundle> findLargestBundleWithTopicsAsync(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundleWithHighestTopicsAsync(namespaceName);
}

private NamespaceBundle findHotBundle(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestThroughput(namespaceName);
return sync(()->findHotBundleAsync(namespaceName));
}

private CompletableFuture<NamespaceBundle> findHotBundleAsync(NamespaceName namespaceName) {
return CompletableFuture.supplyAsync(() -> pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundleWithHighestThroughput(namespaceName));
}

private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,21 @@ public void splitNamespaceBundle(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
try {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(asyncResponse, bundleRange,
authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundleAsync(bundleRange,
authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME,
splitBoundaries).thenAccept(
__ -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("Failed to set split NamespaceBundle {}", namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
Expand Down Expand Up @@ -701,15 +702,21 @@ public void splitNamespaceBundle(
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName,
@ApiParam("splitBoundaries") List<Long> splitBoundaries) {
try {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(asyncResponse,
bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundleAsync(bundleRange,
authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME,
splitBoundaries).thenAccept(
__ -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("Failed to set split NamespaceBundle {}", namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
return null;
});
}

@GET
Expand Down
Loading