Skip to content

Commit 4caf41a

Browse files
authored
[improve][broker] PIP-220 Added TransferShedder (#18865)
1 parent 3e44d1e commit 4caf41a

File tree

6 files changed

+1239
-9
lines changed

6 files changed

+1239
-9
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+64
Original file line numberDiff line numberDiff line change
@@ -2450,6 +2450,70 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
24502450
)
24512451
private long namespaceBundleUnloadingTimeoutMs = 60000;
24522452

2453+
@FieldContext(
2454+
category = CATEGORY_LOAD_BALANCER,
2455+
dynamic = true,
2456+
doc = "Option to enable the debug mode for the load balancer logics. "
2457+
+ "The debug mode prints more logs to provide more information "
2458+
+ "such as load balance states and decisions. "
2459+
+ "(only used in load balancer extension logics)"
2460+
)
2461+
private boolean loadBalancerDebugModeEnabled = false;
2462+
2463+
@FieldContext(
2464+
category = CATEGORY_LOAD_BALANCER,
2465+
dynamic = true,
2466+
doc = "The target standard deviation of the resource usage across brokers "
2467+
+ "(100% resource usage is 1.0 load). "
2468+
+ "The shedder logic tries to distribute bundle load across brokers to meet this target std. "
2469+
+ "The smaller value will incur load balancing more frequently. "
2470+
+ "(only used in load balancer extension TransferSheddeer)"
2471+
)
2472+
private double loadBalancerBrokerLoadTargetStd = 0.25;
2473+
2474+
@FieldContext(
2475+
category = CATEGORY_LOAD_BALANCER,
2476+
dynamic = true,
2477+
doc = "Option to enable the bundle transfer mode when distributing bundle loads. "
2478+
+ "On: transfer bundles from overloaded brokers to underloaded "
2479+
+ "-- pre-assigns the destination broker upon unloading). "
2480+
+ "Off: unload bundles from overloaded brokers "
2481+
+ "-- post-assigns the destination broker upon lookups). "
2482+
+ "(only used in load balancer extension TransferSheddeer)"
2483+
)
2484+
private boolean loadBalancerTransferEnabled = true;
2485+
2486+
@FieldContext(
2487+
category = CATEGORY_LOAD_BALANCER,
2488+
dynamic = true,
2489+
doc = "Maximum number of brokers to transfer bundle load for each unloading cycle. "
2490+
+ "The bigger value will incur more unloading/transfers for each unloading cycle. "
2491+
+ "(only used in load balancer extension TransferSheddeer)"
2492+
)
2493+
private int loadBalancerMaxNumberOfBrokerTransfersPerCycle = 3;
2494+
2495+
@FieldContext(
2496+
category = CATEGORY_LOAD_BALANCER,
2497+
dynamic = true,
2498+
doc = "Delay (in seconds) to the next unloading cycle after unloading. "
2499+
+ "The logic tries to give enough time for brokers to recompute load after unloading. "
2500+
+ "The bigger value will delay the next unloading cycle longer. "
2501+
+ "(only used in load balancer extension TransferSheddeer)"
2502+
)
2503+
private long loadBalanceUnloadDelayInSeconds = 600;
2504+
2505+
@FieldContext(
2506+
category = CATEGORY_LOAD_BALANCER,
2507+
dynamic = true,
2508+
doc = "Broker load data time to live (TTL in seconds). "
2509+
+ "The logic tries to avoid (possibly unavailable) brokers with out-dated load data, "
2510+
+ "and those brokers will be ignored in the load computation. "
2511+
+ "When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes. "
2512+
+ "The current default is loadBalancerReportUpdateMaxIntervalMinutes * 2. "
2513+
+ "(only used in load balancer extension TransferSheddeer)"
2514+
)
2515+
private long loadBalancerBrokerLoadDataTTLInSeconds = 1800;
2516+
24532517
/**** --- Replication. --- ****/
24542518
@FieldContext(
24552519
category = CATEGORY_REPLICATION,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.models;
20+
21+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
22+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Skip;
23+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Success;
24+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Balanced;
25+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBundles;
26+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoLoadData;
27+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
28+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
29+
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
30+
import com.google.common.collect.ArrayListMultimap;
31+
import com.google.common.collect.Multimap;
32+
import lombok.Data;
33+
34+
/**
35+
* Defines the information required to unload or transfer a service unit(e.g. bundle).
36+
*/
37+
@Data
38+
public class UnloadDecision {
39+
Multimap<String, Unload> unloads;
40+
Label label;
41+
Reason reason;
42+
Double loadAvg;
43+
Double loadStd;
44+
public enum Label {
45+
Success,
46+
Skip,
47+
Failure
48+
}
49+
public enum Reason {
50+
Overloaded,
51+
Underloaded,
52+
Balanced,
53+
NoBundles,
54+
CoolDown,
55+
OutDatedData,
56+
NoLoadData,
57+
NoBrokers,
58+
Unknown
59+
}
60+
61+
public UnloadDecision() {
62+
unloads = ArrayListMultimap.create();
63+
label = null;
64+
reason = null;
65+
loadAvg = null;
66+
loadStd = null;
67+
}
68+
69+
public void clear() {
70+
unloads.clear();
71+
label = null;
72+
reason = null;
73+
loadAvg = null;
74+
loadStd = null;
75+
}
76+
77+
public void skip(int numOfOverloadedBrokers,
78+
int numOfUnderloadedBrokers,
79+
int numOfBrokersWithEmptyLoadData,
80+
int numOfBrokersWithFewBundles) {
81+
label = Skip;
82+
if (numOfOverloadedBrokers == 0 && numOfUnderloadedBrokers == 0) {
83+
reason = Balanced;
84+
} else if (numOfBrokersWithEmptyLoadData > 0) {
85+
reason = NoLoadData;
86+
} else if (numOfBrokersWithFewBundles > 0) {
87+
reason = NoBundles;
88+
} else {
89+
reason = Unknown;
90+
}
91+
}
92+
93+
public void skip(Reason reason) {
94+
label = Skip;
95+
this.reason = reason;
96+
}
97+
98+
public void succeed(
99+
int numOfOverloadedBrokers,
100+
int numOfUnderloadedBrokers) {
101+
102+
label = Success;
103+
if (numOfOverloadedBrokers > numOfUnderloadedBrokers) {
104+
reason = Overloaded;
105+
} else {
106+
reason = Underloaded;
107+
}
108+
}
109+
110+
111+
public void fail() {
112+
label = Failure;
113+
reason = Unknown;
114+
}
115+
116+
117+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceUnloadStrategy.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
2020

21-
import java.util.List;
2221
import java.util.Map;
2322
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
24-
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
23+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
2524

2625
/**
2726
* The namespace unload strategy.
@@ -34,12 +33,13 @@ public interface NamespaceUnloadStrategy {
3433
/**
3534
* Recommend that all the returned bundles be unloaded.
3635
*
37-
* @param context The context used for decisions.
38-
* @param recentlyUnloadedBundles
39-
* The recently unloaded bundles.
40-
* @return A list of the bundles that should be unloaded.
36+
* @param context The context used for decisions.
37+
* @param recentlyUnloadedBundles The recently unloaded bundles.
38+
* @param recentlyUnloadedBrokers The recently unloaded brokers.
39+
* @return unloadDecision containing a list of the bundles that should be unloaded.
4140
*/
42-
List<Unload> findBundlesForUnloading(LoadManagerContext context,
43-
Map<String, Long> recentlyUnloadedBundles);
41+
UnloadDecision findBundlesForUnloading(LoadManagerContext context,
42+
Map<String, Long> recentlyUnloadedBundles,
43+
Map<String, Long> recentlyUnloadedBrokers);
4444

4545
}

0 commit comments

Comments
 (0)