Pulsar has two load balance interfaces:
LoadSheddingStrategy
is an unloading strategy that identifies high load brokers and unloads some of the bundles they carry to reduce the load.ModularLoadManagerStrategy
is a placement strategy responsible for assigning bundles to brokers.
There are three available algorithms: ThresholdShedder
, OverloadShedder
, UniformLoadShedder
.
ThresholdShedder
uses the following method to calculate the maximum resource utilization rate for each broker,
which includes CPU, direct memory, bandwidth in, and bandwidth out.
public double getMaxResourceUsageWithWeight(final double cpuWeight,
final double directMemoryWeight, final double bandwidthInWeight,
final double bandwidthOutWeight) {
return max(cpu.percentUsage() * cpuWeight,
directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
}
After calculating the maximum resource utilization rate for each broker, a historical weight algorithm will also be executed to obtain the final score.
historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
The historyPercentage is determined by configuring the loadBalancerHistoryResourcePercentage
.
The default value is 0.9, which means that the last calculated score accounts for 90%,
while the current calculated score only accounts for 10%.
The introduction of this historical weight algorithm is to avoid bundle switching caused by short-term abnormal load increase or decrease, but in fact, this algorithm will introduce some serious problems, which will be explained in detail later.
Next, calculate the average score of all brokers in the entire cluster: avgUsage=totalUsage/totalBrokers
.
When the score of any broker exceeds a certain threshold of avgUsage, it is determined that the broker is overloaded.
The threshold is determined by the configuration loadBalancerBrokerThresholdShedderPercentage
, with a default value of 10.
OverloadShedder
use the same method getMaxResourceUsageWithWeight
to calculate the maximum resource utilization rate for each broker.
The difference is that OverloadShedder
will not use the historical weight algorithm to calculate the final score,
the final score is the current maximum resource utilization rate of the broker.
After obtaining the load score for each broker, compare it with the loadBalancerBrokerOverloadedThresholdPercentage
.
If the threshold is exceeded, it is considered overloaded, with a default value of 85%.
This algorithm is relatively simple, but there are many serious corner cases, so it is not recommended to use OverloadShedder
.
Here are two cases:
- When the load on each broker in the cluster reaches the threshold, the bundle unload will continue to be executed, but it will only switch from one overloaded broker to another, which is meaningless.
- If there are no broker whose load reaches the threshold, adding new brokers will not balance the traffic to the new added brokers. The impact of these two points is quite serious, so we won't talk about it next.
UniformLoadShedder
will first calculate the maximum and minimum message rates, as well as the maximum and minimum
traffic throughput and corresponding broker. Then calculate the maximum and minimum difference, with two thresholds
corresponding to message rate and throughput size, respectively.
- loadBalancerMsgRateDifferenceShedderThreshold
The message rate percentage threshold between the highest and lowest loaded brokers, with a default value of 50, can trigger bundle unload when the maximum message rate is 1.5 times the minimum message rate. For example, broker 1 with 50K msgRate and broker 2 with 30K msgRate will have a (50-30)/30=66%>50% difference in msgRate, and the load balancer can unload the bundle from broker 1 to broker 2.
- loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold
The threshold for the message throughput multiplier between the highest and lowest loaded brokers, with a default value of 4, can trigger bundle unload when the maximum throughput is 4 times the minimum throughput. For example, if the msgRate of broker 1 is 450MB, broker 2 is 100MB, and the difference in msgThrough is 450/100=4.5>4 times, then the load balancer can unload the bundle from broker 1 to broker 2.
After introducing the algorithm of UniformLoadShedder
, we can clearly obtain the following information:
UniformLoadShedder
does not have the logic to handle load jitter. For example,
when the traffic suddenly increases or decreases. This load data point is adopted, triggering a bundle unload.
However, the traffic of this topic will soon return to normal, so it is very likely to trigger a bundle unload again.
This type of bundle unload should be avoided. This kind of scenario is very common, actually.
UniformLoadShedder
does not rely on indicators such as CPU usage and network card usage to determine high load
and low load brokers, but rather determines them based on message rate and traffic throughput size,
while ThresholdShedder
and OverloadShedder
rely on machine resource indicators such as CPU usage to determine.
If the cluster is heterogeneous, such as different machines with different hardware configurations,
or if there are other processes sharing resources on the machine where the broker is located,
UniformLoadShedder
is likely to misjudge high and low load brokers, thereby migrating the load from high-performance
but low load brokers to low-performance but high load brokers.
Therefore, it is not recommended for users to use UniformLoadShedder
in heterogeneous environments.
UniformLoadShedder
will only unload the bundle from one of the highest loaded brokers at a time,
which may take a considerable amount of time for a large cluster to complete all load balancing tasks.
For example, if there are 100 high load brokers in the current cluster and 100 new machines to be added,
it is roughly estimated that it will take 100 shedding to complete the balancing.
However, since the execution time interval of the LoadSheddingStrategy
policy is determined by the
configuration of loadBalancerSheddingIntervalMinutes
, which defaults to once every 1 minute,
so it will take 100 minutes to complete all tasks. For users using large partition topics, their tasks
are likely to be disconnected multiple times within this 100 minutes, which greatly affects the user experience.
The LoadSheddingStrategy
strategy is used to unload bundles of high load brokers. However, in order to
achieve a good load balancing effect, it is necessary not only to "unload" correctly, but also to "load" correctly.
The ModularLoadManagerStrategy
strategy is responsible for assigning bundles to brokers.
The coordination between LoadSheddingStrategy
and ModularLoadManagerStrategy
is also a key point worth paying attention to.
The LeastLongTermMessageRate
algorithm directly used the maximum resource usage of CPU and so on as the broker's score,
and reused the OverloadShedder
configuration, loadBalancerBrokerOverloadedThresholdPercentage
.
If the score is greater than it (default 85%), set score=INF
; Otherwise, update the broker's score to the sum of the
message in and out rates obtained from the broker's long-term aggregation.
score = longTerm MsgIn rate+longTerm MsgOut rate,
Finally, randomly select a broker from the broker with the lowest score to return. If the score of each broker is INF, randomly select broker from all brokers.
The scoring algorithm in LeastLongTermMessageRate
is essentially based on message rate. Although it initially examines
the maximum resource utilization, it is to exclude overloaded brokers only.
Therefore, in most cases, brokers are sorted based on the size of the message rate as a score, which results in the same
issues with heterogeneous environments, similar to UniformLoadShedder
.
Next, we will attempt to analyze the effect together with the LoadSheddingStrategy
.
-
LeastLongTermMessageRate + OverloadShedder This is the initial combination, but due to some inherent flaws in
OverloadShedder
, it is not recommended. -
LeastLongTermMessageRate + ThresholdShedder This combination is even worse than
LeastLongTermMessageRate + OverloadShedder
and is not recommended. BecauseOverloadShedder
uses the maximum weighted resource usage and historical score to score brokers, while LeastLongTermMessage Rate is scored based on message rate. Inconsistent unloading and placement criteria can lead to incorrect load balancing execution. This is also why a new placement strategyLeastResourceUsageWithWeight
will be introduced later. -
LeastLongTermMessageRate + UniformLoadShedder This is recommended. Both uninstallation and placement policy are based on message rate, but using message rate as a standard naturally leads to issues with heterogeneous environments.
LeastResourceUsageWithWeight
uses the same scoring algorithm as ThresholdShedder
to score brokers, which uses
weighted maximum resource usage and historical scores to calculate the current score.
Next, select candidate brokers based on the configuration of loadBalancerAverageResourceUsageDifferenceThresholdPercentage
.
If a broker's score plus this threshold is still not greater than the average score, the broker will be added to the
candidate broker list. After obtaining the candidate broker list, a broker will be randomly selected from it;
If there are no candidate brokers, randomly select from all brokers.
For example, if the resource utilization rate of broker 1 is 10%, broker 2 is 30%, and broker 3 is 80%, the average resource utilization rate is 40%. The placement strategy can choose Broker1 and Broker2 as the best candidates, as the thresholds are 10, 10+10<=40, 30+10<=40. In this way, the bundles uninstalled from broker 3 will be evenly distributed among broker 1 and broker 2, rather than being completely placed on broker 1.
Over placement problem is that the bundle is placed on high load brokers and make them overloaded.
In practice, it will be found that it is difficult to determine a suitable value for loadBalancerAverageResourceUsageDifferenceThresholdPercentage
,
which often triggers a fallback global random selection logic. For example, if there are 6 brokers in the current
cluster, with scores of 40, 40, 40, 40, 69, and 70 respectively, the average score is 49.83.
Using the default configuration, there are no candidate brokers because 40+10>49.83.
Triggering a bottom-up global random selection logic and the bundle may be offloaded from the overloaded broker5
to the overloaded broker6, or vice versa, causing the over placement problem.
Attempting to reduce the configuration value to expand the random pool, such as setting it to 0, may also include some overloaded brokers in the candidate broker list. For example, if there are 5 brokers in the current cluster with scores of 10, 60, 70, 80, and 80 respectively, the average score is 60. As the configuration value is 0, then broker 1 and broker 2 are both candidate brokers. If broker 2 shares half of the offloaded traffic, it is highly likely to overload.
Therefore, it is difficult to configure the LeastResourceUsageWithWeight
algorithm well to avoid incorrect load balancing.
Of course, if you want to use the ThresholdShedder
algorithm, the combination of ThresholdShedder+LeastResourceUsageWithWeight
will still be superior to the combination of ThresholdShedder+LeastLongTermMessageRate
, because at least the scoring algorithm
of LeastResourceUsageWithWeight
is consistent with that of ThresholdShedder
.
The root of over placement problem is that the frequency of updating the load data is limited due to the performance of zookeeper. If we assign a bundle to a broker, the broker's load will increase after a while, and it's load data also need some time to be updated to leader broker. If there are many bundles unloaded in a shedding, how can we assign these bundles to brokers?
The most simple way is to assign them to the broker with the lowest load, but it may cause the over placement problem
as it is most likely that there is only one single broker with the lowest load. With all bundles assigned to this broker,
it will be overloaded. This is the reason why LeastResourceUsageWithWeight
try to determine a candidate broker list
to avoid the over placement problem. But we also find that candidate broker list can be empty or include some overloaded
brokers, which will also cause the over placement problem.
So why doesn't LeastLongTermMessageRate
have over placement problem? The reason is that each time a bundle is assigned,
the bundle will be added into PreallocatedBundleData
. When scoring a broker, not only will the long-term message rate
aggregated by the broker itself be used, but also the message rate of bundles in PreallocatedBundleData
that have been
assigned to the broker but have not yet been reflected in the broker's load data will be calculated.
For example, if there are two bundles with 20KB/s message rate to be assigned, and broker1 and broker2 at 100KB/s
and 110KB/s respectively. The first bundle is assigned to broker1, However, broker1's load data will not be updated
in the short term. Before the load data is updated, LeastLongTermMessageRate
try to assign the second bundle.
At this time, the score of broker1 is 100+20=120KB/s, where 20KB/s is the message rate of the first bundle
from PreallocatedBundleData
. As broker1's score is greater than broker2, the second bundle will be assigned to broker2.
LeastLongTermMessageRate
predict the load of the broker after the bundle is assigned to avoid the over placement problem.
Why doesn't LeastResourceUsageWithWeight
have this feature? Because it is not possible to predict how much resource
utilization a broker will increase when loading a bundle. All algorithms scoring brokers based on resource utilization
can't fix the over placement problem with this feature.
So LeastResourceUsageWithWeight
try to determine a candidate broker list to avoid the over placement problem, which is
proved to be not a good solution.
Over unloading problem is that the load offloaded from high load brokers is too much and make them underloaded.
Finally, let's talk about the issue of historical weighted scoring algorithms. The historical weighted scoring algorithm
is used by the ThresholdShedder
and LeastResourceUsageWithWeight
algorithms, as follows:
HistoryUsage=historyUsage=null? ResourceUsage: historyUsage * historyPercentage+(1- historyPercentage) * resourceUsage;
The default value of historyPercentage is 0.9, indicating that the score calculated last time has a significant impact on the current score. The current maximum resource utilization only accounts for 10%, which is to solves the problem of load jitter. However, introducing this algorithm has its side effects, such as over unloading problem.
For example, there is currently one broker1 in the cluster with a load of 90%, and broker2 is added with a current load of 10%.
-
At the first execution of shedding: broker1 scores 90, broker2 scores 10. For simplicity, assuming that the algorithm will move some bundles to make their load the same, thus the true load of broker 1 and broker 2 become 50 after load shedding is completed.
-
At the second execution of shedding: broker1 scores 900.9+500.1=86, broker2 scores 100.9+500.1=14. Note that the actual load of broker1 here is 50, but it is overestimated as 86! The true load of broker2 is also 50, but it is underestimated at 14! Due to the significant difference in ratings between the two, although their actual loads are already the same, broker1 will continue to unload traffic corresponding to 36 points from broker1 to broker2, resulting in broker1's actual load score becoming 14, broker2's actual load score becoming 86.
-
At the third execution of shedding: broker1 scored 860.9+140.1=78.8, broker2 scored 140.9+860.1=21.2. It is ridiculous that broker1 is still considered overloaded, and broker2 is still considered underloaded. All loads in broker1 are moved to broker2, which is the over unloading problem.
Although this example is an idealized theoretical analysis, we can still see that using historical scoring algorithms can seriously overestimate or underestimate the true load of the broker. Although it can avoid the problem of load jitter, it will introduce a more serious and broader problem: overestimating or underestimating the true load of the broker, leading to incorrect load balancing execution.
Based on the previous analysis, although we have three shedding strategies and two placement strategies that can generate 6 combinations of 3 * 2, we actually only have two recommended options:
- ThresholdShedder + LeastResourceUsageWithWeight
- UniformLoadShedder + LeastLongTermMessageRate
These two options each have their own advantages and disadvantages, and users can choose one according to their requirements. The following table summarizes the advantages and disadvantages of the two options:
Combination | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing |
---|---|---|---|---|---|
ThresholdShedder + LeastResourceUsageWithWeight | normal(1) | good | bad | bad | normal(1) |
UniformLoadShedder + LeastLongTermMessageRate | bad(2) | bad | good | good | normal(1) |
-
In terms of adapting to heterogeneous environments,
ThresholdShedder+LeastResourceUsageWithWeight
can only be rated asnormal
. This is becauseThresholdShedder
is not fully adaptable to heterogeneous environments. Although it does not misjudge overloaded brokers as underloaded, heterogeneous environments can still have a significant impact on the load balancing effect ofThresholdShedder
. For example, there are three brokers in the current cluster with resource utilization rates of 10, 50, and 70, respectively. Broker1 and Broker2 are isomorphic. Though Broker3 don't bear any load, its resource utilization rate has reached to 70 due to the deployment of other processes at the same machine. At this point, we would like broker 1 to share some of the pressure from broker2, but since the average load is 43.33, 43.33+10>50, broker2 will not be judged as overloaded, and overloaded broker 3 also has no traffic to unload, causing the load balancing algorithm to be in an inoperable state. -
In the same scenario, if
UniformLoadShedder+LeastLongTermMessageRate
is used, the problem will be more severe, as some of the load will be offloaded from broker2 to broker3. As a result, the performance of those topics in broker3 services will experience significant performance degradation. Therefore, it is not recommended to run Pulsar in heterogeneous environments as current load balancing algorithms cannot adapt too well. If it is unavoidable, it is recommended to chooseThresholdShedder+LeastResourceUsageWithWeight
. -
In terms of load balancing speed, although
ThresholdShedder+LeastResourceUsageWithWeight
can unload the load of all overloaded brokers at once, historical scoring algorithms can seriously affect the accuracy of load balancing decisions. Therefore, in reality, it also requires multiple load balancing executions to finally stabilize. This is why the load balancing speed ofThresholdShedder+LeastResourceUsageWithWeight
is rated asnormal
. -
In terms of load balancing speed,
UniformLoadShedder+LeastLongTermMessageRate
can only unload the load of one overloaded broker at a time, so it takes a long time to complete load balancing when there are many brokers, so it is also rated asnormal
.
The current load balance algorithm has some serious problems, such as load jitter, heterogeneous environment, slow load balancing, etc.
This PIP aims to introduce a new load balance algorithm AvgShedder
to solve these problems.
Introduce a new load balance algorithm AvgShedder
that can solve the problems of load jitter, heterogeneous environment, slow load balancing, etc.
First of all, to determine high load brokers, it is necessary to rate and sort them. Currently, there are two scoring criteria:
- Resource utilization rate of broker
- The message rate and throughput of the broker
Based on the previous analysis, it can be seen that scoring based on message rate and throughput will face
the same problem as
UniformLoadShedder
in heterogeneous environments, while scoring based on resource utilization rate will face the over placement problem likeLeastResourceUsageWithWeight
.
To solve the problem of heterogeneous environments, we use the resource utilization rate of the broker as the scoring criterion.
So how can we avoid the over placement problem? The key is to bind the shedding and placement strategies together. If every bundle unloaded from the high load broker is assigned to the right low load broker in shedding strategy, the over placement problem will be solved.
For example, if the broker rating of the current cluster is 20,30,52,80,80, and the shedding and placement strategies are decoupled, the bundles will be unloaded from the two brokers with score of 80, and then all these bundles will be placed on the broker with a score of 20, causing the over placement problem.
If the shedding and placement strategies are coupled, one broker with 80 score can unload some bundles to a broker with 20 score, and another broker with 80 score can unload the bundle to the broker with 30 score. In this way, we can avoid the over placement problem.
We will first pick out the highest and lowest loaded brokers, and then evenly distribute the traffic between them.
For example, if the broker rating of the current cluster is 20,30,52,70,80, and the message rate of the highest loaded broker is 1000, the message rate of the lowest loaded broker is 500. We introduce a threshold to whether trigger the bundle unload, for example, the threshold is 40. As the difference between the score of the highest and lowest loaded brokers is 100-50=50>40, the shedding strategy will be triggered.
To achieve the goal of evenly distributing the traffic between the highest and lowest loaded brokers, the shedding strategy will try to make the message rate of two brokers the same, which is (1000+500)/2=750. The shedding strategy will unload 250 message rate from the highest loaded broker to the lowest loaded broker. After the shedding strategy is completed, the message rate of two brokers will be same, which is 750.
As we mentioned earlier in UniformLoadShedder
, if strategy only handles one high load broker at a time, it will take a long time to
complete all load balancing tasks. Therefore, we further optimize it by matching multiple pairs of high and low load brokers in
a single shedding. After sorting the broker scores, the first and last place are paired, the second and and the second to last are paired,
and so on. When the score difference between the two paired brokers is greater than the threshold, the load will be evenly distributed
between the two, which can solve the problem of slow speed.
For example, if the broker rating of the current cluster is 20,30,52,70,80, we will pair 20 and 80, 30 and 70. As the difference between the two paired brokers is 80-20=60, 70-30=40, which are both greater than the threshold 40, the shedding strategy will be triggered.
What about the historical weighting algorithm used in ThresholdShedder
? It is used to solve the problem of load jitter, but previous
analysis and experiments have shown that it can bring serious negative effects, so we can no longer use this method to solve the
problem of load jitter.
We mimic the way alarms are triggered: the threshold is triggered multiple times before the bundle unload is finally triggered. For example, when the difference between a pair of brokers exceeds the threshold three times, load balancing is triggered.
In situations of cluster rolling restart or expansion, there is often a significant load difference between different brokers, and we hope to complete load balancing more quickly.
Therefore, we introduce two thresholds:
- loadBalancerAvgShedderLowThreshold, default value is 15
- loadBalancerAvgShedderHighThreshold, default value is 40
Two thresholds correspond to two continuous hit count requirements:
- loadBalancerAvgShedderHitCountLowThreshold, default value is 8
- loadBalancerAvgShedderHitCountHighThreshold, default value of 2
When the difference in scores between two paired brokers exceeds the loadBalancerAvgShedderLowThreshold
by
loadBalancerAvgShedderHitCountLowThreshold
times, or exceeds the loadBalancerAvgShedderHighThreshold
by
loadBalancerAvgShedderHitCountHighThreshold
times, a bundle unload is triggered.
For example, with the default value, if the score difference exceeds 15, it needs to be triggered 8 times continuously,
and if the score difference exceeds 40, it needs to be triggered 2 times continuously.
The larger the load difference between brokers, the smaller the number of times it takes to trigger bundle unloads, which can adapt to scenarios such as cluster rolling restart or expansion.
As mentioned earlier, AvgShedder
bundles the shedding and placement strategies, and a bundle has already determined
its next owner broker based on the shedding strategy during shedding. But we not only use placement strategies after
executing shedding, but also need to use placement strategies to assign bundles during cluster initialization, rolling
restart, and broker shutdown. So how should we assign these bundles without shedding strategies?
We use a hash allocation method: hash mapping a random number to broker. Hash mapping roughly conforms to a uniform distribution, so bundles will be roughly evenly distributed across all brokers. However, due to the different throughput between different bundles, the cluster will exhibit a certain degree of imbalance. However, this problem is not significant, and the subsequent balancing can be achieved through shedding strategies. Moreover, the frequency of cluster initialization, rolling restart, and broker shutdown scenarios is not high, so the impact is slight.
In summary, AvgShedder
can solve the problems of load jitter, heterogeneous environment, slow load balancing, etc.
Following table summarizes the advantages and disadvantages of the three options:
Combination | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing |
---|---|---|---|---|---|
ThresholdShedder + LeastResourceUsageWithWeight | normal | good | bad | bad | normal |
UniformLoadShedder + LeastLongTermMessageRate | bad | bad | good | good | normal |
AvgShedder | normal | good | good | good | good |
To avoid introducing too many configurations when calculating how much traffic needs to be unloaded, AvgShedder
reuses the
following three UniformLoadShedder
configurations:
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
)
private int minUnloadMessage = 1000;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
)
private int minUnloadMessageThroughput = 1 * 1024 * 1024;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
)
private double maxUnloadPercentage = 0.2;
The maxUnloadPercentage
controls the allocation ratio. Although the default value is 0.2, our goal is to evenly distribute the
pressure between two brokers. Therefore, we set the value to 0.5, so that after load balancing is completed, the message rate/throughput
of the two brokers will be almost equal.
The following configurations are introduced to control the shedding strategy:
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The low threshold for the difference between the highest and lowest loaded brokers."
)
private int loadBalancerAvgShedderLowThreshold = 15;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The high threshold for the difference between the highest and lowest loaded brokers."
)
private int loadBalancerAvgShedderHighThreshold = 40;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The number of times the low threshold is triggered before the bundle is unloaded."
)
private int loadBalancerAvgShedderHitCountLowThreshold = 8;
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "The number of times the high threshold is triggered before the bundle is unloaded."
)
private int loadBalancerAvgShedderHitCountHighThreshold = 2;
Fully compatible.
- Mailing List discussion thread: https://lists.apache.org/thread/cy39b6jp38n38zyzd3bbw8b9vm5fwf3f
- Mailing List voting thread: https://lists.apache.org/thread/2v9fw5t5m5hlmjkrvjz6ywxjcqpmd02q