Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supports evenly distribute topics count when splits bundle #6241

Merged
merged 9 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,14 @@ loadBalancerOverrideBrokerNicSpeedGbps=
# Name of load manager to use
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl

# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide,topic_count_equally_divide]

# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide

### --- Replication --- ###

# Enable replication metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker;


import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.util.internal.PlatformDependent;
Expand Down Expand Up @@ -1189,7 +1190,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Name of load manager to use"
)
private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl";

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Supported algorithms name for namespace bundle split"
)
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide");
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Default algorithm name for namespace bundle split"
)
private String defaultNamespaceBundleSplitAlgorithm = "range_equally_divide";
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Option to override the auto-detected network interfaces max speed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
Expand Down Expand Up @@ -152,6 +153,15 @@ private static class BrokerStarter {
throw new IllegalArgumentException("Max message size need smaller than jvm directMemory");
}

if (!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) {
throw new IllegalArgumentException("The given supported namespace bundle split algorithm has unavailable algorithm. " +
"Available algorithms are " + NamespaceBundleSplitAlgorithm.availableAlgorithms);
}

if (!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) {
throw new IllegalArgumentException("Supported namespace bundle split algorithms must contains the default namespace bundle split algorithm");
}

// init functions worker
if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
WorkerConfig workerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -774,7 +775,7 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload) {
protected void internalSplitNamespaceBundle(String bundleRange, boolean authoritative, boolean unload, String splitAlgorithmName) {
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange);

validateSuperUserAccess();
Expand All @@ -792,19 +793,41 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);

List<String> supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are " + supportedNamespaceBundleSplitAlgorithms);
}

try {
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get();
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
} catch (IllegalArgumentException e) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, e.getMessage());
throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
throw new RestException(Status.PRECONDITION_FAILED, "Split bundle failed due to invalid request");
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
throw new RestException(e.getCause());
}
} catch (Exception e) {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, e);
throw new RestException(e);
}
}

private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide;
}
return algorithm;
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
validateSuperUserAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
Expand Down Expand Up @@ -446,7 +447,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,10 @@ public void unloadNamespaceBundle(@PathParam("tenant") String tenant, @PathParam
public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName) {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public void checkNamespaceBundleSplit() {
}
log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange,
unloadSplitBundles);
unloadSplitBundles, null);
// Make sure the same bundle is not selected again.
loadData.getBundleData().remove(bundleName);
localData.getLastStats().remove(bundleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -1478,9 +1479,9 @@ public void doNamespaceBundleSplit() throws Exception {
for (String bundleName : bundlesToBeSplit) {
try {
pulsar.getAdminClient().namespaces().splitNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName),
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled());
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName),
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled(), null);
log.info("Successfully split namespace bundle {}", bundleName);
} catch (Exception e) {
log.error("Failed to split namespace bundle {}", bundleName, e);
Expand Down
Loading