From 1c099da5445e4b40fd1b0e6bca52b11fbdeb2e6e Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 14 Feb 2020 05:55:55 +0800 Subject: [PATCH] Supports evenly distribute topics count when splits bundle (#6241) ### Motivation Currently, bundle split splits the bundle into two parts of the same size. When there are fewer topics, bundle split does not work well. The topic assigned to the broker according to the topic name hash value, hashing is not effective in a small number of topics bundle split. So, this PR introduces an option(-balance-topic-count) for bundle split. When setting it to true, the given bundle splits to 2 parts, each part has the same amount of topics. And introduce a new Load Manager implementation named `org.apache.pulsar.broker.loadbalance.impl.BalanceTopicCountModularLoadManager`. The new Load Manager implementation splits bundle with balance topics count, others are not different from ModularLoadManagerImpl. --- conf/broker.conf | 8 + .../pulsar/broker/ServiceConfiguration.java | 14 +- .../apache/pulsar/PulsarBrokerStarter.java | 10 + .../broker/admin/impl/NamespacesBase.java | 33 ++- .../pulsar/broker/admin/v1/Namespaces.java | 3 +- .../pulsar/broker/admin/v2/Namespaces.java | 5 +- .../impl/ModularLoadManagerImpl.java | 2 +- .../impl/SimpleLoadManagerImpl.java | 7 +- .../broker/namespace/NamespaceService.java | 220 ++++++++++-------- .../pulsar/common/naming/NamespaceBundle.java | 4 + .../common/naming/NamespaceBundleFactory.java | 12 +- .../naming/NamespaceBundleSplitAlgorithm.java | 56 +++++ ...angeEquallyDivideBundleSplitAlgorithm.java | 35 +++ ...ountEquallyDivideBundleSplitAlgorithm.java | 50 ++++ .../pulsar/broker/admin/AdminApiTest.java | 107 ++++++++- .../broker/admin/v1/V1_AdminApiTest.java | 16 +- .../broker/loadbalance/LoadBalancerTest.java | 20 +- .../namespace/NamespaceServiceTest.java | 13 +- .../pulsar/broker/service/TopicOwnerTest.java | 2 +- .../client/api/BrokerServiceLookupTest.java | 2 +- .../common/naming/NamespaceBundlesTest.java | 29 ++- .../naming/ServiceConfigurationTest.java | 2 + .../configurations/pulsar_broker_test.conf | 2 + .../pulsar/client/admin/Namespaces.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 5 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../pulsar/admin/cli/CmdNamespaces.java | 7 +- site2/docs/reference-configuration.md | 2 + 28 files changed, 510 insertions(+), 160 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java diff --git a/conf/broker.conf b/conf/broker.conf index 9a8de0a66ab98..a98aa16ad84dc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -705,6 +705,14 @@ loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl +# Supported algorithms name for namespace bundle split. +# "range_equally_divide" divides the bundle into two parts with the same hash range size. +# "topic_count_equally_divide" divides the bundle into two parts with the same topics count. +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide,topic_count_equally_divide] + +# Default algorithm name for namespace bundle split +defaultNamespaceBundleSplitAlgorithm=range_equally_divide + ### --- Replication --- ### # Enable replication metrics diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f64a6fa2d7bf9..d44f84644e920 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.internal.PlatformDependent; @@ -1204,7 +1205,18 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Name of load manager to use" ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; - + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Supported algorithms name for namespace bundle split" + ) + private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide"); + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Default algorithm name for namespace bundle split" + ) + private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide"; @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = "Option to override the auto-detected network interfaces max speed" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index c7b0bb7e9c605..37c5a1f7cb1b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; @@ -152,6 +153,15 @@ private static class BrokerStarter { throw new IllegalArgumentException("Max message size need smaller than jvm directMemory"); } + if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) { + throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " + + "Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms); + } + + if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) { + throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm"); + } + // init functions worker if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) { WorkerConfig workerConfig; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d2368d7c7bd20..c0ce1e090ab40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -64,6 +64,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -775,7 +776,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload) { + protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) { log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); validateSuperUserAccess(); @@ -793,19 +794,41 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + List supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms(); + if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { + throw new RestException(Status.PRECONDITION_FAILED, + "Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms); + } + try { - pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get(); + pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); - } catch (IllegalArgumentException e) { - log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, + } catch (ExecutionException e) { + if (e.getCause() instanceof IllegalArgumentException) { + log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, bundleRange, e.getMessage()); - throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + } else { + log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); + throw new RestException(e.getCause()); + } } catch (Exception e) { log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); throw new RestException(e); } } + private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { + NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + } + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide; + } + return algorithm; + } + protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); validateSuperUserAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index df4cbc5dea733..8cd0ba7afd14b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; @@ -456,7 +457,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload) { validateNamespaceName(property, cluster, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 8efcfa19b87f8..56f07d67a6e98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -361,9 +361,10 @@ public void unloadNamespaceBundle(@PathParam("tenant") String tenant, @PathParam public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("unload") @DefaultValue("false") boolean unload) { + @QueryParam("unload") @DefaultValue("false") boolean unload, + @QueryParam("splitAlgorithmName") String splitAlgorithmName) { validateNamespaceName(tenant, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 246e311f40f1f..5788656bb5eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -648,7 +648,7 @@ public void checkNamespaceBundleSplit() { } log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles); pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - unloadSplitBundles); + unloadSplitBundles, null); // Make sure the same bundle is not selected again. loadData.getBundleData().remove(bundleName); localData.getLastStats().remove(bundleName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 14ae88a7266f9..765f6c661021d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -49,6 +49,7 @@ import org.apache.pulsar.broker.loadbalance.PlacementStrategy; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -1478,9 +1479,9 @@ public void doNamespaceBundleSplit() throws Exception { for (String bundleName : bundlesToBeSplit) { try { pulsar.getAdminClient().namespaces().splitNamespaceBundle( - LoadManagerShared.getNamespaceNameFromBundleName(bundleName), - LoadManagerShared.getBundleRangeFromBundleName(bundleName), - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); + LoadManagerShared.getNamespaceNameFromBundleName(bundleName), + LoadManagerShared.getBundleRangeFromBundleName(bundleName), + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 0770a4d8bde52..d4e90be705d99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -72,6 +73,7 @@ import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -636,13 +638,12 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @return * @throws Exception */ - public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload) + public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, NamespaceBundleSplitAlgorithm splitAlgorithm) throws Exception { final CompletableFuture unloadFuture = new CompletableFuture<>(); final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); - - splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture); + splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm); return unloadFuture; } @@ -650,114 +651,127 @@ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, - CompletableFuture unloadFuture) { - CompletableFuture> updateFuture = new CompletableFuture<>(); - - final Pair> splittedBundles = bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */); - - // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splittedBundles != null) { - checkNotNull(splittedBundles.getLeft()); - checkNotNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); - NamespaceName nsname = bundle.getNamespaceObject(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", - nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", - splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); - } - try { - // take ownership of newly split bundles - for (NamespaceBundle sBundle : splittedBundles.getRight()) { - checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); - } - updateNamespaceBundles(nsname, splittedBundles.getLeft(), - (rc, path, zkCtx, stat) -> { - if (rc == Code.OK.intValue()) { - // invalidate cache as zookeeper has new split - // namespace bundle - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - - updateFuture.complete(splittedBundles.getRight()); - } else if (rc == Code.BADVERSION.intValue()) { - KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + - "due to %s, counter: %d", - nsname.toString(), bundle.getBundleRange(), - keeperException.getMessage(), counter.get()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); - } else { - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", - nsname.toString(), bundle.getBundleRange(), - KeeperException.create(KeeperException.Code.get(rc)).getMessage()); - LOG.warn(msg); + CompletableFuture unloadFuture, + NamespaceBundleSplitAlgorithm splitAlgorithm) { + splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> { + CompletableFuture> updateFuture = new CompletableFuture<>(); + if (ex == null) { + final Pair> splittedBundles; + try { + splittedBundles = bundleFactory.splitBundles(bundle, + 2 /* by default split into 2 */, splitBoundary); + + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splittedBundles != null) { + checkNotNull(splittedBundles.getLeft()); + checkNotNull(splittedBundles.getRight()); + checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); + NamespaceName nsname = bundle.getNamespaceObject(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", + nsname.toString(), bundle.getBundleRange(), counter.get(), + splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", + splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); + } + try { + // take ownership of newly split bundles + for (NamespaceBundle sBundle : splittedBundles.getRight()) { + checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); + } + updateNamespaceBundles(nsname, splittedBundles.getLeft(), + (rc, path, zkCtx, stat) -> { + if (rc == Code.OK.intValue()) { + // invalidate cache as zookeeper has new split + // namespace bundle + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + + updateFuture.complete(splittedBundles.getRight()); + } else if (rc == Code.BADVERSION.intValue()) { + KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + + "due to %s, counter: %d", + nsname.toString(), bundle.getBundleRange(), + keeperException.getMessage(), counter.get()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); + } else { + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", + nsname.toString(), bundle.getBundleRange(), + KeeperException.create(KeeperException.Code.get(rc)).getMessage()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }); + } catch (Exception e) { + String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", + nsname.toString(), e.getMessage()); + LOG.warn(msg, e); updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } - }); - } catch (Exception e) { - String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", - nsname.toString(), e.getMessage()); - LOG.warn(msg, e); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - } else { - String msg = format("bundle %s not found under namespace", bundle.toString()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - - // If success updateNamespaceBundles, then do invalidateBundleCache and unload. - // Else retry splitAndOwnBundleOnceAndRetry. - updateFuture.whenCompleteAsync((r, t)-> { - if (t != null) { - // retry several times on BadVersion - if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { - pulsar.getOrderedExecutor() - .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); - } else { - // Retry enough, or meet other exception - String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", - bundle.toString(), counter.get(), t.getMessage()); - LOG.warn(msg2); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + } else { + String msg = format("bundle %s not found under namespace", bundle.toString()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + } catch (Exception e) { + updateFuture.completeExceptionally(e); } - return; + } else { + updateFuture.completeExceptionally(ex); } - // success updateNamespaceBundles - try { - // disable old bundle in memory - getOwnershipCache().updateBundleState(bundle, false); + // If success updateNamespaceBundles, then do invalidateBundleCache and unload. + // Else retry splitAndOwnBundleOnceAndRetry. + updateFuture.whenCompleteAsync((r, t)-> { + if (t != null) { + // retry several times on BadVersion + if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { + pulsar.getOrderedExecutor() + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm)); + } else if (t instanceof IllegalArgumentException) { + unloadFuture.completeExceptionally(t); + } else { + // Retry enough, or meet other exception + String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", + bundle.toString(), counter.get(), t.getMessage()); + LOG.warn(msg2); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + } + return; + } - // update bundled_topic cache for load-report-generation - pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); - loadManager.get().setLoadReportForceUpdateFlag(); + // success updateNamespaceBundles + try { + // disable old bundle in memory + getOwnershipCache().updateBundleState(bundle, false); + + // update bundled_topic cache for load-report-generation + pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); + loadManager.get().setLoadReportForceUpdateFlag(); + + if (unload) { + // unload new split bundles + r.forEach(splitBundle -> { + try { + unloadNamespaceBundle(splitBundle); + } catch (Exception e) { + LOG.warn("Failed to unload split bundle {}", splitBundle, e); + throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); + } + }); + } - if (unload) { - // unload new split bundles - r.forEach(splitBundle -> { - try { - unloadNamespaceBundle(splitBundle); - } catch (Exception e) { - LOG.warn("Failed to unload split bundle {}", splitBundle, e); - throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); - } - }); + unloadFuture.complete(null); + } catch (Exception e) { + String msg1 = format( + "failed to disable bundle %s under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); + LOG.warn(msg1, e); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); } - - unloadFuture.complete(null); - } catch (Exception e) { - String msg1 = format( - "failed to disable bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); - LOG.warn(msg1, e); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); - } - return; - }, pulsar.getOrderedExecutor()); + return; + }, pulsar.getOrderedExecutor()); + }); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 43cfaa0a58804..c79f6bdbe204a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -153,4 +153,8 @@ public void setHasNonPersistentTopic(boolean hasNonPersistentTopic) { public static String getBundleRange(String namespaceBundle) { return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + + public NamespaceBundleFactory getNamespaceBundleFactory() { + return factory; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index bdfccf3cf4a7e..69ec0b719f2ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -221,11 +221,19 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except * {@link NamespaceBundle} needs to be split * @param numBundles * split into numBundles + * @param splitBoundary + * split into 2 numBundles by the given split key. The given split key must between the key range of the + * given split bundle. * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including * split bundles for a given namespace */ - public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles) { + public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) { checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle); + if (splitBoundary != null) { + checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), + "The given fixed key must between the key range of the %s bundle", targetBundle); + numBundles = 2; + } checkNotNull(targetBundle, "can't split null bundle"); checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present"); NamespaceName nsname = targetBundle.getNamespaceObject(); @@ -243,7 +251,7 @@ public Pair> splitBundles(NamespaceBundl splitPartition = i; Long maxVal = sourceBundle.partitions[i + 1]; Long minVal = sourceBundle.partitions[i]; - Long segSize = (maxVal - minVal) / numBundles; + Long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal; partitions[pos++] = minVal; Long curPartition = minVal + segSize; for (int j = 0; j < numBundles - 1; j++) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..a0e45f3e9485c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Algorithm interface for namespace bundle split. + */ +public interface NamespaceBundleSplitAlgorithm { + + String rangeEquallyDivideName = "range_equally_divide"; + String topicCountEquallyDivideName = "topic_count_equally_divide"; + + List availableAlgorithms = Lists.newArrayList(rangeEquallyDivideName, topicCountEquallyDivideName); + + NamespaceBundleSplitAlgorithm rangeEquallyDivide = new RangeEquallyDivideBundleSplitAlgorithm(); + NamespaceBundleSplitAlgorithm topicCountEquallyDivide = new TopicCountEquallyDivideBundleSplitAlgorithm(); + + static NamespaceBundleSplitAlgorithm of(String algorithmName) { + if (algorithmName == null) { + return null; + } + switch (algorithmName) { + case rangeEquallyDivideName: + return rangeEquallyDivide; + case topicCountEquallyDivideName: + return topicCountEquallyDivide; + default: + return null; + } + } + + CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..a66ac2630226e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same hash range size. + */ +public class RangeEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { + + @Override + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return CompletableFuture.completedFuture(bundle.getLowerEndpoint() + + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..40b3d778fd881 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same topics count. + */ +public class TopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { + + @Override + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + if (topics == null || topics.size() <= 1) { + return CompletableFuture.completedFuture(null); + } + List topicNameHashList = new ArrayList<>(topics.size()); + for (String topic : topics) { + topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); + } + Collections.sort(topicNameHashList); + long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); + long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); + long splitMiddle = splitStart + (splitEnd - splitStart) / 2; + return CompletableFuture.completedFuture(splitMiddle); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ec2c6d08647bf..5b9949b4ae745 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -41,6 +41,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -90,6 +91,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -995,7 +997,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1010,6 +1012,95 @@ public void testNamespaceSplitBundle() throws Exception { producer.close(); } + @Test + public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + } + + @Test + public void testNamespaceSplitBundleWithInvalidAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + "invalid_test"); + fail("unsupported namespace bundle split algorithm"); + } catch (PulsarAdminException ignored) { + } + } + + @Test + public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception { + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); + } + @Test public void testNamespaceSplitBundleConcurrent() throws Exception { // Force to create a topic @@ -1025,7 +1116,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1042,11 +1133,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { @@ -1064,19 +1155,19 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index aab5d4a2b1cf2..9c7e1d47b94eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -956,7 +956,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -986,7 +986,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1007,13 +1007,13 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; } ) @@ -1039,25 +1039,25 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; } ) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index c3ed98597423d..d52a3ecb47bb7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -680,25 +680,25 @@ public void testNamespaceBundleAutoSplit() throws Exception { boolean isAutoUnooadSplitBundleEnabled = pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); // verify bundles are split verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, null); } /* diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index bde8dc1e8c7d6..ce5f519cfb2c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -110,7 +111,7 @@ public void testSplitAndOwnBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -190,7 +191,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { assertNotNull(list); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); } catch (Exception e) { @@ -383,7 +384,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -448,7 +449,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); NamespaceBundle originalBundle = bundles.findBundle(topicName); - CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result1.get(); } catch (Exception e) { @@ -467,7 +468,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } }); - CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true); + CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result2.get(); } catch (Exception e) { @@ -483,7 +484,7 @@ private Pair> splitBundles(NamespaceBund bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, 2); + return utilityFactory.splitBundles(targetBundle, 2, null); } private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index f33f28c0ad6ea..0ad182498b440 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -103,7 +103,7 @@ public void testConnectToInvalidateBundleCacheBroker() throws Exception { // All brokers will invalidate bundles cache after namespace bundle split pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns", pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(), - true); + true, null); PulsarClient client = PulsarClient.builder(). serviceUrl(serviceUrlForTopic1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 1722d1633640e..0e926af5a055a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -871,7 +871,7 @@ public void testSplitUnloadLookupTest() throws Exception { assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); // (6) Broker-2 should get the watch and update bundle cache final int retry = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 4093899bf811b..2a2521d4e1cf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -182,7 +182,7 @@ public void testsplitBundles() throws Exception { NamespaceBundle bundle = bundles.findBundle(topicName); final int numberSplitBundles = 4; // (1) split in 4 - Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles); + Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles, null); // existing_no_bundles(1) + // additional_new_split_bundle(4) - // parent_target_bundle(1) @@ -225,7 +225,7 @@ public void testSplitBundleInTwo() throws Exception { NamespaceBundles bundles = factory.getBundles(nsname); NamespaceBundle bundle = bundles.findBundle(topicName); // (1) split : [0x00000000,0xffffffff] => [0x00000000_0x7fffffff,0x7fffffff_0xffffffff] - Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES); + Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES, null); assertNotNull(splitBundles); assertBundleDivideInTwo(bundle, splitBundles.getRight(), NO_BUNDLES); @@ -248,6 +248,29 @@ public void testSplitBundleInTwo() throws Exception { } + @Test + public void testSplitBundleByFixBoundary() throws Exception { + NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); + NamespaceBundles bundles = factory.getBundles(nsname); + NamespaceBundle bundleToSplit = bundles.getBundles().get(0); + + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getLowerEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getUpperEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + + Long fixBoundary = bundleToSplit.getLowerEndpoint() + 10; + Pair> splitBundles = factory.splitBundles(bundleToSplit, 0, fixBoundary); + assertEquals(splitBundles.getRight().get(0).getLowerEndpoint(), bundleToSplit.getLowerEndpoint()); + assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixBoundary); + } + private void validateSplitBundlesRange(NamespaceBundle fullBundle, List splitBundles) { assertNotNull(fullBundle); assertNotNull(splitBundles); @@ -267,7 +290,7 @@ private Pair> splitBundlesUtilFactory(Na bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, numBundles); + return utilityFactory.splitBundles(targetBundle, numBundles, null); } private void assertBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundle bundle, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index c154d29a0ef6b..24e7152856718 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -61,6 +61,8 @@ public void testInit() throws Exception { && config.getBrokerServicePort().get().equals(brokerServicePort)); assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); assertEquals(config.getBrokerDeleteInactiveTopicsMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide"); + assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1); } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 5d37a11ec60f4..bd966ad76f5b7 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -86,3 +86,5 @@ replicationConnectionsPerBroker=16 replicationProducerQueueSize=1000 replicatorPrefix=pulsar.repl brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] +defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 3ffbf8971edc5..bfd4b7bf93fed 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -930,7 +930,7 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi * @throws PulsarAdminException * Unexpected error */ - void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) throws PulsarAdminException; + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException; /** * Set message-publish-rate (topics under this namespace can publish this many messages per second) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index bd29057e65178..3b60fa278cd76 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -527,12 +527,13 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri } @Override - public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, bundle, "split"); - request(path.queryParam("unload", Boolean.toString(unloadSplitBundles))) + request(path.queryParam("unload", Boolean.toString(unloadSplitBundles)) + .queryParam("splitAlgorithmName", splitAlgorithmName)) .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 453f543b1803d..371f752138a3e 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -310,7 +310,7 @@ void namespaces() throws Exception { verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); - verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index ab8b2264aeb86..d77b8ec187311 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -515,10 +515,15 @@ private class SplitBundle extends CliCommand { "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) private boolean unload; + @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split namespace bundle.\n" + + " Valid options are: [range_equally_divide, topic_count_equally_divide].\n" + + " Use broker side config if absent", required = false) + private String splitAlgorithmName; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - admin.namespaces().splitNamespaceBundle(namespace, bundle, unload); + admin.namespaces().splitNamespaceBundle(namespace, bundle, unload, splitAlgorithmName); } } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 9adfcab82177e..d72778a4d8097 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |defaultRetentionSizeInMB| Default retention size |0| |keepAliveIntervalSeconds| How often to check whether the connections are still alive |30| |loadManagerClassName| Name of load manager to use |org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl| +|supportedNamespaceBundleSplitAlgorithms| Supported algorithms name for namespace bundle split |[range_equally_divide,topic_count_equally_divide]| +|defaultNamespaceBundleSplitAlgorithm| Default algorithm name for namespace bundle split |range_equally_divide| |managedLedgerOffloadDriver| Driver to use to offload old data to long term storage (Possible values: S3) || |managedLedgerOffloadMaxThreads| Maximum number of thread pool threads for ledger offloading |2| |managedLedgerUnackedRangesOpenCacheSetEnabled| Use Open Range-Set to cache unacknowledged messages |true|