Skip to content

Commit 9a85dea

Browse files
authored
[improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
Master Issue: #16691 ### Motivation We will start raising PRs to implement PIP-192, #16691 ### Modifications This PR implemented - SplitScheduler - DefaultNamespaceBundleSplitStrategyImpl - SplitManager - and their unit test.
1 parent b1a463a commit 9a85dea

File tree

12 files changed

+1208
-62
lines changed

12 files changed

+1208
-62
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+24
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,30 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
25572557
+ "(only used in load balancer extension logics)"
25582558
)
25592559
private double loadBalancerBundleLoadReportPercentage = 10;
2560+
@FieldContext(
2561+
category = CATEGORY_LOAD_BALANCER,
2562+
doc = "Service units'(bundles) split interval. Broker periodically checks whether "
2563+
+ "some service units(e.g. bundles) should split if they become hot-spots. "
2564+
+ "(only used in load balancer extension logics)"
2565+
)
2566+
private int loadBalancerSplitIntervalMinutes = 1;
2567+
@FieldContext(
2568+
category = CATEGORY_LOAD_BALANCER,
2569+
dynamic = true,
2570+
doc = "Max number of bundles to split to per cycle. "
2571+
+ "(only used in load balancer extension logics)"
2572+
)
2573+
private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10;
2574+
@FieldContext(
2575+
category = CATEGORY_LOAD_BALANCER,
2576+
dynamic = true,
2577+
doc = "Threshold to the consecutive count of fulfilled split conditions. "
2578+
+ "If the split scheduler consecutively finds bundles that meet split conditions "
2579+
+ "many times bigger than this threshold, the scheduler will trigger splits on the bundles "
2580+
+ "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). "
2581+
+ "(only used in load balancer extension logics)"
2582+
)
2583+
private int loadBalancerNamespaceBundleSplitConditionThreshold = 5;
25602584

