From 3cd51d350efac153ceb072f69dc54778eb72bf99 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 6 Feb 2020 19:49:54 +0800 Subject: [PATCH 1/9] Support split bundle based on balance topics count. --- conf/broker.conf | 2 + .../broker/admin/impl/NamespacesBase.java | 4 +- .../pulsar/broker/admin/v1/Namespaces.java | 2 +- .../pulsar/broker/admin/v2/Namespaces.java | 5 +- .../BalanceTopicCountModularLoadManager.java | 31 +++ .../impl/ModularLoadManagerImpl.java | 14 +- .../impl/SimpleLoadManagerImpl.java | 12 +- .../broker/namespace/NamespaceService.java | 231 ++++++++++-------- .../pulsar/common/naming/NamespaceBundle.java | 4 + .../common/naming/NamespaceBundleFactory.java | 12 +- .../namespace/NamespaceServiceTest.java | 12 +- .../common/naming/NamespaceBundlesTest.java | 29 ++- .../pulsar/client/admin/Namespaces.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 5 +- .../pulsar/admin/cli/CmdNamespaces.java | 6 +- 15 files changed, 239 insertions(+), 132 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java diff --git a/conf/broker.conf b/conf/broker.conf index 798fbacf271ca..3f9a24b9b981b 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -691,6 +691,8 @@ loadBalancerNamespaceMaximumBundles=128 loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use +# loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl +# loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.BalanceTopicCountModularLoadManager loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl ### --- Replication --- ### diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 33592e6f99612..b9f7b59c04d2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -774,7 +774,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload) { + protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, boolean balanceTopicCount) { log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); validateSuperUserAccess(); @@ -793,7 +793,7 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit authoritative, true); try { - pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get(); + pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, balanceTopicCount).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); } catch (IllegalArgumentException e) { log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 99bbcd305c8de..809bd7e24567d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -446,7 +446,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload) { validateNamespaceName(property, cluster, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, false); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index ae691a70470df..898c35fcc4a5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -350,9 +350,10 @@ public void unloadNamespaceBundle(@PathParam("tenant") String tenant, @PathParam public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @QueryParam("unload") @DefaultValue("false") boolean unload) { + @QueryParam("unload") @DefaultValue("false") boolean unload, + @QueryParam("balanceTopicCount") @DefaultValue("false") boolean balanceTopicCount) { validateNamespaceName(tenant, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, balanceTopicCount); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java new file mode 100644 index 0000000000000..e5545f4976669 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.client.admin.PulsarAdminException; + +public class BalanceTopicCountModularLoadManager extends ModularLoadManagerImpl { + + @Override + protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { + pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), true); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 246e311f40f1f..eaf0d588a1b37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -169,7 +169,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach private SimpleResourceAllocationPolicies policies; // Pulsar service used to initialize this. - private PulsarService pulsar; + protected PulsarService pulsar; // Executor service used to regularly update broker data. private final ScheduledExecutorService scheduler; @@ -634,7 +634,6 @@ public void checkNamespaceBundleSplit() { || !pulsar.getLeaderElectionService().isLeader()) { return; } - final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); synchronized (bundleSplitStrategy) { final Set bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar); NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory(); @@ -646,9 +645,9 @@ public void checkNamespaceBundleSplit() { .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { continue; } - log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles); - pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - unloadSplitBundles); + log.info("Load-manager splitting bundle {} and unloading {}", bundleName, + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); + internalSplitNamespaceBundle(namespaceName, bundleRange); // Make sure the same bundle is not selected again. loadData.getBundleData().remove(bundleName); localData.getLastStats().remove(bundleName); @@ -665,6 +664,11 @@ public void checkNamespaceBundleSplit() { } + protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { + pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), false); + } + /** * When the broker data ZooKeeper nodes are updated, update the broker data map. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 14ae88a7266f9..a57181c36fdb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -49,6 +49,7 @@ import org.apache.pulsar.broker.loadbalance.PlacementStrategy; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -1477,10 +1478,8 @@ public void doNamespaceBundleSplit() throws Exception { if (bundlesToBeSplit.size() > 0) { for (String bundleName : bundlesToBeSplit) { try { - pulsar.getAdminClient().namespaces().splitNamespaceBundle( - LoadManagerShared.getNamespaceNameFromBundleName(bundleName), - LoadManagerShared.getBundleRangeFromBundleName(bundleName), - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); + internalSplitNamespaceBundle(LoadManagerShared.getNamespaceNameFromBundleName(bundleName), + LoadManagerShared.getBundleRangeFromBundleName(bundleName)); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); @@ -1490,6 +1489,11 @@ public void doNamespaceBundleSplit() throws Exception { } } + protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { + pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), false); + } + @Override public void stop() throws PulsarServerException { loadReportCacheZk.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 0770a4d8bde52..9ae4a92dc7d27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -72,6 +72,7 @@ import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -636,128 +637,152 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @return * @throws Exception */ - public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload) + public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, boolean balanceTopicCount) throws Exception { final CompletableFuture unloadFuture = new CompletableFuture<>(); final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); - splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture); + splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, balanceTopicCount); return unloadFuture; } + CompletableFuture getSplitKeyForBalanceTopicCountAsync(NamespaceBundle bundle, boolean balanceTopicCount) { + if (!balanceTopicCount) { + return CompletableFuture.completedFuture(null); + } else { + return getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + List topicNameHashList = new ArrayList<>(topics.size()); + for (String topic : topics) { + topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); + } + Collections.sort(topicNameHashList); + long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); + long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); + long splitMiddle = splitStart + (splitEnd - splitStart) / 2; + return CompletableFuture.completedFuture(splitMiddle); + }); + } + } + void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, - CompletableFuture unloadFuture) { - CompletableFuture> updateFuture = new CompletableFuture<>(); - - final Pair> splittedBundles = bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */); - - // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splittedBundles != null) { - checkNotNull(splittedBundles.getLeft()); - checkNotNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); - NamespaceName nsname = bundle.getNamespaceObject(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", - nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", - splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); - } - try { - // take ownership of newly split bundles - for (NamespaceBundle sBundle : splittedBundles.getRight()) { - checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); - } - updateNamespaceBundles(nsname, splittedBundles.getLeft(), - (rc, path, zkCtx, stat) -> { - if (rc == Code.OK.intValue()) { - // invalidate cache as zookeeper has new split - // namespace bundle - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - - updateFuture.complete(splittedBundles.getRight()); - } else if (rc == Code.BADVERSION.intValue()) { - KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + - "due to %s, counter: %d", - nsname.toString(), bundle.getBundleRange(), - keeperException.getMessage(), counter.get()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); - } else { - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", - nsname.toString(), bundle.getBundleRange(), - KeeperException.create(KeeperException.Code.get(rc)).getMessage()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + CompletableFuture unloadFuture, + boolean balanceTopicCount) { + getSplitKeyForBalanceTopicCountAsync(bundle, balanceTopicCount).whenComplete((splitKey, ex) -> { + CompletableFuture> updateFuture = new CompletableFuture<>(); + if (ex == null) { + final Pair> splittedBundles = bundleFactory.splitBundles(bundle, + 2 /* by default split into 2 */, splitKey); + + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splittedBundles != null) { + checkNotNull(splittedBundles.getLeft()); + checkNotNull(splittedBundles.getRight()); + checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); + NamespaceName nsname = bundle.getNamespaceObject(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", + nsname.toString(), bundle.getBundleRange(), counter.get(), + splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", + splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); + } + try { + // take ownership of newly split bundles + for (NamespaceBundle sBundle : splittedBundles.getRight()) { + checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); } - }); - } catch (Exception e) { - String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", - nsname.toString(), e.getMessage()); - LOG.warn(msg, e); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - } else { - String msg = format("bundle %s not found under namespace", bundle.toString()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - - // If success updateNamespaceBundles, then do invalidateBundleCache and unload. - // Else retry splitAndOwnBundleOnceAndRetry. - updateFuture.whenCompleteAsync((r, t)-> { - if (t != null) { - // retry several times on BadVersion - if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { - pulsar.getOrderedExecutor() - .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture)); + updateNamespaceBundles(nsname, splittedBundles.getLeft(), + (rc, path, zkCtx, stat) -> { + if (rc == Code.OK.intValue()) { + // invalidate cache as zookeeper has new split + // namespace bundle + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + + updateFuture.complete(splittedBundles.getRight()); + } else if (rc == Code.BADVERSION.intValue()) { + KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + + "due to %s, counter: %d", + nsname.toString(), bundle.getBundleRange(), + keeperException.getMessage(), counter.get()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); + } else { + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", + nsname.toString(), bundle.getBundleRange(), + KeeperException.create(KeeperException.Code.get(rc)).getMessage()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }); + } catch (Exception e) { + String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", + nsname.toString(), e.getMessage()); + LOG.warn(msg, e); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } } else { - // Retry enough, or meet other exception - String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", - bundle.toString(), counter.get(), t.getMessage()); - LOG.warn(msg2); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + String msg = format("bundle %s not found under namespace", bundle.toString()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } - return; + } else { + updateFuture.completeExceptionally(ex); } - // success updateNamespaceBundles - try { - // disable old bundle in memory - getOwnershipCache().updateBundleState(bundle, false); - - // update bundled_topic cache for load-report-generation - pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); - loadManager.get().setLoadReportForceUpdateFlag(); - - if (unload) { - // unload new split bundles - r.forEach(splitBundle -> { - try { - unloadNamespaceBundle(splitBundle); - } catch (Exception e) { - LOG.warn("Failed to unload split bundle {}", splitBundle, e); - throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); - } - }); + // If success updateNamespaceBundles, then do invalidateBundleCache and unload. + // Else retry splitAndOwnBundleOnceAndRetry. + updateFuture.whenCompleteAsync((r, t)-> { + if (t != null) { + // retry several times on BadVersion + if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { + pulsar.getOrderedExecutor() + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, balanceTopicCount)); + } else { + // Retry enough, or meet other exception + String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", + bundle.toString(), counter.get(), t.getMessage()); + LOG.warn(msg2); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2)); + } + return; } - unloadFuture.complete(null); - } catch (Exception e) { - String msg1 = format( - "failed to disable bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); - LOG.warn(msg1, e); - unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); - } - return; - }, pulsar.getOrderedExecutor()); + // success updateNamespaceBundles + try { + // disable old bundle in memory + getOwnershipCache().updateBundleState(bundle, false); + + // update bundled_topic cache for load-report-generation + pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); + loadManager.get().setLoadReportForceUpdateFlag(); + + if (unload) { + // unload new split bundles + r.forEach(splitBundle -> { + try { + unloadNamespaceBundle(splitBundle); + } catch (Exception e) { + LOG.warn("Failed to unload split bundle {}", splitBundle, e); + throw new RuntimeException("Failed to unload split bundle " + splitBundle, e); + } + }); + } + + unloadFuture.complete(null); + } catch (Exception e) { + String msg1 = format( + "failed to disable bundle %s under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); + LOG.warn(msg1, e); + unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1)); + } + return; + }, pulsar.getOrderedExecutor()); + }); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 43cfaa0a58804..c79f6bdbe204a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -153,4 +153,8 @@ public void setHasNonPersistentTopic(boolean hasNonPersistentTopic) { public static String getBundleRange(String namespaceBundle) { return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + + public NamespaceBundleFactory getNamespaceBundleFactory() { + return factory; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index bdfccf3cf4a7e..a7a52f67089c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -221,11 +221,19 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except * {@link NamespaceBundle} needs to be split * @param numBundles * split into numBundles + * @param splitKey + * split into 2 numBundles by the given split key. The given split key must between the key range of the + * given split bundle. * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including * split bundles for a given namespace */ - public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles) { + public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitKey) { checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle); + if (splitKey != null) { + checkArgument(splitKey > targetBundle.getLowerEndpoint() && splitKey < targetBundle.getUpperEndpoint(), + "The given fixed key must between the key range of the %s bundle", targetBundle); + numBundles = 2; + } checkNotNull(targetBundle, "can't split null bundle"); checkNotNull(targetBundle.getNamespaceObject(), "namespace must be present"); NamespaceName nsname = targetBundle.getNamespaceObject(); @@ -243,7 +251,7 @@ public Pair> splitBundles(NamespaceBundl splitPartition = i; Long maxVal = sourceBundle.partitions[i + 1]; Long minVal = sourceBundle.partitions[i]; - Long segSize = (maxVal - minVal) / numBundles; + Long segSize = splitKey == null ? (maxVal - minVal) / numBundles : splitKey - minVal; partitions[pos++] = minVal; Long curPartition = minVal + segSize; for (int j = 0; j < numBundles - 1; j++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index bde8dc1e8c7d6..c282079631226 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -110,7 +110,7 @@ public void testSplitAndOwnBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); try { result.get(); @@ -190,7 +190,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { assertNotNull(list); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); try { result.get(); } catch (Exception e) { @@ -383,7 +383,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); try { result.get(); @@ -448,7 +448,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); NamespaceBundle originalBundle = bundles.findBundle(topicName); - CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false); + CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, false); try { result1.get(); } catch (Exception e) { @@ -467,7 +467,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } }); - CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true); + CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, false); try { result2.get(); } catch (Exception e) { @@ -483,7 +483,7 @@ private Pair> splitBundles(NamespaceBund bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, 2); + return utilityFactory.splitBundles(targetBundle, 2, null); } private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 4093899bf811b..2fa9fd2df6c9e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -182,7 +182,7 @@ public void testsplitBundles() throws Exception { NamespaceBundle bundle = bundles.findBundle(topicName); final int numberSplitBundles = 4; // (1) split in 4 - Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles); + Pair> splitBundles = factory.splitBundles(bundle, numberSplitBundles, null); // existing_no_bundles(1) + // additional_new_split_bundle(4) - // parent_target_bundle(1) @@ -225,7 +225,7 @@ public void testSplitBundleInTwo() throws Exception { NamespaceBundles bundles = factory.getBundles(nsname); NamespaceBundle bundle = bundles.findBundle(topicName); // (1) split : [0x00000000,0xffffffff] => [0x00000000_0x7fffffff,0x7fffffff_0xffffffff] - Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES); + Pair> splitBundles = factory.splitBundles(bundle, NO_BUNDLES, null); assertNotNull(splitBundles); assertBundleDivideInTwo(bundle, splitBundles.getRight(), NO_BUNDLES); @@ -248,6 +248,29 @@ public void testSplitBundleInTwo() throws Exception { } + @Test + public void testSplitBundleByFixKey() throws Exception { + NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); + NamespaceBundles bundles = factory.getBundles(nsname); + NamespaceBundle bundleToSplit = bundles.getBundles().get(0); + + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getLowerEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + try { + factory.splitBundles(bundleToSplit, 0, bundleToSplit.getUpperEndpoint()); + } catch (IllegalArgumentException e) { + //No-op + } + + Long fixKey = bundleToSplit.getLowerEndpoint() + 10; + Pair> splitBundles = factory.splitBundles(bundleToSplit, 0, fixKey); + assertEquals(splitBundles.getRight().get(0).getLowerEndpoint(), bundleToSplit.getLowerEndpoint()); + assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixKey); + } + private void validateSplitBundlesRange(NamespaceBundle fullBundle, List splitBundles) { assertNotNull(fullBundle); assertNotNull(splitBundles); @@ -267,7 +290,7 @@ private Pair> splitBundlesUtilFactory(Na bCacheField.setAccessible(true); ((AsyncLoadingCache) bCacheField.get(utilityFactory)).put(nsname, CompletableFuture.completedFuture(bundles)); - return utilityFactory.splitBundles(targetBundle, numBundles); + return utilityFactory.splitBundles(targetBundle, numBundles, null); } private void assertBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundle bundle, diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 0d69ce42ff581..ef3e459e95e66 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -929,7 +929,7 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi * @throws PulsarAdminException * Unexpected error */ - void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) throws PulsarAdminException; + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, boolean balanceTopicCount) throws PulsarAdminException; /** * Set message-publish-rate (topics under this namespace can publish this many messages per second) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7832d0510072a..3d7f8096a15a3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -526,12 +526,13 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri } @Override - public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles) + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, boolean balanceTopicCount) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, bundle, "split"); - request(path.queryParam("unload", Boolean.toString(unloadSplitBundles))) + request(path.queryParam("unload", Boolean.toString(unloadSplitBundles)) + .queryParam("balanceTopicCount", Boolean.toString(balanceTopicCount))) .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 6159fa90e28ac..792fd85d881c8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -513,10 +513,14 @@ private class SplitBundle extends CliCommand { "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) private boolean unload; + @Parameter(names = { "--balance-topic-count", + "-btc" }, description = "Balance topic count for split bundles", required = false) + private boolean balanceTopicCount; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - admin.namespaces().splitNamespaceBundle(namespace, bundle, unload); + admin.namespaces().splitNamespaceBundle(namespace, bundle, unload, balanceTopicCount); } } From d45673fcc7a47b6f4150b790c002ee3a84fa4b0d Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 6 Feb 2020 20:47:27 +0800 Subject: [PATCH 2/9] Fix tests --- .../pulsar/broker/admin/AdminApiTest.java | 16 +++++++-------- .../broker/admin/v1/V1_AdminApiTest.java | 16 +++++++-------- .../broker/loadbalance/LoadBalancerTest.java | 20 +++++++++---------- .../pulsar/broker/service/TopicOwnerTest.java | 2 +- .../client/api/BrokerServiceLookupTest.java | 2 +- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ec2c6d08647bf..4e79ae216a2ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -995,7 +995,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1025,7 +1025,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, false); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1042,11 +1042,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, false); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, false); return null; })); } catch (Exception e) { @@ -1064,19 +1064,19 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, false); return null; })); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index aab5d4a2b1cf2..3edf5845f823e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -956,7 +956,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -986,7 +986,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, false); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1007,13 +1007,13 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, false); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, false); return null; } ) @@ -1039,25 +1039,25 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, false); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, false); return null; } ) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index c3ed98597423d..7b31314507c2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -680,25 +680,25 @@ public void testNamespaceBundleAutoSplit() throws Exception { boolean isAutoUnooadSplitBundleEnabled = pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); // verify bundles are split verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", - isAutoUnooadSplitBundleEnabled); + isAutoUnooadSplitBundleEnabled, false); } /* diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index f33f28c0ad6ea..bd299057373bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -103,7 +103,7 @@ public void testConnectToInvalidateBundleCacheBroker() throws Exception { // All brokers will invalidate bundles cache after namespace bundle split pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns", pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(), - true); + true, false); PulsarClient client = PulsarClient.builder(). serviceUrl(serviceUrlForTopic1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 1722d1633640e..76d61f33742b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -871,7 +871,7 @@ public void testSplitUnloadLookupTest() throws Exception { assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); // (6) Broker-2 should get the watch and update bundle cache final int retry = 5; From a856f5d4eb5fc38305cffb9000106b64a705559b Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 6 Feb 2020 21:13:54 +0800 Subject: [PATCH 3/9] Fix tests --- .../java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 2bbe32281ec59..d13895ee411e9 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -309,7 +309,7 @@ void namespaces() throws Exception { verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); - verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, false); namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1"); From 9ffe4326ba4b464918017b43ea7d1633350120ac Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 6 Feb 2020 21:29:42 +0800 Subject: [PATCH 4/9] Add check for no topics in namespace, back to the default bundle split policy --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 9ae4a92dc7d27..9183f50b512e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -653,6 +653,9 @@ CompletableFuture getSplitKeyForBalanceTopicCountAsync(NamespaceBundle bun return CompletableFuture.completedFuture(null); } else { return getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + if (topics == null || topics.size() <= 1) { + return CompletableFuture.completedFuture(null); + } List topicNameHashList = new ArrayList<>(topics.size()); for (String topic : topics) { topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); From 07379db322d84d15499476224239312b0d38ff0e Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Sat, 8 Feb 2020 00:22:06 +0800 Subject: [PATCH 5/9] Apply comments --- conf/broker.conf | 10 +- .../pulsar/broker/ServiceConfiguration.java | 14 ++- .../apache/pulsar/PulsarBrokerStarter.java | 10 ++ .../broker/admin/impl/NamespacesBase.java | 22 +++- .../pulsar/broker/admin/v1/Namespaces.java | 3 +- .../pulsar/broker/admin/v2/Namespaces.java | 4 +- .../impl/ModularLoadManagerImpl.java | 12 +- .../impl/SimpleLoadManagerImpl.java | 11 +- .../broker/namespace/NamespaceService.java | 35 ++---- .../common/naming/NamespaceBundleFactory.java | 10 +- .../naming/NamespaceBundleSplitAlgorithm.java | 56 +++++++++ ...ngeEquallyDivideBundleSplitAlgorithm.java} | 18 +-- ...ountEquallyDivideBundleSplitAlgorithm.java | 50 ++++++++ .../pulsar/broker/admin/AdminApiTest.java | 107 ++++++++++++++++-- .../broker/admin/v1/V1_AdminApiTest.java | 16 +-- .../broker/loadbalance/LoadBalancerTest.java | 20 ++-- .../namespace/NamespaceServiceTest.java | 11 +- .../pulsar/broker/service/TopicOwnerTest.java | 2 +- .../client/api/BrokerServiceLookupTest.java | 2 +- .../common/naming/NamespaceBundlesTest.java | 8 +- .../naming/ServiceConfigurationTest.java | 2 + .../configurations/pulsar_broker_test.conf | 2 + .../pulsar/client/admin/Namespaces.java | 2 +- .../client/admin/internal/NamespacesImpl.java | 4 +- .../pulsar/admin/cli/CmdNamespaces.java | 9 +- 25 files changed, 333 insertions(+), 107 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java rename pulsar-broker/src/main/java/org/apache/pulsar/{broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java => common/naming/RangeEquallyDivideBundleSplitAlgorithm.java} (58%) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java diff --git a/conf/broker.conf b/conf/broker.conf index 3f9a24b9b981b..d71d9fb5270f4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -691,10 +691,16 @@ loadBalancerNamespaceMaximumBundles=128 loadBalancerOverrideBrokerNicSpeedGbps= # Name of load manager to use -# loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl -# loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.BalanceTopicCountModularLoadManager loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl +# Supported algorithms name for namespace bundle split. +# "range_equally_divide" divides the bundle into two parts with the same hash range size. +# "topic_count_equally_divide" divides the bundle into two parts with the same topics count. +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide,topic_count_equally_divide] + +# Default algorithm name for namespace bundle split +defaultNamespaceBundleSplitAlgorithm=range_equally_divide + ### --- Replication --- ### # Enable replication metrics diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2771692b82f30..2808a5f080379 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.internal.PlatformDependent; @@ -1189,7 +1190,18 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Name of load manager to use" ) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; - + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Supported algorithms name for namespace bundle split" + ) + private List supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide"); + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "Default algorithm name for namespace bundle split" + ) + private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide"; @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = "Option to override the auto-detected network interfaces max speed" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index c7b0bb7e9c605..37c5a1f7cb1b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; @@ -152,6 +153,15 @@ private static class BrokerStarter { throw new IllegalArgumentException("Max message size need smaller than jvm directMemory"); } + if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) { + throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " + + "Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms); + } + + if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) { + throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm"); + } + // init functions worker if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) { WorkerConfig workerConfig; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b9f7b59c04d2f..5088e43594fae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -64,6 +64,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -774,7 +775,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat } @SuppressWarnings("deprecation") - protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, boolean balanceTopicCount) { + protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) { log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); validateSuperUserAccess(); @@ -792,8 +793,14 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + List supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms(); + if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) { + throw new RestException(Status.PRECONDITION_FAILED, + "Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms); + } + try { - pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, balanceTopicCount).get(); + pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); } catch (IllegalArgumentException e) { log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, @@ -805,6 +812,17 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit } } + private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { + NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + } + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide; + } + return algorithm; + } + protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); validateSuperUserAccess(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 809bd7e24567d..ec9b1fc5da331 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; @@ -446,7 +447,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload) { validateNamespaceName(property, cluster, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload, false); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 898c35fcc4a5f..1043fdd4aa494 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -351,9 +351,9 @@ public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam( @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("unload") @DefaultValue("false") boolean unload, - @QueryParam("balanceTopicCount") @DefaultValue("false") boolean balanceTopicCount) { + @QueryParam("splitAlgorithmName") String splitAlgorithmName) { validateNamespaceName(tenant, namespace); - internalSplitNamespaceBundle(bundleRange, authoritative, unload, balanceTopicCount); + internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index eaf0d588a1b37..670e7ead7c6c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -634,6 +634,7 @@ public void checkNamespaceBundleSplit() { || !pulsar.getLeaderElectionService().isLeader()) { return; } + final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); synchronized (bundleSplitStrategy) { final Set bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar); NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory(); @@ -645,9 +646,9 @@ public void checkNamespaceBundleSplit() { .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { continue; } - log.info("Load-manager splitting bundle {} and unloading {}", bundleName, - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled()); - internalSplitNamespaceBundle(namespaceName, bundleRange); + log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles); + pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, + unloadSplitBundles, null); // Make sure the same bundle is not selected again. loadData.getBundleData().remove(bundleName); localData.getLastStats().remove(bundleName); @@ -664,11 +665,6 @@ public void checkNamespaceBundleSplit() { } - protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { - pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), false); - } - /** * When the broker data ZooKeeper nodes are updated, update the broker data map. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index a57181c36fdb1..765f6c661021d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -1478,8 +1478,10 @@ public void doNamespaceBundleSplit() throws Exception { if (bundlesToBeSplit.size() > 0) { for (String bundleName : bundlesToBeSplit) { try { - internalSplitNamespaceBundle(LoadManagerShared.getNamespaceNameFromBundleName(bundleName), - LoadManagerShared.getBundleRangeFromBundleName(bundleName)); + pulsar.getAdminClient().namespaces().splitNamespaceBundle( + LoadManagerShared.getNamespaceNameFromBundleName(bundleName), + LoadManagerShared.getBundleRangeFromBundleName(bundleName), + pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); @@ -1489,11 +1491,6 @@ public void doNamespaceBundleSplit() throws Exception { } } - protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { - pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), false); - } - @Override public void stop() throws PulsarServerException { loadReportCacheZk.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 9183f50b512e7..87d57ef110747 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -637,48 +638,26 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio * @return * @throws Exception */ - public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, boolean balanceTopicCount) + public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle, boolean unload, NamespaceBundleSplitAlgorithm splitAlgorithm) throws Exception { final CompletableFuture unloadFuture = new CompletableFuture<>(); final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT); - - splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, balanceTopicCount); + splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm); return unloadFuture; } - CompletableFuture getSplitKeyForBalanceTopicCountAsync(NamespaceBundle bundle, boolean balanceTopicCount) { - if (!balanceTopicCount) { - return CompletableFuture.completedFuture(null); - } else { - return getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { - if (topics == null || topics.size() <= 1) { - return CompletableFuture.completedFuture(null); - } - List topicNameHashList = new ArrayList<>(topics.size()); - for (String topic : topics) { - topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); - } - Collections.sort(topicNameHashList); - long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); - long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); - long splitMiddle = splitStart + (splitEnd - splitStart) / 2; - return CompletableFuture.completedFuture(splitMiddle); - }); - } - } - void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, boolean unload, AtomicInteger counter, CompletableFuture unloadFuture, - boolean balanceTopicCount) { - getSplitKeyForBalanceTopicCountAsync(bundle, balanceTopicCount).whenComplete((splitKey, ex) -> { + NamespaceBundleSplitAlgorithm splitAlgorithm) { + splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); if (ex == null) { final Pair> splittedBundles = bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */, splitKey); + 2 /* by default split into 2 */, splitBoundary); // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. if (splittedBundles != null) { @@ -743,7 +722,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, // retry several times on BadVersion if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { pulsar.getOrderedExecutor() - .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, balanceTopicCount)); + .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm)); } else { // Retry enough, or meet other exception String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index a7a52f67089c0..69ec0b719f2ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -221,16 +221,16 @@ public static BundlesData getBundlesData(NamespaceBundles bundles) throws Except * {@link NamespaceBundle} needs to be split * @param numBundles * split into numBundles - * @param splitKey + * @param splitBoundary * split into 2 numBundles by the given split key. The given split key must between the key range of the * given split bundle. * @return List of split {@link NamespaceBundle} and {@link NamespaceBundles} that contains final bundles including * split bundles for a given namespace */ - public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitKey) { + public Pair> splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) { checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split further", targetBundle); - if (splitKey != null) { - checkArgument(splitKey > targetBundle.getLowerEndpoint() && splitKey < targetBundle.getUpperEndpoint(), + if (splitBoundary != null) { + checkArgument(splitBoundary > targetBundle.getLowerEndpoint() && splitBoundary < targetBundle.getUpperEndpoint(), "The given fixed key must between the key range of the %s bundle", targetBundle); numBundles = 2; } @@ -251,7 +251,7 @@ public Pair> splitBundles(NamespaceBundl splitPartition = i; Long maxVal = sourceBundle.partitions[i + 1]; Long minVal = sourceBundle.partitions[i]; - Long segSize = splitKey == null ? (maxVal - minVal) / numBundles : splitKey - minVal; + Long segSize = splitBoundary == null ? (maxVal - minVal) / numBundles : splitBoundary - minVal; partitions[pos++] = minVal; Long curPartition = minVal + segSize; for (int j = 0; j < numBundles - 1; j++) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..a0e45f3e9485c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Algorithm interface for namespace bundle split. + */ +public interface NamespaceBundleSplitAlgorithm { + + String rangeEquallyDivideName = "range_equally_divide"; + String topicCountEquallyDivideName = "topic_count_equally_divide"; + + List availableAlgorithms = Lists.newArrayList(rangeEquallyDivideName, topicCountEquallyDivideName); + + NamespaceBundleSplitAlgorithm rangeEquallyDivide = new RangeEquallyDivideBundleSplitAlgorithm(); + NamespaceBundleSplitAlgorithm topicCountEquallyDivide = new TopicCountEquallyDivideBundleSplitAlgorithm(); + + static NamespaceBundleSplitAlgorithm of(String algorithmName) { + if (algorithmName == null) { + return null; + } + switch (algorithmName) { + case rangeEquallyDivideName: + return rangeEquallyDivide; + case topicCountEquallyDivideName: + return topicCountEquallyDivide; + default: + return null; + } + } + + CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java similarity index 58% rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java rename to pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java index e5545f4976669..a66ac2630226e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BalanceTopicCountModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java @@ -16,16 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.impl; +package org.apache.pulsar.common.naming; -import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.broker.namespace.NamespaceService; -public class BalanceTopicCountModularLoadManager extends ModularLoadManagerImpl { +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same hash range size. + */ +public class RangeEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { @Override - protected void internalSplitNamespaceBundle(String namespaceName, String bundleRange) throws PulsarServerException, PulsarAdminException { - pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange, - pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), true); + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return CompletableFuture.completedFuture(bundle.getLowerEndpoint() + + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java new file mode 100644 index 0000000000000..40b3d778fd881 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/TopicCountEquallyDivideBundleSplitAlgorithm.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import org.apache.pulsar.broker.namespace.NamespaceService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This algorithm divides the bundle into two parts with the same topics count. + */ +public class TopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm { + + @Override + public CompletableFuture getSplitBoundary(NamespaceService service, NamespaceBundle bundle) { + return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> { + if (topics == null || topics.size() <= 1) { + return CompletableFuture.completedFuture(null); + } + List topicNameHashList = new ArrayList<>(topics.size()); + for (String topic : topics) { + topicNameHashList.add(bundle.getNamespaceBundleFactory().getLongHashCode(topic)); + } + Collections.sort(topicNameHashList); + long splitStart = topicNameHashList.get(Math.max((topicNameHashList.size() / 2) - 1, 0)); + long splitEnd = topicNameHashList.get(topicNameHashList.size() / 2); + long splitMiddle = splitStart + (splitEnd - splitStart) / 2; + return CompletableFuture.completedFuture(splitMiddle); + }); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 4e79ae216a2ab..5b9949b4ae745 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -41,6 +41,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -90,6 +91,7 @@ import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -995,7 +997,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1010,6 +1012,95 @@ public void testNamespaceSplitBundle() throws Exception { producer.close(); } + @Test + public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + } + + @Test + public void testNamespaceSplitBundleWithInvalidAlgorithm() throws Exception { + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, + "invalid_test"); + fail("unsupported namespace bundle split algorithm"); + } catch (PulsarAdminException ignored) { + } + } + + @Test + public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception { + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName); + // Force to create a topic + final String namespace = "prop-xyz/ns1"; + List topicNames = Lists.newArrayList( + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(), + (new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString()); + + List> producers = new ArrayList<>(2); + for (String topicName : topicNames) { + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producers.add(producer); + producer.send("message".getBytes()); + } + + assertTrue(admin.topics().getList(namespace).containsAll(topicNames)); + + try { + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); + } catch (Exception e) { + fail("split bundle shouldn't have thrown exception"); + } + NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace)); + NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0))); + NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1))); + assertNotEquals(bundle1, bundle2); + String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" }; + for (int i = 0; i < bundles.getBundles().size(); i++) { + assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]); + } + producers.forEach(Producer::closeAsync); + conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.rangeEquallyDivideName); + } + @Test public void testNamespaceSplitBundleConcurrent() throws Exception { // Force to create a topic @@ -1025,7 +1116,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1042,11 +1133,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { @@ -1064,19 +1155,19 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { try { executorService.invokeAll(Arrays.asList(() -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; })); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 3edf5845f823e..9c7e1d47b94eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -956,7 +956,7 @@ public void testNamespaceSplitBundle() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -986,7 +986,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName)); try { - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null); } catch (Exception e) { fail("split bundle shouldn't have thrown exception"); } @@ -1007,13 +1007,13 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null); return null; }, () -> { log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null); return null; } ) @@ -1039,25 +1039,25 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { () -> { log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null); return null; }, () -> { log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff "); - admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, false); + admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null); return null; } ) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 7b31314507c2b..d52a3ecb47bb7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -680,25 +680,25 @@ public void testNamespaceBundleAutoSplit() throws Exception { boolean isAutoUnooadSplitBundleEnabled = pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(); // verify bundles are split verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000", - isAutoUnooadSplitBundleEnabled, false); + isAutoUnooadSplitBundleEnabled, null); } /* diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index c282079631226..ce5f519cfb2c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -110,7 +111,7 @@ public void testSplitAndOwnBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -190,7 +191,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { assertNotNull(list); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); } catch (Exception e) { @@ -383,7 +384,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception { NamespaceBundle originalBundle = bundles.findBundle(topicName); // Split bundle and take ownership of split bundles - CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, false); + CompletableFuture result = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result.get(); @@ -448,7 +449,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname); NamespaceBundle originalBundle = bundles.findBundle(topicName); - CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, false); + CompletableFuture result1 = namespaceService.splitAndOwnBundle(originalBundle, false, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result1.get(); } catch (Exception e) { @@ -467,7 +468,7 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } }); - CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, false); + CompletableFuture result2 = namespaceService.splitAndOwnBundle(splittedBundle, true, NamespaceBundleSplitAlgorithm.rangeEquallyDivide); try { result2.get(); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java index bd299057373bc..0ad182498b440 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java @@ -103,7 +103,7 @@ public void testConnectToInvalidateBundleCacheBroker() throws Exception { // All brokers will invalidate bundles cache after namespace bundle split pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns", pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(), - true, false); + true, null); PulsarClient client = PulsarClient.builder(). serviceUrl(serviceUrlForTopic1) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 76d61f33742b9..0e926af5a055a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -871,7 +871,7 @@ public void testSplitUnloadLookupTest() throws Exception { assertEquals(bundleInBroker2.toString(), unsplitBundle); // (5) Split the bundle for topic-1 - admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, false); + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null); // (6) Broker-2 should get the watch and update bundle cache final int retry = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index 2fa9fd2df6c9e..2a2521d4e1cf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -249,7 +249,7 @@ public void testSplitBundleInTwo() throws Exception { } @Test - public void testSplitBundleByFixKey() throws Exception { + public void testSplitBundleByFixBoundary() throws Exception { NamespaceName nsname = NamespaceName.get("pulsar/global/ns1"); NamespaceBundles bundles = factory.getBundles(nsname); NamespaceBundle bundleToSplit = bundles.getBundles().get(0); @@ -265,10 +265,10 @@ public void testSplitBundleByFixKey() throws Exception { //No-op } - Long fixKey = bundleToSplit.getLowerEndpoint() + 10; - Pair> splitBundles = factory.splitBundles(bundleToSplit, 0, fixKey); + Long fixBoundary = bundleToSplit.getLowerEndpoint() + 10; + Pair> splitBundles = factory.splitBundles(bundleToSplit, 0, fixBoundary); assertEquals(splitBundles.getRight().get(0).getLowerEndpoint(), bundleToSplit.getLowerEndpoint()); - assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixKey); + assertEquals(splitBundles.getRight().get(1).getLowerEndpoint().longValue(), bundleToSplit.getLowerEndpoint() + fixBoundary); } private void validateSplitBundlesRange(NamespaceBundle fullBundle, List splitBundles) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index c154d29a0ef6b..24e7152856718 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -61,6 +61,8 @@ public void testInit() throws Exception { && config.getBrokerServicePort().get().equals(brokerServicePort)); assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); assertEquals(config.getBrokerDeleteInactiveTopicsMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide"); + assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1); } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 5d37a11ec60f4..bd966ad76f5b7 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -86,3 +86,5 @@ replicationConnectionsPerBroker=16 replicationProducerQueueSize=1000 replicatorPrefix=pulsar.repl brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] +defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index ef3e459e95e66..e0bf5d6d6fc9b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -929,7 +929,7 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi * @throws PulsarAdminException * Unexpected error */ - void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, boolean balanceTopicCount) throws PulsarAdminException; + void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException; /** * Set message-publish-rate (topics under this namespace can publish this many messages per second) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 3d7f8096a15a3..99f829290fd35 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -526,13 +526,13 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri } @Override - public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, boolean balanceTopicCount) + public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { try { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, bundle, "split"); request(path.queryParam("unload", Boolean.toString(unloadSplitBundles)) - .queryParam("balanceTopicCount", Boolean.toString(balanceTopicCount))) + .queryParam("splitAlgorithmName", splitAlgorithmName)) .put(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 792fd85d881c8..bb98798f6e4d6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -513,14 +513,15 @@ private class SplitBundle extends CliCommand { "-u" }, description = "Unload newly split bundles after splitting old bundle", required = false) private boolean unload; - @Parameter(names = { "--balance-topic-count", - "-btc" }, description = "Balance topic count for split bundles", required = false) - private boolean balanceTopicCount; + @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split namespace bundle.\n" + + " Valid options are: [range_equally_divide, topic_count_equally_divide].\n" + + " Default is range_equally_divide.", required = false) + private String splitAlgorithmName; @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - admin.namespaces().splitNamespaceBundle(namespace, bundle, unload, balanceTopicCount); + admin.namespaces().splitNamespaceBundle(namespace, bundle, unload, splitAlgorithmName); } } From 6bd05a0ad9212912b2edf1e7eaa1360bc879780d Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Sat, 8 Feb 2020 00:28:41 +0800 Subject: [PATCH 6/9] Apply comments --- .../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 670e7ead7c6c4..5788656bb5eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -169,7 +169,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach private SimpleResourceAllocationPolicies policies; // Pulsar service used to initialize this. - protected PulsarService pulsar; + private PulsarService pulsar; // Executor service used to regularly update broker data. private final ScheduledExecutorService scheduler; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index bb98798f6e4d6..0899ca36d3366 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -515,7 +515,7 @@ private class SplitBundle extends CliCommand { @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split namespace bundle.\n" + " Valid options are: [range_equally_divide, topic_count_equally_divide].\n" + - " Default is range_equally_divide.", required = false) + " Use broker side config if absent", required = false) private String splitAlgorithmName; @Override From 73c64260db4259717e75982c51a74ea0ea405b9e Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Sat, 8 Feb 2020 09:17:59 +0800 Subject: [PATCH 7/9] Fix tests --- .../java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index d13895ee411e9..edeab1b6d4d88 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -309,7 +309,7 @@ void namespaces() throws Exception { verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); - verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, false); + verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); namespaces.run(split("get-backlog-quotas myprop/clust/ns1")); verify(mockNamespaces).getBacklogQuotaMap("myprop/clust/ns1"); From 793d6a6008c214aa8876ba575f75dea08c9c4e18 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 12 Feb 2020 00:21:42 +0800 Subject: [PATCH 8/9] Fix unit test --- .../broker/admin/impl/NamespacesBase.java | 11 +- .../broker/namespace/NamespaceService.java | 109 ++++++++++-------- 2 files changed, 66 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5088e43594fae..3def090ad66d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -802,10 +802,15 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit try { pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); - } catch (IllegalArgumentException e) { - log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, + } catch (ExecutionException e) { + if (e.getCause() instanceof IllegalArgumentException) { + log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName, bundleRange, e.getMessage()); - throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request"); + } else { + log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); + throw new RestException(e.getCause()); + } } catch (Exception e) { log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e); throw new RestException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 87d57ef110747..d4e90be705d99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -656,60 +656,65 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, splitAlgorithm.getSplitBoundary(this, bundle).whenComplete((splitBoundary, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); if (ex == null) { - final Pair> splittedBundles = bundleFactory.splitBundles(bundle, - 2 /* by default split into 2 */, splitBoundary); - - // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splittedBundles != null) { - checkNotNull(splittedBundles.getLeft()); - checkNotNull(splittedBundles.getRight()); - checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); - NamespaceName nsname = bundle.getNamespaceObject(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", - nsname.toString(), bundle.getBundleRange(), counter.get(), - splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", - splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); - } - try { - // take ownership of newly split bundles - for (NamespaceBundle sBundle : splittedBundles.getRight()) { - checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); + final Pair> splittedBundles; + try { + splittedBundles = bundleFactory.splitBundles(bundle, + 2 /* by default split into 2 */, splitBoundary); + + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splittedBundles != null) { + checkNotNull(splittedBundles.getLeft()); + checkNotNull(splittedBundles.getRight()); + checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles"); + NamespaceName nsname = bundle.getNamespaceObject(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}", + nsname.toString(), bundle.getBundleRange(), counter.get(), + splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles", + splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles"); } - updateNamespaceBundles(nsname, splittedBundles.getLeft(), - (rc, path, zkCtx, stat) -> { - if (rc == Code.OK.intValue()) { - // invalidate cache as zookeeper has new split - // namespace bundle - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - - updateFuture.complete(splittedBundles.getRight()); - } else if (rc == Code.BADVERSION.intValue()) { - KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + - "due to %s, counter: %d", - nsname.toString(), bundle.getBundleRange(), - keeperException.getMessage(), counter.get()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); - } else { - String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", - nsname.toString(), bundle.getBundleRange(), - KeeperException.create(KeeperException.Code.get(rc)).getMessage()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); - } - }); - } catch (Exception e) { - String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", - nsname.toString(), e.getMessage()); - LOG.warn(msg, e); + try { + // take ownership of newly split bundles + for (NamespaceBundle sBundle : splittedBundles.getRight()) { + checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle)); + } + updateNamespaceBundles(nsname, splittedBundles.getLeft(), + (rc, path, zkCtx, stat) -> { + if (rc == Code.OK.intValue()) { + // invalidate cache as zookeeper has new split + // namespace bundle + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + + updateFuture.complete(splittedBundles.getRight()); + } else if (rc == Code.BADVERSION.intValue()) { + KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc)); + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " + + "due to %s, counter: %d", + nsname.toString(), bundle.getBundleRange(), + keeperException.getMessage(), counter.get()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServerMetadataException(keeperException)); + } else { + String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s", + nsname.toString(), bundle.getBundleRange(), + KeeperException.create(KeeperException.Code.get(rc)).getMessage()); + LOG.warn(msg); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + }); + } catch (Exception e) { + String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s", + nsname.toString(), e.getMessage()); + LOG.warn(msg, e); + updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } + } else { + String msg = format("bundle %s not found under namespace", bundle.toString()); + LOG.warn(msg); updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } - } else { - String msg = format("bundle %s not found under namespace", bundle.toString()); - LOG.warn(msg); - updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + } catch (Exception e) { + updateFuture.completeExceptionally(e); } } else { updateFuture.completeExceptionally(ex); @@ -723,6 +728,8 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) { pulsar.getOrderedExecutor() .execute(() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm)); + } else if (t instanceof IllegalArgumentException) { + unloadFuture.completeExceptionally(t); } else { // Retry enough, or meet other exception String msg2 = format(" %s not success update nsBundles, counter %d, reason %s", From 317be17decd6ecf8c98dea259eb8751d880cc179 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 13 Feb 2020 10:49:24 +0800 Subject: [PATCH 9/9] Add docs --- site2/docs/reference-configuration.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 9adfcab82177e..d72778a4d8097 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -225,6 +225,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |defaultRetentionSizeInMB| Default retention size |0| |keepAliveIntervalSeconds| How often to check whether the connections are still alive |30| |loadManagerClassName| Name of load manager to use |org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl| +|supportedNamespaceBundleSplitAlgorithms| Supported algorithms name for namespace bundle split |[range_equally_divide,topic_count_equally_divide]| +|defaultNamespaceBundleSplitAlgorithm| Default algorithm name for namespace bundle split |range_equally_divide| |managedLedgerOffloadDriver| Driver to use to offload old data to long term storage (Possible values: S3) || |managedLedgerOffloadMaxThreads| Maximum number of thread pool threads for ledger offloading |2| |managedLedgerUnackedRangesOpenCacheSetEnabled| Use Open Range-Set to cache unacknowledged messages |true|