Skip to content

Commit 421d707

Browse files
authored
[improve][broker] PIP-192 made split handler idempotent (#19988)
Master Issue: #16691 ### Motivation Raising a PR to implement: #16691 ### Modifications This PR - Makes split handler idempotent . - Makes Leader's orphan monitor keep trying to send split msg until finished. - Select bundle boundaries at the SplitScheduler to have the same split boundaries for each Split handler retry. - Adds a split condition to check if the parent's Splitting state has moved. - Made Admin Unload command forceful to unload any bundles in invalid states.
1 parent 55acbe6 commit 421d707

18 files changed

+767
-200
lines changed

conf/broker.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -1455,7 +1455,7 @@ loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
14551455
# the service-unit state channel when there are a large number of bundles.
14561456
# minimum value = 30 secs
14571457
# (only used in load balancer extension logics)
1458-
loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds=604800
1458+
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
14591459

14601460

14611461
### --- Replication --- ###

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2609,7 +2609,7 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
26092609
+ "minimum value = 30 secs"
26102610
+ "(only used in load balancer extension logics)"
26112611
)
2612-
private long loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds = 604800;
2612+
private long loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds = 3600;
26132613

26142614
@FieldContext(
26152615
category = CATEGORY_LOAD_BALANCER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
449449
log.warn(msg);
450450
throw new IllegalArgumentException(msg);
451451
}
452-
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker);
452+
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
453453
UnloadDecision unloadDecision =
454454
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
455455
return unloadAsync(unloadDecision,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+166-97
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java

+15
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,25 @@ public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sou
4545
System.currentTimeMillis(), versionId);
4646
}
4747

48+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker,
49+
Map<String, Optional<String>> splitServiceUnitToDestBroker, boolean force,
50+
long versionId) {
51+
this(state, dstBroker, sourceBroker, splitServiceUnitToDestBroker, force,
52+
System.currentTimeMillis(), versionId);
53+
}
54+
4855
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker, long versionId) {
4956
this(state, dstBroker, sourceBroker, null, false, System.currentTimeMillis(), versionId);
5057
}
5158

