Skip to content

Commit 0f681d9

Browse files
committed
[improve][broker] PIP-192 made split handler idempotent
1 parent 07acdbc commit 0f681d9

16 files changed

+540
-174
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
440440
log.warn(msg);
441441
throw new IllegalArgumentException(msg);
442442
}
443-
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker);
443+
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
444444
UnloadDecision unloadDecision =
445445
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
446446
return unloadAsync(unloadDecision,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+152-92
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java

+15
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,25 @@ public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sou
4545
System.currentTimeMillis(), versionId);
4646
}
4747

48+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker,
49+
Map<String, Optional<String>> splitServiceUnitToDestBroker, boolean force,
50+
long versionId) {
51+
this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, force,
52+
System.currentTimeMillis(), versionId);
53+
}
54+
4855
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker, long versionId) {
4956
this(state, dstBroker, sourceBroker, null, false, System.currentTimeMillis(), versionId);
5057
}
5158

59+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker, boolean force,
60+
long versionId) {
61+
this(state, dstBroker, sourceBroker, null, force,
62+
System.currentTimeMillis(), versionId);
63+
}
64+
65+
66+
5267
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, long versionId) {
5368
this(state, dstBroker, null, null, false, System.currentTimeMillis(), versionId);
5469
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,8 @@ public record Split(
3030

3131
public Split {
3232
Objects.requireNonNull(serviceUnit);
33-
if (splitServiceUnitToDestBroker != null && splitServiceUnitToDestBroker.size() != 2) {
33+
if (splitServiceUnitToDestBroker == null || splitServiceUnitToDestBroker.size() != 2) {
3434
throw new IllegalArgumentException("Split service unit should be split into 2 service units.");
3535
}
3636
}
37-
38-
public Split(String serviceUnit, String sourceBroker) {
39-
this(serviceUnit, sourceBroker, null);
40-
}
4137
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,18 @@
2424
/**
2525
* Defines the information required to unload or transfer a service unit(e.g. bundle).
2626
*/
27-
public record Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker) {
27+
public record Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker, boolean force) {
2828
public Unload {
2929
Objects.requireNonNull(sourceBroker);
3030
Objects.requireNonNull(serviceUnit);
3131
}
32+
33+
3234
public Unload(String sourceBroker, String serviceUnit) {
33-
this(sourceBroker, serviceUnit, Optional.empty());
35+
this(sourceBroker, serviceUnit, Optional.empty(), false);
36+
}
37+
38+
public Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker) {
39+
this(sourceBroker, serviceUnit, destBroker, false);
3440
}
3541
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java

+63-1
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,23 @@
2626
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Map;
31+
import java.util.Optional;
3032
import java.util.Set;
33+
import java.util.concurrent.TimeUnit;
3134
import lombok.extern.slf4j.Slf4j;
3235
import org.apache.pulsar.broker.PulsarService;
3336
import org.apache.pulsar.broker.ServiceConfiguration;
3437
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
38+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
3539
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
3640
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
3741
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
3842
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
3943
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
4044
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
45+
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
4146
import org.apache.pulsar.common.naming.NamespaceName;
4247
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
4348

@@ -178,6 +183,27 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
178183
continue;
179184
}
180185

186+
var ranges = bundleRange.split("_");
187+
var foundSplittingBundle = false;
188+
for (var etr : channel.getOwnershipEntrySet()) {
189+
var eBundle = etr.getKey();
190+
var eData = etr.getValue();
191+
if (eData.state() == ServiceUnitState.Splitting && eBundle.startsWith(namespace)) {
192+
final String eRange = LoadManagerShared.getBundleRangeFromBundleName(eBundle);
193+
if (eRange.startsWith(ranges[0]) || eRange.endsWith(ranges[1])) {
194+
if (debug) {
195+
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
196+
+ " (parent) bundle:%s is in Splitting state.", bundle, eBundle));
197+
}
198+
foundSplittingBundle = true;
199+
break;
200+
}
201+
}
202+
}
203+
if (foundSplittingBundle) {
204+
continue;
205+
}
206+
181207
if (debug) {
182208
log.info(String.format(
183209
"Splitting bundle: %s. "
@@ -194,7 +220,43 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
194220
));
195221
}
196222
var decision = new SplitDecision();
197-
decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId()));
223+
var namespaceService = pulsar.getNamespaceService();
224+
var namespaceBundle = namespaceService.getNamespaceBundleFactory()
225+
.getBundle(namespaceName, bundleRange);
226+
NamespaceBundleSplitAlgorithm algorithm =
227+
namespaceService.getNamespaceBundleSplitAlgorithmByName(
228+
conf.getDefaultNamespaceBundleSplitAlgorithm());
229+
List<Long> splitBoundary = null;
230+
try {
231+
splitBoundary = namespaceService
232+
.getSplitBoundary(namespaceBundle, null, algorithm)
233+
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
234+
} catch (Throwable e) {
235+
counter.update(Failure, Unknown);
236+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to get split boundaries.", bundle, e));
237+
continue;
238+
}
239+
if (splitBoundary == null) {
240+
counter.update(Failure, Unknown);
241+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The split boundaries is null.", bundle));
242+
continue;
243+
}
244+
if (splitBoundary.size() != 1) {
245+
counter.update(Failure, Unknown);
246+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The size of split boundaries is not 1. "
247+
+ "splitBoundary:%s", bundle, splitBoundary));
248+
continue;
249+
}
250+
251+
var parentRange = namespaceBundle.getKeyRange();
252+
var leftChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
253+
NamespaceBundleFactory.getRange(parentRange.lowerEndpoint(), splitBoundary.get(0)));
254+
var rightChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
255+
NamespaceBundleFactory.getRange(splitBoundary.get(0), parentRange.upperEndpoint()));
256+
Map<String, Optional<String>> splitServiceUnitToDestBroker = Map.of(
257+
leftChildBundle.getBundleRange(), Optional.empty(),
258+
rightChildBundle.getBundleRange(), Optional.empty());
259+
decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), splitServiceUnitToDestBroker));
198260
decision.succeed(reason);
199261
decisionCache.add(decision);
200262
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -967,9 +967,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
967967
*/
968968
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
969969
NamespaceBundle bundle, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm, List<Long> boundaries) {
970-
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
971-
CompletableFuture<List<Long>> splitBoundary =
972-
nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
970+
CompletableFuture<List<Long>> splitBoundary = getSplitBoundary(bundle, boundaries, nsBundleSplitAlgorithm);
973971
return splitBoundary.thenCompose(splitBoundaries -> {
974972
if (splitBoundaries == null || splitBoundaries.size() == 0) {
975973
LOG.info("[{}] No valid boundary found in {} to split bundle {}",
@@ -981,6 +979,12 @@ public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplit
981979
});
982980
}
983981

982+
public CompletableFuture<List<Long>> getSplitBoundary(
983+
NamespaceBundle bundle, List<Long> boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
984+
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
985+
return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
986+
}
987+
984988
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
985989
List<Long> boundaries,
986990
ServiceConfiguration config) {

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,7 @@ public NamespaceBundle getBundle(String namespace, String bundleRange) {
272272
String[] boundaries = bundleRange.split("_");
273273
Long lowerEndpoint = Long.decode(boundaries[0]);
274274
Long upperEndpoint = Long.decode(boundaries[1]);
275-
Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
276-
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
275+
Range<Long> hashRange = getRange(lowerEndpoint, upperEndpoint);
277276
return getBundle(NamespaceName.get(namespace), hashRange);
278277
}
279278

@@ -414,4 +413,9 @@ public static String getNamespaceFromPoliciesPath(String path) {
414413
return Joiner.on("/").join(i);
415414
}
416415

416+
public static Range<Long> getRange(Long lowerEndpoint, Long upperEndpoint) {
417+
return Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
418+
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
419+
}
420+
417421
}

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public interface NamespaceBundleSplitAlgorithm {
3939
NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm();
4040
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
4141
new SpecifiedPositionsBundleSplitAlgorithm();
42+
43+
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO =
44+
new SpecifiedPositionsBundleSplitAlgorithm(true);
4245
NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
4346

4447
static NamespaceBundleSplitAlgorithm of(String algorithmName) {

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java

+33-14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@
2929
* This algorithm divides the bundle into several parts by the specified positions.
3030
*/
3131
public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm{
32+
33+
private boolean force;
34+
35+
public SpecifiedPositionsBundleSplitAlgorithm() {
36+
force = false;
37+
}
38+
39+
public SpecifiedPositionsBundleSplitAlgorithm(boolean force) {
40+
this.force = force;
41+
}
3242
@Override
3343
public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
3444
NamespaceService service = bundleSplitOption.getService();
@@ -39,19 +49,28 @@ public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSp
3949
}
4050
// sort all positions
4151
Collections.sort(positions);
42-
return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
43-
if (topics == null || topics.size() <= 1) {
44-
return CompletableFuture.completedFuture(null);
45-
}
46-
List<Long> splitBoundaries = positions
47-
.stream()
48-
.filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
49-
.collect(Collectors.toList());
50-
51-
if (splitBoundaries.size() == 0) {
52-
return CompletableFuture.completedFuture(null);
53-
}
54-
return CompletableFuture.completedFuture(splitBoundaries);
55-
});
52+
if (force) {
53+
return getBoundaries(bundle, positions);
54+
} else {
55+
return service.getOwnedTopicListForNamespaceBundle(bundle)
56+
.thenCompose(topics -> {
57+
if (topics == null || topics.size() <= 1) {
58+
return CompletableFuture.completedFuture(null);
59+
}
60+
return getBoundaries(bundle, positions);
61+
});
62+
}
63+
}
64+
65+
private CompletableFuture<List<Long>> getBoundaries(NamespaceBundle bundle, List<Long> positions) {
66+
List<Long> splitBoundaries = positions
67+
.stream()
68+
.filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
69+
.collect(Collectors.toList());
70+
71+
if (splitBoundaries.size() == 0) {
72+
return CompletableFuture.completedFuture(null);
73+
}
74+
return CompletableFuture.completedFuture(splitBoundaries);
5675
}
5776
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public void setup() throws Exception {
139139
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
140140
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
141141
conf.setLoadBalancerSheddingEnabled(false);
142+
conf.setLoadBalancerDebugModeEnabled(true);
142143
super.internalSetup(conf);
143144
pulsar1 = pulsar;
144145
ServiceConfiguration defaultConf = getDefaultConf();

0 commit comments

Comments
 (0)