diff --git a/conf/broker.conf b/conf/broker.conf index 798fbacf271ca..d71d9fb5270f4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -693,6 +693,14 @@ loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl +# Supported algorithms name for namespace bundle split. +# "range_equally_divide" divides the bundle into two parts with the same hash range size. +# "topic_count_equally_divide" divides the bundle into two parts with the same topics count. +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide,topic_count_equally_divide] + +# Default algorithm name for namespace bundle split +defaultNamespaceBundleSplitAlgorithm=range_equally_divide + ### --- Replication --- ### # Enable replication metrics diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2771692b82f30..2808a5f080379 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.internal.PlatformDependent; @@ -1189,7 +1190,18 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Name of load manager to use" ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; - + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Supported algorithms name for namespace bundle split" + ) + private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide"); + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Default algorithm name for namespace bundle split" + ) + private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide"; @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = "Option to override the auto-detected network interfaces max speed" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index c7b0bb7e9c605..37c5a1f7cb1b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; @@ -152,6 +153,15 @@ private static class BrokerStarter { throw new IllegalArgumentException("Max message size need smaller than jvm directMemory"); } + if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) { + throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " + + "Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms); + } + + if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) { + throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm"); + } + // init functions worker if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) { WorkerConfig workerConfig; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 33592e6f99612..3def090ad66d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -64,6 +64,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -774,7 +775,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload) { + protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) { log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); validateSuperUserAccess(); @@ -792,19 +793,41 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + List supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms(); + if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { + throw new RestException(Status.PRECONDITION_FAILED, + "Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms); + } + try { - pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get(); + pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); - } catch (IllegalArgumentException e) { - log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, + } catch (ExecutionException e) { + if (e.getCause() instanceof IllegalArgumentException) { + log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, bundleRange, e.getMessage()); - throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + } else { + log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); + throw new RestException(e.getCause()); + } } catch (Exception e) { log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); throw new RestException(e); } } + private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { + NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + } + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide; + } + return algorithm; + } + protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); validateSuperUserAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 99bbcd305c8de..ec9b1fc5da331 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; @@ -446,7 +447,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload) { validateNamespaceName(property, cluster, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index ae691a70470df..1043fdd4aa494 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -350,9 +350,10 @@ public void unloadNamespaceBundle(@PathParam("tenant") String tenant, @PathParam public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("unload") @DefaultValue("false") boolean unload) { + @QueryParam("unload") @DefaultValue("false") boolean unload, + @QueryParam("splitAlgorithmName") String splitAlgorithmName) { validateNamespaceName(tenant, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 246e311f40f1f..5788656bb5eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -648,7 +648,7 @@ public void checkNamespaceBundleSplit() { } log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles); pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - unloadSplitBundles); + unloadSplitBundles, null); // Make sure the same bundle is not selected again. loadData.getBundleData().remove(bundleName); localData.getLastStats().remove(bundleName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 14ae88a7266f9..765f6c661021d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -49,6 +49,7 @@ import org.apache.pulsar.broker.loadbalance.PlacementStrategy; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -1478,9 +1479,9 @@ public void doNamespaceBundleSplit() throws Exception { for (String bundleName : bundlesToBeSplit) { try { pulsar.getAdminClient().namespaces().splitNamespaceBundle( - LoadManagerShared.getNamespaceNameFromBundleName(bundleName), - LoadManagerShared.getBundleRangeFromBundleName(bundleName), - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); + LoadManagerShared.getNamespaceNameFromBundleName(bundleName), + LoadManagerShared.getBundleRangeFromBundleName(bundleName), + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 0770a4d8bde52..d4e90be705d99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -72,6 +73,7 @@ import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -636,13 +638,12 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @return * @throws Exception */ - public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload) + public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, NamespaceBundleSplitAlgorithm splitAlgorithm) throws Exception { final CompletableFuture unloadFuture = new CompletableFuture<>(); final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); - - splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture); + splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm); return unloadFuture; } @@ -650,114 +651,127 @@ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, - CompletableFuture unloadFuture) { - CompletableFuture> updateFuture = new CompletableFuture<>(); - - final Pair> splittedBundles = bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */); - - // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splittedBundles != null) { - checkNotNull(splittedBundles.getLeft()); - checkNotNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); - NamespaceName nsname = bundle.getNamespaceObject(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", - nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", - splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); - } - try { - // take ownership of newly split bundles - for (NamespaceBundle sBundle : splittedBundles.getRight()) { - checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); - } - updateNamespaceBundles(nsname, splittedBundles.getLeft(), - (rc, path, zkCtx, stat) -> { - if (rc == Code.OK.intValue()) { - // invalidate cache as zookeeper has new split - // namespace bundle - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - - updateFuture.complete(splittedBundles.getRight()); - } else if (rc == Code.BADVERSION.intValue()) { - KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + - "due to %s, counter: %d", - nsname.toString(), bundle.getBundleRange(), - keeperException.getMessage(), counter.get()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); - } else { - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", - nsname.toString(), bundle.getBundleRange(), - KeeperException.create(KeeperException.Code.get(rc)).getMessage()); - LOG.warn(msg); + CompletableFuture unloadFuture, + NamespaceBundleSplitAlgorithm splitAlgorithm) { + splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> { + CompletableFuture> updateFuture = new CompletableFuture<>(); + if (ex == null) { + final Pair> splittedBundles; + try { + splittedBundles = bundleFactory.splitBundles(bundle, + 2 /* by default split into 2 */, splitBoundary); + + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splittedBundles != null) { + checkNotNull(splittedBundles.getLeft()); + checkNotNull(splittedBundles.getRight()); + checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); + NamespaceName nsname = bundle.getNamespaceObject(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", + nsname.toString(), bundle.getBundleRange(), counter.get(), + splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", + splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); + } + try { + // take ownership of newly split bundles + for (NamespaceBundle sBundle : splittedBundles.getRight()) { + checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); + } + updateNamespaceBundles(nsname, splittedBundles.getLeft(), + (rc, path, zkCtx, stat) -> { + if (rc == Code.OK.intValue()) { + // invalidate cache as zookeeper has new split + // namespace bundle + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + + updateFuture.complete(splittedBundles.getRight()); + } else if (rc == Code.BADVERSION.intValue()) { + KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + + "due to %s, counter: %d", + nsname.toString(), bundle.getBundleRange(), + keeperException.getMessage(), counter.get()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); + } else { + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", + nsname.toString(), bundle.getBundleRange(), + KeeperException.create(KeeperException.Code.get(rc)).getMessage()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }); + } catch (Exception e) { + String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", + nsname.toString(), e.getMessage()); + LOG.warn(msg, e); updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } - }); - } catch (Exception e) { - String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", - nsname.toString(), e.getMessage()); - LOG.warn(msg, e); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - } else { - String msg = format("bundle %s not found under namespace", bundle.toString()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - - // If success updateNamespaceBundles, then do invalidateBundleCache and unload. - // Else retry splitAndOwnBundleOnceAndRetry. - updateFuture.whenCompleteAsync((r, t)-> { - if (t != null) { - // retry several times on BadVersion - if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { - pulsar.getOrderedExecutor() - .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); - } else { - // Retry enough, or meet other exception - String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", - bundle.toString(), counter.get(), t.getMessage()); - LOG.warn(msg2); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + } else { + String msg = format("bundle %s not found under namespace", bundle.toString()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + } catch (Exception e) { + updateFuture.completeExceptionally(e); } - return; + } else { + updateFuture.completeExceptionally(ex); } - // success updateNamespaceBundles - try { - // disable old bundle in memory - getOwnershipCache().updateBundleState(bundle, false); + // If success updateNamespaceBundles, then do invalidateBundleCache and unload. + // Else retry splitAndOwnBundleOnceAndRetry. + updateFuture.whenCompleteAsync((r, t)-> { + if (t != null) { + // retry several times on BadVersion + if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { + pulsar.getOrderedExecutor() + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm)); + } else if (t instanceof IllegalArgumentException) { + unloadFuture.completeExceptionally(t); + } else { + // Retry enough, or meet other exception + String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", + bundle.toString(), counter.get(), t.getMessage()); + LOG.warn(msg2); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + } + return; + } - // update bundled_topic cache for load-report-generation - pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); - loadManager.get().setLoadReportForceUpdateFlag(); + // success updateNamespaceBundles + try { + // disable old bundle in memory + getOwnershipCache().updateBundleState(bundle, false); + + // update bundled_topic cache for load-report-generation + pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); + loadManager.get().setLoadReportForceUpdateFlag(); + + if (unload) { + // unload new split bundles + r.forEach(splitBundle -> { + try { + unloadNamespaceBundle(splitBundle); + } catch (Exception e) { + LOG.warn("Failed to unload split bundle {}", splitBundle, e); + throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); + } + }); + } - if (unload) { - // unload new split bundles - r.forEach(splitBundle -> { - try { - unloadNamespaceBundle(splitBundle); - } catch (Exception e) { - LOG.warn("Failed to unload split bundle {}", splitBundle, e); - throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); - } - }); + unloadFuture.complete(null); + } catch (Exception e) { + String msg1 = format( + "failed to disable bundle %s under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); + LOG.warn(msg1, e); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); } - - unloadFuture.complete(null); - } catch (Exception e) { - String msg1 = format( - "failed to disable bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); - LOG.warn(msg1, e); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); - } - return; - }, pulsar.getOrderedExecutor()); + return; + }, pulsar.getOrderedExecutor()); + }); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 43cfaa0a58804..c79f6bdbe204a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -153,4 +153,8 @@ public void setHasNonPersistentTopic(boolean hasNonPersistentTopic) { public static String getBundleRange(String namespaceBundle) { return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + + public NamespaceBundleFactory getNamespaceBundleFactory() { + return factory; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index bdfccf3cf4a7e..69ec0b719f2ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -221,11 +221,19 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except * {@link NamespaceBundle} needs to be split * @param numBundles * split into numBundles + * @param splitBoundary + * split into 2 numBundles by the given split key. The given split key must between the key range of the + * given split bundle. * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including * split bundles for a given namespace */ - public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles) { + public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) { checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle); + if (splitBoundary != null) { + checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), + "The given fixed key must between the key range of the %s bundle", targetBundle); + numBundles = 2; + } checkNotNull(targetBundle, "can't split null bundle"); checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present"); NamespaceName nsname = targetBundle.getNamespaceObject(); @@ -243,7 +251,7 @@ public Pair> splitBundles(NamespaceBundl splitPartition = i; Long maxVal = sourceBundle.partitions[i + 1]; Long minVal = sourceBundle.partitions[i]; - Long segSize = (maxVal - minVal) / numBundles; + Long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal; partitions[pos++] = minVal; Long curPartition = minVal + segSize; for (int j = 0; j < numBundles - 1; j++) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..a0e45f3e9485c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Algorithm interface for namespace bundle split. + */ +public interface NamespaceBundleSplitAlgorithm { + + String rangeEquallyDivideName = "range_equally_divide"; + String topicCountEquallyDivideName = "topic_count_equally_divide"; + + List availableAlgorithms = Lists.newArrayList(rangeEquallyDivideName, topicCountEquallyDivideName); + + NamespaceBundleSplitAlgorithm rangeEquallyDivide = new RangeEquallyDivideBundleSplitAlgorithm(); + NamespaceBundleSplitAlgorithm topicCountEquallyDivide = new TopicCountEquallyDivideBundleSplitAlgorithm(); + + static NamespaceBundleSplitAlgorithm of(String algorithmName) { + if (algorithmName == null) { + return null; + } + switch (algorithmName) { + case rangeEquallyDivideName: + return rangeEquallyDivide; + case topicCountEquallyDivideName: + return topicCountEquallyDivide; + default: + return null; + } + } + + CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..a66ac2630226e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same hash range size. + */ +public class RangeEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { + + @Override + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return CompletableFuture.completedFuture(bundle.getLowerEndpoint() + + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..40b3d778fd881 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same topics count. + */ +public class TopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { + + @Override + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + if (topics == null || topics.size() <= 1) { + return CompletableFuture.completedFuture(null); + } + List topicNameHashList = new ArrayList<>(topics.size()); + for (String topic : topics) { + topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); + } + Collections.sort(topicNameHashList); + long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); + long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); + long splitMiddle = splitStart + (splitEnd - splitStart) / 2; + return CompletableFuture.completedFuture(splitMiddle); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ec2c6d08647bf..5b9949b4ae745 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -41,6 +41,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -90,6 +91,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -995,7 +997,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1010,6 +1012,95 @@ public void testNamespaceSplitBundle() throws Exception { producer.close(); } + @Test + public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + } + + @Test + public void testNamespaceSplitBundleWithInvalidAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + "invalid_test"); + fail("unsupported namespace bundle split algorithm"); + } catch (PulsarAdminException ignored) { + } + } + + @Test + public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception { + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); + } + @Test public void testNamespaceSplitBundleConcurrent() throws Exception { // Force to create a topic @@ -1025,7 +1116,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1042,11 +1133,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { @@ -1064,19 +1155,19 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index aab5d4a2b1cf2..9c7e1d47b94eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -956,7 +956,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -986,7 +986,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1007,13 +1007,13 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; } ) @@ -1039,25 +1039,25 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; } ) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index c3ed98597423d..d52a3ecb47bb7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -680,25 +680,25 @@ public void testNamespaceBundleAutoSplit() throws Exception { boolean isAutoUnooadSplitBundleEnabled = pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); // verify bundles are split verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); } /* diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index bde8dc1e8c7d6..ce5f519cfb2c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -110,7 +111,7 @@ public void testSplitAndOwnBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -190,7 +191,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { assertNotNull(list); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); } catch (Exception e) { @@ -383,7 +384,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -448,7 +449,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); NamespaceBundle originalBundle = bundles.findBundle(topicName); - CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result1.get(); } catch (Exception e) { @@ -467,7 +468,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } }); - CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true); + CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result2.get(); } catch (Exception e) { @@ -483,7 +484,7 @@ private Pair> splitBundles(NamespaceBund bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, 2); + return utilityFactory.splitBundles(targetBundle, 2, null); } private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index f33f28c0ad6ea..0ad182498b440 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -103,7 +103,7 @@ public void testConnectToInvalidateBundleCacheBroker() throws Exception { // All brokers will invalidate bundles cache after namespace bundle split pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns", pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(), - true); + true, null); PulsarClient client = PulsarClient.builder(). serviceUrl(serviceUrlForTopic1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 1722d1633640e..0e926af5a055a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -871,7 +871,7 @@ public void testSplitUnloadLookupTest() throws Exception { assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); // (6) Broker-2 should get the watch and update bundle cache final int retry = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 4093899bf811b..2a2521d4e1cf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -182,7 +182,7 @@ public void testsplitBundles() throws Exception { NamespaceBundle bundle = bundles.findBundle(topicName); final int numberSplitBundles = 4; // (1) split in 4 - Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles); + Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles, null); // existing_no_bundles(1) + // additional_new_split_bundle(4) - // parent_target_bundle(1) @@ -225,7 +225,7 @@ public void testSplitBundleInTwo() throws Exception { NamespaceBundles bundles = factory.getBundles(nsname); NamespaceBundle bundle = bundles.findBundle(topicName); // (1) split : [0x00000000,0xffffffff] => [0x00000000_0x7fffffff,0x7fffffff_0xffffffff] - Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES); + Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES, null); assertNotNull(splitBundles); assertBundleDivideInTwo(bundle, splitBundles.getRight(), NO_BUNDLES); @@ -248,6 +248,29 @@ public void testSplitBundleInTwo() throws Exception { } + @Test + public void testSplitBundleByFixBoundary() throws Exception { + NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); + NamespaceBundles bundles = factory.getBundles(nsname); + NamespaceBundle bundleToSplit = bundles.getBundles().get(0); + + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getLowerEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getUpperEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + + Long fixBoundary = bundleToSplit.getLowerEndpoint() + 10; + Pair> splitBundles = factory.splitBundles(bundleToSplit, 0, fixBoundary); + assertEquals(splitBundles.getRight().get(0).getLowerEndpoint(), bundleToSplit.getLowerEndpoint()); + assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixBoundary); + } + private void validateSplitBundlesRange(NamespaceBundle fullBundle, List splitBundles) { assertNotNull(fullBundle); assertNotNull(splitBundles); @@ -267,7 +290,7 @@ private Pair> splitBundlesUtilFactory(Na bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, numBundles); + return utilityFactory.splitBundles(targetBundle, numBundles, null); } private void assertBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundle bundle, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index c154d29a0ef6b..24e7152856718 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -61,6 +61,8 @@ public void testInit() throws Exception { && config.getBrokerServicePort().get().equals(brokerServicePort)); assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); assertEquals(config.getBrokerDeleteInactiveTopicsMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide"); + assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1); } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 5d37a11ec60f4..bd966ad76f5b7 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -86,3 +86,5 @@ replicationConnectionsPerBroker=16 replicationProducerQueueSize=1000 replicatorPrefix=pulsar.repl brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] +defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 0d69ce42ff581..e0bf5d6d6fc9b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -929,7 +929,7 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi * @throws PulsarAdminException * Unexpected error */ - void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) throws PulsarAdminException; + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException; /** * Set message-publish-rate (topics under this namespace can publish this many messages per second) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7832d0510072a..99f829290fd35 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -526,12 +526,13 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri } @Override - public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, bundle, "split"); - request(path.queryParam("unload", Boolean.toString(unloadSplitBundles))) + request(path.queryParam("unload", Boolean.toString(unloadSplitBundles)) + .queryParam("splitAlgorithmName", splitAlgorithmName)) .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 2bbe32281ec59..edeab1b6d4d88 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -309,7 +309,7 @@ void namespaces() throws Exception { verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); - verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 6159fa90e28ac..0899ca36d3366 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -513,10 +513,15 @@ private class SplitBundle extends CliCommand { "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) private boolean unload; + @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split namespace bundle.\n" + + " Valid options are: [range_equally_divide, topic_count_equally_divide].\n" + + " Use broker side config if absent", required = false) + private String splitAlgorithmName; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - admin.namespaces().splitNamespaceBundle(namespace, bundle, unload); + admin.namespaces().splitNamespaceBundle(namespace, bundle, unload, splitAlgorithmName); } } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 9adfcab82177e..d72778a4d8097 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |defaultRetentionSizeInMB| Default retention size |0| |keepAliveIntervalSeconds| How often to check whether the connections are still alive |30| |loadManagerClassName| Name of load manager to use |org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl| +|supportedNamespaceBundleSplitAlgorithms| Supported algorithms name for namespace bundle split |[range_equally_divide,topic_count_equally_divide]| +|defaultNamespaceBundleSplitAlgorithm| Default algorithm name for namespace bundle split |range_equally_divide| |managedLedgerOffloadDriver| Driver to use to offload old data to long term storage (Possible values: S3) || |managedLedgerOffloadMaxThreads| Maximum number of thread pool threads for ledger offloading |2| |managedLedgerUnackedRangesOpenCacheSetEnabled| Use Open Range-Set to cache unacknowledged messages |true|