25612585
@FieldContext(
25622586
category = CATEGORY_LOAD_BALANCER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,17 @@
4545
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
4646
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
4747
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
48+
import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
4849
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
4950
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
5051
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
51-
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
5252
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
5353
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
5454
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
5555
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
5656
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
5757
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
58+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
5859
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
5960
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
6061
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
@@ -103,7 +104,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
103104

104105
@Getter
105106
private final List<BrokerFilter> brokerFilterPipeline;
106-
107107
/**
108108
* The load data reporter.
109109
*/
@@ -113,9 +113,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
113113

114114
private ScheduledFuture brokerLoadDataReportTask;
115115
private ScheduledFuture topBundlesLoadDataReportTask;
116+
private SplitScheduler splitScheduler;
116117

117118
private UnloadManager unloadManager;
118119

120+
private SplitManager splitManager;
121+
119122
private boolean started = false;
120123

121124
private final AssignCounter assignCounter = new AssignCounter();
@@ -166,7 +169,9 @@ public void start() throws PulsarServerException {
166169
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
167170
this.brokerRegistry.start();
168171
this.unloadManager = new UnloadManager();
172+
this.splitManager = new SplitManager(splitCounter);
169173
this.serviceUnitStateChannel.listen(unloadManager);
174+
this.serviceUnitStateChannel.listen(splitManager);
170175
this.serviceUnitStateChannel.start();
171176

172177
try {
@@ -184,7 +189,6 @@ public void start() throws PulsarServerException {
184189
.brokerLoadDataStore(brokerLoadDataStore)
185190
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
186191

187-
188192
this.brokerLoadDataReporter =
189193
new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
190194

@@ -216,10 +220,12 @@ public void start() throws PulsarServerException {
216220
interval,
217221
interval, TimeUnit.MILLISECONDS);
218222

219-
// TODO: Start bundle split scheduler.
220223
this.unloadScheduler = new UnloadScheduler(
221224
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
222225
this.unloadScheduler.start();
226+
this.splitScheduler = new SplitScheduler(
227+
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
228+
this.splitScheduler.start();
223229
this.started = true;
224230
}
225231

@@ -380,6 +386,7 @@ public void close() throws PulsarServerException {
380386
this.brokerLoadDataStore.close();
381387
this.topBundlesLoadDataStore.close();
382388
this.unloadScheduler.close();
389+
this.splitScheduler.close();
383390
} catch (IOException ex) {
384391
throw new PulsarServerException(ex);
385392
} finally {
@@ -411,13 +418,6 @@ private void updateUnloadMetrics(UnloadDecision decision) {
411418
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
412419
}
413420

414-
private void updateSplitMetrics(List<SplitDecision> decisions) {
415-
for (var decision : decisions) {
416-
splitCounter.update(decision);
417-
}
418-
this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
419-
}
420-
421421
public List<Metrics> getMetrics() {
422422
List<Metrics> metricsCollection = new ArrayList<>();
423423

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.manager;
20+
21+
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22+
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.TimeUnit;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
29+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
30+
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
31+
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
32+
33+
/**
34+
* Split manager.
35+
*/
36+
@Slf4j
37+
public class SplitManager implements StateChangeListener {
38+
39+
40+
private final Map<String, CompletableFuture<Void>> inFlightSplitRequests;
41+
42+
private final SplitCounter counter;
43+
44+
public SplitManager(SplitCounter splitCounter) {
45+
this.inFlightSplitRequests = new ConcurrentHashMap<>();
46+
this.counter = splitCounter;
47+
}
48+
49+
private void complete(String serviceUnit, Throwable ex) {
50+
inFlightSplitRequests.computeIfPresent(serviceUnit, (__, future) -> {
51+
if (!future.isDone()) {
52+
if (ex != null) {
53+
future.completeExceptionally(ex);
54+
} else {
55+
future.complete(null);
56+
}
57+
}
58+
return null;
59+
});
60+
}
61+
62+
public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
63+
String bundle,
64+
SplitDecision decision,
65+
long timeout,
66+
TimeUnit timeoutUnit) {
67+
return eventPubFuture
68+
.thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
69+
log.info("Published the bundle split event for bundle:{}. "
70+
+ "Waiting the split event to complete. Timeout: {} {}",
71+
bundle, timeout, timeoutUnit);
72+
CompletableFuture<Void> future = new CompletableFuture<>();
73+
future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
74+
if (ex != null) {
75+
inFlightSplitRequests.remove(bundle);
76+
log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
77+
}
78+
});
79+
return future;
80+
}))
81+
.whenComplete((__, ex) -> {
82+
if (ex != null) {
83+
log.error("Failed the bundle split event for bundle:{}", bundle, ex);
84+
counter.update(Failure, Unknown);
85+
} else {
86+
log.info("Completed the bundle split event for bundle:{}", bundle);
87+
counter.update(decision);
88+
}
89+
});
90+
}
91+
92+
@Override
93+
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
94+
ServiceUnitState state = ServiceUnitStateData.state(data);
95+
if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) {
96+
this.complete(serviceUnit, t);
97+
return;
98+
}
99+
switch (state) {
100+
case Deleted, Owned, Init -> this.complete(serviceUnit, t);
101+
default -> {
102+
if (log.isDebugEnabled()) {
103+
log.debug("Handling {} for service unit {}", data, serviceUnit);
104+
}
105+
}
106+
}
107+
}
108+
109+
public void close() {
110+
inFlightSplitRequests.forEach((bundle, future) -> {
111+
if (!future.isDone()) {
112+
String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle);
113+
log.warn(msg);
114+
future.completeExceptionally(new IllegalStateException(msg));
115+
}
116+
});
117+
inFlightSplitRequests.clear();
118+
}
119+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java

+27-19
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.models;
2020

