Original Issue: apache#18215
Motivation
As PIP-192 enables the bundle transfer protocol, we could implement a new shedding strategy to specify a new owner broker.
Improve the edge cases in the current shedders:
More aggressive load balance strategy: https://docs.google.com/document/d/1nXaiSK39E10awqinnDUX5Sial79FSTczFCmMdc9q8o8/ (reported)
Repeated shedding due to highly-weighted historical load: PIP-217: LoadShedding Strategy Improvement apache/pulsar#18173 (reported)
We need to revisit the current load balance strategy to balance the load more evenly and frequently. With the current static threshold approach, the load balancer sometimes appears not to work when (1) the threshold is set too high and overall usage is relatively low, and (2) new brokers are added.
Case 1) broker usages [12%, 2%, 1%, 1%], 10% default threshold, avg_usage = 4%; all brokers' usage is below avg_usage + threshold (14%), so no shedding will happen.
Case 2) broker usages [50%, 50%, 50%, 50%, 50%, 0% (new broker)], 10% default threshold, avg_usage = 41.6%; all brokers' usage is below avg_usage + threshold (51.6%), so no shedding will happen.
Even with semi-optimal initial broker assignments, this load balance logic should be aggressive enough to guarantee the “load balance” over time.
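Below is a minimal Java sketch of the static avg_usage + threshold check described above (a hypothetical simplification for illustration, not the actual ThresholdShedder code), showing why neither case triggers shedding:

```java
import java.util.List;

public class StaticThresholdExample {
    // Hypothetical simplification of the static-threshold check described above:
    // a broker is shed only when its usage exceeds avg_usage + threshold.
    static boolean needsShedding(List<Double> brokerUsages, double threshold) {
        double avg = brokerUsages.stream().mapToDouble(Double::doubleValue).average().orElse(0);
        return brokerUsages.stream().anyMatch(usage -> usage > avg + threshold);
    }

    public static void main(String[] args) {
        // Case 1: avg_usage = 4%, limit = 14%, max usage 12% -> no shedding
        System.out.println(needsShedding(List.of(12.0, 2.0, 1.0, 1.0), 10.0));               // false
        // Case 2: avg_usage = 41.6%, limit = 51.6%, max usage 50% -> no shedding
        System.out.println(needsShedding(List.of(50.0, 50.0, 50.0, 50.0, 50.0, 0.0), 10.0)); // false
    }
}
```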
Goal
Add a bundle msg throughput signal when computing broker resource usage.
More aggressive unloading to new brokers.
Minimize the required configs to tune.
Improve the accuracy of broker load data normalization (clean lingering load data after transfers).
Optimize the number of bundle unloads for balancing load in the cluster.
Clarify the global load-balance optimization target (clarify the epoch error function).
This algorithm will be used only in the new broker load balancer introduced in PIP-192: New Pulsar Broker Load Balancer apache/pulsar#16691.
API Changes
No.
Implementation
Pseudo code
The idea is straightforward: keep unloading bundles from the most-loaded broker to the least-loaded broker until the standard deviation of the broker load distribution is below our target.
The following is the pseudocode.
// compute load data for each broker
for (broker_load_data in active_brokers) {
    // skip load data collected before the last transfer,
    // and give each broker enough time to recompute its load after transfers
    if (broker_load_data.timestamp - last_transfer_timestamp < x secs) {
        continue;
    }
    // max(cpu, memory, direct_memory, network_in, network_out, msg_throughput_in, msg_throughput_out)
    cur_load = compute_load(broker_load_data)
    load = normalize(cur_load)
    load_map.put(broker, load)
    top_k_min_load_brokers.add(broker, load)
    top_k_max_load_brokers.add(broker, load)
}

// compute std
std = standard_deviation(load_map, offload_map)

// force-unload if the min-loaded broker is a new broker (zero msg throughput)
for (int i = 0; i < max_transfer_cnt && (std > std_threshold || top_k_min_load_brokers.peek().msg_throughput == 0); i++) {
    (dst_broker, dst_load) = top_k_min_load_brokers.pop()
    (src_broker, src_load) = top_k_max_load_brokers.pop()
    if (dst_broker == null || src_broker == null || dst_broker == src_broker) return;

    // we could adjust this offload_percent by other threshold configs
    offload_percent = (src_load - dst_load) / 2
    offload_throughput = offload_percent * src_broker.throughput

    // transfer bundles, from highest loaded to lowest, from src_broker to dst_broker
    // until sum(bundle.throughput) reaches offload_throughput
    ...

    // mark the offloaded load
    offload_map.put(dst_broker, -offload_percent)
    offload_map.put(src_broker, offload_percent)
    transferred_brokers.add(dst_broker)
    transferred_brokers.add(src_broker)

    // recompute std, considering the offloaded load
    std = standard_deviation(load_map, offload_map)
}

// clean load caches:
// we need to track new load data to avoid repeated transfers.
offload_map.clear()
for (broker : transferred_brokers) {
    load_map.remove(broker)
}

// mark the timestamp at the end of the transfer
last_transfer_timestamp = now()

normalize(cur_load) {
    // this is an exponential moving average version;
    // we could make this normalization configurable for other methods
    return historical_load_weight * prev_load + (1 - historical_load_weight) * cur_load
}

standard_deviation(load_map, offload_map) {
    // for each broker, recompute the load considering offload_map
    return std(load = load_map.get(broker) - offload_map.get(broker));
}
Default Configurable Parameters
max_transfer_cnt = 3 // max number of transfers per unload cycle (1 min by default)
std_threshold = 15 // load standard deviation threshold (target load distribution)
Theoretical Load Balance Epochs (Transfer Counts) per Cluster Size for Target std=15
Load Balance Epochs means the number of transfer runs required to reach the target global load distribution (load standard deviation threshold).
This data shows how many epochs are required, which helps tune the max_transfer_cnt config.
In general, the number of required transfer cycles (minutes) for target std=15 = epochs / max_transfer_cnt.
| Cluster Size (number of brokers) | Epochs |
| --- | --- |
| 10 | 3 |
| 20 | 5 |
| 40 | 8 |
| 80 | 15 |
| 100 | 19 |
| 200 | 36 |
| 300 | 54 |
| 400 | 72 |
| 500 | 89 |
| 600 | 107 |
| 700 | 125 |
| 800 | 143 |
| 900 | 160 |
| 1000 | 178 |
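As a rough sketch of the relationship above (assuming the default 1-minute unload cycle and max_transfer_cnt = 3), the required number of cycles can be estimated as ceil(epochs / max_transfer_cnt); the class below is purely illustrative:

```java
import java.util.Map;

public class LoadBalanceCycleEstimate {
    // Estimated number of unload cycles (one per minute by default) needed to
    // reach target std=15: ceil(epochs / max_transfer_cnt).
    static int requiredCycles(int epochs, int maxTransferCnt) {
        return (int) Math.ceil((double) epochs / maxTransferCnt);
    }

    public static void main(String[] args) {
        int maxTransferCnt = 3; // default
        // epoch counts taken from the table above
        Map<Integer, Integer> epochsByClusterSize = Map.of(10, 3, 100, 19, 1000, 178);
        epochsByClusterSize.forEach((brokers, epochs) ->
                System.out.printf("%d brokers: ~%d cycles (minutes)%n",
                        brokers, requiredCycles(epochs, maxTransferCnt)));
    }
}
```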
Alternatives
N/A
Anything else?
No response