Skip to content

Commit 42c23e6

Browse files
committed
[improve][broker] PIP-192 Added Split Scheduler
1 parent af1360f commit 42c23e6

File tree

11 files changed

+854
-63
lines changed

11 files changed

+854
-63
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+24
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,30 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
25572557
+ "(only used in load balancer extension logics)"
25582558
)
25592559
private double loadBalancerBundleLoadReportPercentage = 10;
2560+
@FieldContext(
2561+
category = CATEGORY_LOAD_BALANCER,
2562+
doc = "Service units'(bundles) split interval. Broker periodically checks whether "
2563+
+ "some service units(e.g. bundles) should split if they become hot-spots. "
2564+
+ "(only used in load balancer extension logics)"
2565+
)
2566+
private int loadBalancerSplitIntervalMinutes = 1;
2567+
@FieldContext(
2568+
category = CATEGORY_LOAD_BALANCER,
2569+
dynamic = true,
2570+
doc = "Max number of bundles to split to per cycle. "
2571+
+ "(only used in load balancer extension logics)"
2572+
)
2573+
private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10;
2574+
@FieldContext(
2575+
category = CATEGORY_LOAD_BALANCER,
2576+
dynamic = true,
2577+
doc = "Threshold to the consecutive count of fulfilled split conditions. "
2578+
+ "If the split scheduler consecutively finds bundles that meet split conditions "
2579+
+ "many times bigger than this threshold, the scheduler will trigger splits on the bundles "
2580+
+ "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). "
2581+
+ "(only used in load balancer extension logics)"
2582+
)
2583+
private int loadBalancerNamespaceBundleSplitConditionThreshold = 5;
25602584

