Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Write the child ownership to ServiceUnitStateChannel instead of ZK when handling bundle split #18858

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Write the child ownerships to BSC instead of ZK when split bundle
  • Loading branch information
Demogorgon314 committed Jan 31, 2023
commit 93fc19a94f0a3693a9c24080c05a43ab5f5720f1
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
@@ -32,6 +33,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -47,22 +49,32 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -410,8 +422,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {

private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.broker())) {
splitServiceUnit(serviceUnit)
.thenCompose(__ -> tombstoneAsync(serviceUnit))
splitServiceUnit(serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
}
}
@@ -509,25 +520,101 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
});
}

private CompletableFuture<Void> splitServiceUnit(String serviceUnit) {
// TODO: after the split we need to write the child ownerships to BSC instead of ZK.
private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnitStateData data) {
// Write the child ownerships to BSC.
long startTime = System.nanoTime();
return pulsar.getNamespaceService()
.splitAndOwnBundle(getNamespaceBundle(serviceUnit),
false,
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()),
null)
.whenComplete((__, ex) -> {
double splitBundleTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
if (ex == null) {
log.info("Successfully split {} namespace-bundle in {} ms",
serviceUnit, splitBundleTime);
} else {
log.error("Failed to split {} namespace-bundle in {} ms",
serviceUnit, splitBundleTime, ex);
}
});
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);

CompletableFuture<Void> completionFuture = new CompletableFuture<>();
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();

getSplitBoundary(bundle).thenAccept(splittedBundles -> {
// Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper.
if (splittedBundles == null) {
String msg = format("Bundle %s not found under namespace", serviceUnit);
log.warn(msg);
updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
return;
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
for (NamespaceBundle sBundle : splittedBundles.getRight()) {
futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {}));
}
NamespaceName nsname = bundle.getNamespaceObject();
FutureUtil.waitForAll(futures).thenRun(() -> {
namespaceService.updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ ->
namespaceService.updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft()))
.thenRun(() -> {
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
updateFuture.complete(splittedBundles.getRight());
}).exceptionally(ex1 -> {
String msg = format("failed to update namespace policies [%s], "
+ "NamespaceBundle: %s due to %s",
nsname.toString(), bundle.getBundleRange(), ex1.getMessage());
log.warn(msg);
updateFuture.completeExceptionally(
new BrokerServiceException.ServiceUnitNotReadyException(msg, ex1.getCause()));
return null;
});
}).exceptionally(e -> {
updateFuture.completeExceptionally(e);
return null;
});
}).exceptionally(e -> {
updateFuture.completeExceptionally(e);
return null;
});

updateFuture.thenAccept(r -> {
// Free the old bundle
tombstoneAsync(serviceUnit).thenRun(() -> {
// Update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
// TODO: Update the load data if needed.
completionFuture.complete(null);
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
log.info("Successfully split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime);
}).exceptionally(e -> {
String msg = format(
"Failed to free bundle %s under namespace [%s] with error %s",
bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage());
completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
log.error("Failed to split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime, e);
return null;
});
}).exceptionally(ex -> {
// TODO: Retry when have exception.
completionFuture.completeExceptionally(ex);
return null;
});
return completionFuture;
}

private CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(NamespaceBundle bundle) {
ServiceConfiguration config = pulsar.getConfig();
NamespaceService namespaceService = pulsar.getNamespaceService();
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(namespaceService, bundle, null,
topicStatsMap,
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
config.getFlowOrQpsDifferenceThresholdPercentage());
} else {
bundleSplitOption = new BundleSplitOption(namespaceService, bundle, null);
}
NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm =
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
CompletableFuture<List<Long>> splitBoundary =
nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
return splitBoundary.thenCompose(splitBoundaries -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries));
}

public void handleMetadataSessionEvent(SessionEvent e) {
Original file line number Diff line number Diff line change
@@ -950,7 +950,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
* @param nsBundles
* @throws Exception
*/
private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
NamespaceBundles nsBundles) {
Objects.requireNonNull(nsname);
Objects.requireNonNull(nsBundles);
@@ -979,7 +979,7 @@ private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName
* @param nsBundles
* @throws Exception
*/
private CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
Objects.requireNonNull(nsname);
Objects.requireNonNull(nsBundles);

Original file line number Diff line number Diff line change
@@ -455,7 +455,18 @@ public void splitTest() throws Exception {
waitUntilNewOwner(channel1, bundle, null);
waitUntilNewOwner(channel2, bundle, null);

// TODO: assert child bundle ownerships in the channels.
// Assert child bundle ownerships in the channels.
String childBundle1 = "public/default/0x7fffffff_0xffffffff";
String childBundle2 = "public/default/0x00000000_0x7fffffff";

waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1);
waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1);
waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1);
waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1);
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle1).get());
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle2).get());
assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle1).get());
assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle2).get());
}

@Test(priority = 7)