59+
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, String sourceBroker, boolean force,
60+
long versionId) {
61+
this(state, dstBroker, sourceBroker, null, force,
62+
System.currentTimeMillis(), versionId);
63+
}
64+
65+
66+
5267
public ServiceUnitStateData(ServiceUnitState state, String dstBroker, long versionId) {
5368
this(state, dstBroker, null, null, false, System.currentTimeMillis(), versionId);
5469
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Split.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,8 @@ public record Split(
3030

3131
public Split {
3232
Objects.requireNonNull(serviceUnit);
33-
if (splitServiceUnitToDestBroker != null && splitServiceUnitToDestBroker.size() != 2) {
33+
if (splitServiceUnitToDestBroker == null || splitServiceUnitToDestBroker.size() != 2) {
3434
throw new IllegalArgumentException("Split service unit should be split into 2 service units.");
3535
}
3636
}
37-
38-
public Split(String serviceUnit, String sourceBroker) {
39-
this(serviceUnit, sourceBroker, null);
40-
}
4137
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/Unload.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,18 @@
2424
/**
2525
* Defines the information required to unload or transfer a service unit(e.g. bundle).
2626
*/
27-
public record Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker) {
27+
public record Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker, boolean force) {
2828
public Unload {
2929
Objects.requireNonNull(sourceBroker);
3030
Objects.requireNonNull(serviceUnit);
3131
}
32+
33+
3234
public Unload(String sourceBroker, String serviceUnit) {
33-
this(sourceBroker, serviceUnit, Optional.empty());
35+
this(sourceBroker, serviceUnit, Optional.empty(), false);
36+
}
37+
38+
public Unload(String sourceBroker, String serviceUnit, Optional<String> destBroker) {
39+
this(sourceBroker, serviceUnit, destBroker, false);
3440
}
3541
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java

+75-1
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,23 @@
2626
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
2727
import java.util.HashMap;
2828
import java.util.HashSet;
29+
import java.util.List;
2930
import java.util.Map;
31+
import java.util.Optional;
3032
import java.util.Set;
33+
import java.util.concurrent.TimeUnit;
3134
import lombok.extern.slf4j.Slf4j;
3235
import org.apache.pulsar.broker.PulsarService;
3336
import org.apache.pulsar.broker.ServiceConfiguration;
3437
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
38+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
3539
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
3640
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
3741
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
3842
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
3943
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
4044
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
45+
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
4146
import org.apache.pulsar.common.naming.NamespaceName;
4247
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
4348

@@ -53,12 +58,14 @@ public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleS
5358
private final Set<SplitDecision> decisionCache;
5459
private final Map<String, Integer> namespaceBundleCount;
5560
private final Map<String, Integer> splitConditionHitCounts;
61+
private final Map<String, String> splittingBundles;
5662
private final SplitCounter counter;
5763

5864
public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
5965
decisionCache = new HashSet<>();
6066
namespaceBundleCount = new HashMap<>();
6167
splitConditionHitCounts = new HashMap<>();
68+
splittingBundles = new HashMap<>();
6269
this.counter = counter;
6370

6471
}
@@ -67,6 +74,7 @@ public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
6774
public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) {
6875
decisionCache.clear();
6976
namespaceBundleCount.clear();
77+
splittingBundles.clear();
7078
final ServiceConfiguration conf = pulsar.getConfiguration();
7179
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
7280
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
@@ -78,6 +86,15 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
7886
boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled();
7987
var channel = ServiceUnitStateChannelImpl.get(pulsar);
8088

89+
for (var etr : channel.getOwnershipEntrySet()) {
90+
var eData = etr.getValue();
91+
if (eData.state() == ServiceUnitState.Splitting) {
92+
String bundle = etr.getKey();
93+
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
94+
splittingBundles.put(bundle, bundleRange);
95+
}
96+
}
97+
8198
Map<String, NamespaceBundleStats> bundleStatsMap = pulsar.getBrokerService().getBundleStats();
8299
NamespaceBundleFactory namespaceBundleFactory =
83100
pulsar.getNamespaceService().getNamespaceBundleFactory();
@@ -177,6 +194,27 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
177194
continue;
178195
}
179196

197+
var ranges = bundleRange.split("_");
198+
var foundSplittingBundle = false;
199+
for (var etr : splittingBundles.entrySet()) {
200+
var splittingBundle = etr.getKey();
201+
if (splittingBundle.startsWith(namespace)) {
202+
var splittingBundleRange = etr.getValue();
203+
if (splittingBundleRange.startsWith(ranges[0])
204+
|| splittingBundleRange.endsWith(ranges[1])) {
205+
if (debug) {
206+
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
207+
+ " (parent) bundle:%s is in Splitting state.", bundle, splittingBundle));
208+
}
209+
foundSplittingBundle = true;
210+
break;
211+
}
212+
}
213+
}
214+
if (foundSplittingBundle) {
215+
continue;
216+
}
217+
180218
if (debug) {
181219
log.info(String.format(
182220
"Splitting bundle: %s. "
@@ -193,7 +231,43 @@ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarS
193231
));
194232
}
195233
var decision = new SplitDecision();
196-
decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId()));
234+
var namespaceService = pulsar.getNamespaceService();
235+
var namespaceBundle = namespaceService.getNamespaceBundleFactory()
236+
.getBundle(namespaceName, bundleRange);
237+
NamespaceBundleSplitAlgorithm algorithm =
238+
namespaceService.getNamespaceBundleSplitAlgorithmByName(
239+
conf.getDefaultNamespaceBundleSplitAlgorithm());
240+
List<Long> splitBoundary = null;
241+
try {
242+
splitBoundary = namespaceService
243+
.getSplitBoundary(namespaceBundle, null, algorithm)
244+
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
245+
} catch (Throwable e) {
246+
counter.update(Failure, Unknown);
247+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to get split boundaries.", bundle, e));
248+
continue;
249+
}
250+
if (splitBoundary == null) {
251+
counter.update(Failure, Unknown);
252+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The split boundaries is null.", bundle));
253+
continue;
254+
}
255+
if (splitBoundary.size() != 1) {
256+
counter.update(Failure, Unknown);
257+
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The size of split boundaries is not 1. "
258+
+ "splitBoundary:%s", bundle, splitBoundary));
259+
continue;
260+
}
261+
262+
var parentRange = namespaceBundle.getKeyRange();
263+
var leftChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
264+
NamespaceBundleFactory.getRange(parentRange.lowerEndpoint(), splitBoundary.get(0)));
265+
var rightChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
266+
NamespaceBundleFactory.getRange(splitBoundary.get(0), parentRange.upperEndpoint()));
267+
Map<String, Optional<String>> splitServiceUnitToDestBroker = Map.of(
268+
leftChildBundle.getBundleRange(), Optional.empty(),
269+
rightChildBundle.getBundleRange(), Optional.empty());
270+
decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), splitServiceUnitToDestBroker));
197271
decision.succeed(reason);
198272
decisionCache.add(decision);
199273
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -967,9 +967,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
967967
*/
968968
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
969969
NamespaceBundle bundle, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm, List<Long> boundaries) {
970-
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
971-
CompletableFuture<List<Long>> splitBoundary =
972-
nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
970+
CompletableFuture<List<Long>> splitBoundary = getSplitBoundary(bundle, boundaries, nsBundleSplitAlgorithm);
973971
return splitBoundary.thenCompose(splitBoundaries -> {
974972
if (splitBoundaries == null || splitBoundaries.size() == 0) {
975973
LOG.info("[{}] No valid boundary found in {} to split bundle {}",
@@ -981,6 +979,12 @@ public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplit
981979
});
982980
}
983981

982+
public CompletableFuture<List<Long>> getSplitBoundary(
983+
NamespaceBundle bundle, List<Long> boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
984+
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
985+
return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
986+
}
987+
984988
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
985989
List<Long> boundaries,
986990
ServiceConfiguration config) {

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,7 @@ public NamespaceBundle getBundle(String namespace, String bundleRange) {
272272
String[] boundaries = bundleRange.split("_");
273273
Long lowerEndpoint = Long.decode(boundaries[0]);
274274
Long upperEndpoint = Long.decode(boundaries[1]);
275-
Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
276-
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
275+
Range<Long> hashRange = getRange(lowerEndpoint, upperEndpoint);
277276
return getBundle(NamespaceName.get(namespace), hashRange);
278277
}
279278

@@ -414,4 +413,9 @@ public static String getNamespaceFromPoliciesPath(String path) {
414413
return Joiner.on("/").join(i);
415414
}
416415

416+
public static Range<Long> getRange(Long lowerEndpoint, Long upperEndpoint) {
417+
return Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
418+
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
419+
}
420+
417421
}

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java

+3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public interface NamespaceBundleSplitAlgorithm {
3939
NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm();
4040
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
4141
new SpecifiedPositionsBundleSplitAlgorithm();
42+
43+
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO =
44+
new SpecifiedPositionsBundleSplitAlgorithm(true);
4245
NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
4346

4447
static NamespaceBundleSplitAlgorithm of(String algorithmName) {

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java

+33-14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@
2929
* This algorithm divides the bundle into several parts by the specified positions.
3030
*/
3131
public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm{
32+
33+
private boolean force;
34+
35+
public SpecifiedPositionsBundleSplitAlgorithm() {
36+
force = false;
37+
}
38+
39+
public SpecifiedPositionsBundleSplitAlgorithm(boolean force) {
40+
this.force = force;
41+
}
3242
@Override
3343
public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
3444
NamespaceService service = bundleSplitOption.getService();
@@ -39,19 +49,28 @@ public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSp
3949
}
4050
// sort all positions
4151
Collections.sort(positions);
42-
return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
43-
if (topics == null || topics.size() <= 1) {
44-
return CompletableFuture.completedFuture(null);
45-
}
46-
List<Long> splitBoundaries = positions
47-
.stream()
48-
.filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
49-
.collect(Collectors.toList());
50-
51-
if (splitBoundaries.size() == 0) {
52-
return CompletableFuture.completedFuture(null);
53-
}
54-
return CompletableFuture.completedFuture(splitBoundaries);
55-
});
52+
if (force) {
53+
return getBoundaries(bundle, positions);
54+
} else {
55+
return service.getOwnedTopicListForNamespaceBundle(bundle)
56+
.thenCompose(topics -> {
57+
if (topics == null || topics.size() <= 1) {
58+
return CompletableFuture.completedFuture(null);
59+
}
60+
return getBoundaries(bundle, positions);
61+
});
62+
}
63+
}
64+
65+
private CompletableFuture<List<Long>> getBoundaries(NamespaceBundle bundle, List<Long> positions) {
66+
List<Long> splitBoundaries = positions
67+
.stream()
68+
.filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
69+
.collect(Collectors.toList());
70+
71+
if (splitBoundaries.size() == 0) {
72+
return CompletableFuture.completedFuture(null);
73+
}
74+
return CompletableFuture.completedFuture(splitBoundaries);
5675
}
5776
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public void setup() throws Exception {
133133
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
134134
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
135135
conf.setLoadBalancerSheddingEnabled(false);
136+
conf.setLoadBalancerDebugModeEnabled(true);
136137
super.internalSetup(conf);
137138
pulsar1 = pulsar;
138139
ServiceConfiguration defaultConf = getDefaultConf();

0 commit comments

Comments
 (0)