25612585
@FieldContext(
25622586
category = CATEGORY_LOAD_BALANCER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@
4747
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
4848
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
4949
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
50-
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
5150
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
5251
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
5352
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
5453
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
5554
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
5655
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
56+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
5757
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
5858
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
5959
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
@@ -102,7 +102,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
102102

103103
@Getter
104104
private final List<BrokerFilter> brokerFilterPipeline;
105-
106105
/**
107106
* The load data reporter.
108107
*/
@@ -112,6 +111,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
112111

113112
private ScheduledFuture brokerLoadDataReportTask;
114113
private ScheduledFuture topBundlesLoadDataReportTask;
114+
private SplitScheduler splitScheduler;
115115

116116
private UnloadManager unloadManager;
117117

@@ -182,7 +182,6 @@ public void start() throws PulsarServerException {
182182
.brokerLoadDataStore(brokerLoadDataStore)
183183
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
184184

185-
186185
this.brokerLoadDataReporter =
187186
new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
188187

@@ -214,10 +213,12 @@ public void start() throws PulsarServerException {
214213
interval,
215214
interval, TimeUnit.MILLISECONDS);
216215

217-
// TODO: Start bundle split scheduler.
218216
this.unloadScheduler = new UnloadScheduler(
219217
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
220218
this.unloadScheduler.start();
219+
this.splitScheduler = new SplitScheduler(
220+
pulsar, serviceUnitStateChannel, splitCounter, splitMetrics, context);
221+
this.splitScheduler.start();
221222
this.started = true;
222223
}
223224

@@ -376,6 +377,7 @@ public void close() throws PulsarServerException {
376377
this.brokerLoadDataStore.close();
377378
this.topBundlesLoadDataStore.close();
378379
this.unloadScheduler.close();
380+
this.splitScheduler.close();
379381
} catch (IOException ex) {
380382
throw new PulsarServerException(ex);
381383
} finally {
@@ -407,13 +409,6 @@ private void updateUnloadMetrics(UnloadDecision decision) {
407409
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
408410
}
409411

410-
private void updateSplitMetrics(List<SplitDecision> decisions) {
411-
for (var decision : decisions) {
412-
splitCounter.update(decision);
413-
}
414-
this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
415-
}
416-
417412
public List<Metrics> getMetrics() {
418413
List<Metrics> metricsCollection = new ArrayList<>();
419414

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

-1
Original file line numberDiff line numberDiff line change
@@ -882,7 +882,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
882882
return null;
883883
});
884884
}
885-
886885
public void handleMetadataSessionEvent(SessionEvent e) {
887886
if (e == SessionReestablished || e == SessionLost) {
888887
lastMetadataSessionEvent = e;

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java

+27-19
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.models;
2020

2121
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
2322
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
2423
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
25-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
2624
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
2725
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
2826
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
@@ -32,39 +30,45 @@
3230
import java.util.HashMap;
3331
import java.util.List;
3432
import java.util.Map;
35-
import org.apache.commons.lang3.mutable.MutableLong;
33+
import java.util.concurrent.atomic.AtomicLong;
3634
import org.apache.pulsar.common.stats.Metrics;
3735

3836
/**
3937
* Defines the information required for a service unit split(e.g. bundle split).
4038
*/
4139
public class SplitCounter {
4240

43-
long splitCount = 0;
44-
45-
final Map<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> breakdownCounters;
41+
private long splitCount = 0;
42+
private final Map<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> breakdownCounters;
43+
private volatile long updatedAt = 0;
4644

4745
public SplitCounter() {
4846
breakdownCounters = Map.of(
4947
Success, Map.of(
50-
Topics, new MutableLong(),
51-
Sessions, new MutableLong(),
52-
MsgRate, new MutableLong(),
53-
Bandwidth, new MutableLong(),
54-
Admin, new MutableLong()),
55-
Skip, Map.of(
56-
Balanced, new MutableLong()
57-
),
48+
Topics, new AtomicLong(),
49+
Sessions, new AtomicLong(),
50+
MsgRate, new AtomicLong(),
51+
Bandwidth, new AtomicLong(),
52+
Admin, new AtomicLong()),
5853
Failure, Map.of(
59-
Unknown, new MutableLong())
54+
Unknown, new AtomicLong())
6055
);
6156
}
6257

6358
public void update(SplitDecision decision) {
6459
if (decision.label == Success) {
6560
splitCount++;
6661
}
67-
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
62+
breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
63+
updatedAt = System.currentTimeMillis();
64+
}
65+
66+
public void update(SplitDecision.Label label, SplitDecision.Reason reason) {
67+
if (label == Success) {
68+
splitCount++;
69+
}
70+
breakdownCounters.get(label).get(reason).incrementAndGet();
71+
updatedAt = System.currentTimeMillis();
6872
}
6973

7074
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
@@ -77,22 +81,26 @@ public List<Metrics> toMetrics(String advertisedBrokerAddress) {
7781
m.put("brk_lb_bundles_split_total", splitCount);
7882
metrics.add(m);
7983

80-
for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> etr
84+
85+
for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> etr
8186
: breakdownCounters.entrySet()) {
8287
var result = etr.getKey();
83-
for (Map.Entry<SplitDecision.Reason, MutableLong> counter : etr.getValue().entrySet()) {
88+
for (Map.Entry<SplitDecision.Reason, AtomicLong> counter : etr.getValue().entrySet()) {
8489
var reason = counter.getKey();
8590
var count = counter.getValue();
8691
Map<String, String> breakdownDims = new HashMap<>(dimensions);
8792
breakdownDims.put("result", result.toString());
8893
breakdownDims.put("reason", reason.toString());
8994
Metrics breakdownMetric = Metrics.create(breakdownDims);
90-
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count);
95+
breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get());
9196
metrics.add(breakdownMetric);
9297
}
9398
}
9499

95100
return metrics;
96101
}
97102

103+
public long updatedAt() {
104+
return updatedAt;
105+
}
98106
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java

-9
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.models;
2020

2121
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
22-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
2322
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
24-
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
2523
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
2624
import lombok.Data;
2725

@@ -36,7 +34,6 @@ public class SplitDecision {
3634

3735
public enum Label {
3836
Success,
39-
Skip,
4037
Failure
4138
}
4239

@@ -46,7 +43,6 @@ public enum Reason {
4643
MsgRate,
4744
Bandwidth,
4845
Admin,
49-
Balanced,
5046
Unknown
5147
}
5248

@@ -62,11 +58,6 @@ public void clear() {
6258
reason = null;
6359
}
6460

65-
public void skip() {
66-
label = Skip;
67-
reason = Balanced;
68-
}
69-
7061
public void succeed(Reason reason) {
7162
label = Success;
7263
this.reason = reason;

0 commit comments

Comments
 (0)