Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Write the child ownership to ServiceUnitStateChannel instead of ZK when handling bundle split #18858

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor getSplitBoundary method
Demogorgon314 committed Feb 6, 2023
commit 563da58ce0608abc01ac763bdf56e589f2e3c25d
Original file line number Diff line number Diff line change
@@ -1028,7 +1028,9 @@ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundl
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
pulsar().getNamespaceService()
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
splitBoundaries));
});
}

@@ -1109,18 +1111,6 @@ private CompletableFuture<NamespaceBundle> findHotBundleAsync(NamespaceName name
.getBundleWithHighestThroughputAsync(namespaceName);
}

private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.of(
pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}
return algorithm;
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
Original file line number Diff line number Diff line change
@@ -828,18 +828,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
topicStatsMap,
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
config.getFlowOrQpsDifferenceThresholdPercentage());
} else {
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
}
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);

splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
@@ -967,21 +956,9 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
*/
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
NamespaceBundle bundle, List<Long> boundaries) {
ServiceConfiguration config = pulsar.getConfig();
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
topicStatsMap,
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
config.getFlowOrQpsDifferenceThresholdPercentage());
} else {
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
}
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm());
CompletableFuture<List<Long>> splitBoundary =
nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
return splitBoundary.thenCompose(splitBoundaries -> {
@@ -995,6 +972,35 @@ public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplit
});
}

private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
List<Long> boundaries,
ServiceConfiguration config) {
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
topicStatsMap,
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
config.getFlowOrQpsDifferenceThresholdPercentage());
} else {
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
}
return bundleSplitOption;
}

public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName);
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}
return algorithm;
}

/**
* Update new bundle-range to admin/policies/namespace.
* Update may fail because of concurrent write to Zookeeper.