2121
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
2322
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
2423
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
25-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
2624
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
2725
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
2826
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
@@ -32,39 +30,45 @@
3230
import java.util.HashMap;
3331
import java.util.List;
3432
import java.util.Map;
35-
import org.apache.commons.lang3.mutable.MutableLong;
33+
import java.util.concurrent.atomic.AtomicLong;
3634
import org.apache.pulsar.common.stats.Metrics;
3735

3836
/**
3937
* Defines the information required for a service unit split(e.g. bundle split).
4038
*/
4139
public class SplitCounter {
4240

43-
long splitCount = 0;
44-
45-
final Map<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> breakdownCounters;
41+
private long splitCount = 0;
42+
private final Map<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> breakdownCounters;
43+
private volatile long updatedAt = 0;
4644

4745
public SplitCounter() {
4846
breakdownCounters = Map.of(
4947
Success, Map.of(
50-
Topics, new MutableLong(),
51-
Sessions, new MutableLong(),
52-
MsgRate, new MutableLong(),
53-
Bandwidth, new MutableLong(),
54-
Admin, new MutableLong()),
55-
Skip, Map.of(
56-
Balanced, new MutableLong()
57-
),
48+
Topics, new AtomicLong(),
49+
Sessions, new AtomicLong(),
50+
MsgRate, new AtomicLong(),
51+
Bandwidth, new AtomicLong(),
52+
Admin, new AtomicLong()),
5853
Failure, Map.of(
59-
Unknown, new MutableLong())
54+
Unknown, new AtomicLong())
6055
);
6156
}
6257

6358
public void update(SplitDecision decision) {
6459
if (decision.label == Success) {
6560
splitCount++;
6661
}
67-
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
62+
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
63+
updatedAt = System.currentTimeMillis();
64+
}
65+
66+
public void update(SplitDecision.Label label, SplitDecision.Reason reason) {
67+
if (label == Success) {
68+
splitCount++;
69+
}
70+
breakdownCounters.get(label).get(reason).incrementAndGet();
71+
updatedAt = System.currentTimeMillis();
6872
}
6973

7074
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
@@ -77,22 +81,26 @@ public List<Metrics> toMetrics(String advertisedBrokerAddress) {
7781
m.put("brk_lb_bundles_split_total", splitCount);
7882
metrics.add(m);
7983

80-
for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> etr
84+
85+
for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> etr
8186
: breakdownCounters.entrySet()) {
8287
var result = etr.getKey();
83-
for (Map.Entry<SplitDecision.Reason, MutableLong> counter : etr.getValue().entrySet()) {
88+
for (Map.Entry<SplitDecision.Reason, AtomicLong> counter : etr.getValue().entrySet()) {
8489
var reason = counter.getKey();
8590
var count = counter.getValue();
8691
Map<String, String> breakdownDims = new HashMap<>(dimensions);
8792
breakdownDims.put("result", result.toString());
8893
breakdownDims.put("reason", reason.toString());
8994
Metrics breakdownMetric = Metrics.create(breakdownDims);
90-
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count);
95+
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get());
9196
metrics.add(breakdownMetric);
9297
}
9398
}
9499

95100
return metrics;
96101
}
97102

103+
public long updatedAt() {
104+
return updatedAt;
105+
}
98106
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java

-9
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.models;
2020

2121
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
2322
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
24-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
2523
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
2624
import lombok.Data;
2725

@@ -36,7 +34,6 @@ public class SplitDecision {
3634

3735
public enum Label {
3836
Success,
39-
Skip,
4037
Failure
4138
}
4239

@@ -46,7 +43,6 @@ public enum Reason {
4643
MsgRate,
4744
Bandwidth,
4845
Admin,
49-
Balanced,
5046
Unknown
5147
}
5248

@@ -62,11 +58,6 @@ public void clear() {
6258
reason = null;
6359
}
6460

65-
public void skip() {
66-
label = Skip;
67-
reason = Balanced;
68-
}
69-
7061
public void succeed(Reason reason) {
7162
label = Success;
7263
this.reason = reason;

0 commit comments

Comments
 (0)