Skip to content

Commit 8145f59

Browse files
committed
resolved comments
1 parent 3fb8583 commit 8145f59

File tree

4 files changed

+223
-143
lines changed

4 files changed

+223
-143
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadData.java

+61-36
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.data;
2020

21-
import lombok.Data;
21+
import lombok.EqualsAndHashCode;
22+
import lombok.Getter;
2223
import org.apache.pulsar.broker.ServiceConfiguration;
2324
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
2425
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
@@ -30,65 +31,86 @@
3031
* Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
3132
* And removed the lookup data, see {@link BrokerLookupData}
3233
*/
33-
@Data
34+
@Getter
35+
@EqualsAndHashCode
3436
public class BrokerLoadData {
3537

38+
private static final double DEFAULT_RESOURCE_USAGE = 1.0d;
39+
3640
// Most recently available system resource usage.
3741
private ResourceUsage cpu;
3842
private ResourceUsage memory;
3943
private ResourceUsage directMemory;
40-
4144
private ResourceUsage bandwidthIn;
4245
private ResourceUsage bandwidthOut;
4346

4447
// Message data from the most recent namespace bundle stats.
45-
private double msgThroughputIn;
46-
private ResourceUsage msgThroughputInUsage;
47-
private double msgThroughputOut;
48-
private ResourceUsage msgThroughputOutUsage;
49-
private double msgRateIn;
50-
private double msgRateOut;
48+
private double msgThroughputIn; // bytes/sec
49+
private double msgThroughputOut; // bytes/sec
50+
private double msgRateIn; // messages/sec
51+
private double msgRateOut; // messages/sec
5152

52-
// load data features
53-
private Double weightedMaxEMA; // exponential moving average of max of weighted resource usages
53+
// Load data features computed from the above resources.
54+
private double maxResourceUsage; // max of resource usages
55+
private double weightedMaxEMA; // exponential moving average of max of weighted resource usages
56+
private long updatedAt;
5457

5558
public BrokerLoadData() {
5659
cpu = new ResourceUsage();
5760
memory = new ResourceUsage();
5861
directMemory = new ResourceUsage();
5962
bandwidthIn = new ResourceUsage();
6063
bandwidthOut = new ResourceUsage();
61-
msgThroughputInUsage = new ResourceUsage();
62-
msgThroughputOutUsage = new ResourceUsage();
63-
weightedMaxEMA = null;
64+
maxResourceUsage = DEFAULT_RESOURCE_USAGE;
65+
weightedMaxEMA = DEFAULT_RESOURCE_USAGE;
6466
}
6567

6668
/**
67-
* Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
69+
* Using the system resource usage from the Pulsar client, update this BrokerLoadData.
6870
*
69-
* @param systemResourceUsage
71+
* @param usage
7072
* System resource usage (cpu, memory, and direct memory).
73+
* @param msgThroughputIn
74+
* broker-level message input throughput in bytes/s.
75+
* @param msgThroughputOut
76+
* broker-level message output throughput in bytes/s.
77+
* @param msgRateIn
78+
* broker-level message input rate in messages/s.
79+
* @param msgRateOut
80+
* broker-level message output rate in messages/s.
81+
* @param conf
82+
* Service configuration to compute load data features.
7183
*/
72-
public void update(final SystemResourceUsage systemResourceUsage, ServiceConfiguration conf) {
73-
updateSystemResourceUsage(systemResourceUsage);
84+
public void update(final SystemResourceUsage usage,
85+
double msgThroughputIn,
86+
double msgThroughputOut,
87+
double msgRateIn,
88+
double msgRateOut,
89+
ServiceConfiguration conf) {
90+
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
91+
this.msgThroughputIn = msgThroughputIn;
92+
this.msgThroughputOut = msgThroughputOut;
93+
this.msgRateIn = msgRateIn;
94+
this.msgRateOut = msgRateOut;
7495
updateFeatures(conf);
96+
updatedAt = System.currentTimeMillis();
7597
}
7698

7799
/**
78-
* Using another LocalBrokerData, update this.
100+
* Using another BrokerLoadData, update this.
79101
*
80102
* @param other
81-
* LocalBrokerData to update from.
103+
* BrokerLoadData to update from.
82104
*/
83-
public void update(final BrokerLoadData other, ServiceConfiguration conf) {
105+
public void update(final BrokerLoadData other) {
84106
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
85-
updateFeatures(conf);
86-
}
87-
88-
// Set the cpu, memory, and direct memory to that of the new system resource usage data.
89-
private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
90-
updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
91-
systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
107+
msgThroughputIn = other.msgThroughputIn;
108+
msgThroughputOut = other.msgThroughputOut;
109+
msgRateIn = other.msgRateIn;
110+
msgRateOut = other.msgRateOut;
111+
weightedMaxEMA = other.weightedMaxEMA;
112+
maxResourceUsage = other.maxResourceUsage;
113+
updatedAt = other.updatedAt;
92114
}
93115

94116
// Update resource usage given each individual usage.
@@ -102,31 +124,34 @@ private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUs
102124
this.bandwidthOut = bandwidthOut;
103125
}
104126

105-
public double getMaxResourceUsage() {
106-
return LocalBrokerData.max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
127+
private void updateFeatures(ServiceConfiguration conf) {
128+
updateMaxResourceUsage();
129+
updateWeightedMaxEMA(conf);
130+
}
131+
132+
private void updateMaxResourceUsage() {
133+
maxResourceUsage = LocalBrokerData.max(cpu.percentUsage(), directMemory.percentUsage(),
134+
bandwidthIn.percentUsage(),
107135
bandwidthOut.percentUsage()) / 100;
108136
}
109137

110-
public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
138+
private double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
111139
final double directMemoryWeight, final double bandwidthInWeight,
112140
final double bandwidthOutWeight) {
113141
return LocalBrokerData.max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
114142
directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
115143
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
116144
}
117145

118-
private void updateFeatures(ServiceConfiguration conf) {
119-
updateWeightedMaxEMA(conf);
120-
}
121146

122-
public void updateWeightedMaxEMA(ServiceConfiguration conf) {
147+
private void updateWeightedMaxEMA(ServiceConfiguration conf) {
123148
var historyPercentage = conf.getLoadBalancerHistoryResourcePercentage();
124149
var weightedMax = getMaxResourceUsageWithWeight(
125150
conf.getLoadBalancerCPUResourceWeight(),
126151
conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
127152
conf.getLoadBalancerBandwithInResourceWeight(),
128153
conf.getLoadBalancerBandwithOutResourceWeight());
129-
weightedMaxEMA = weightedMaxEMA == null ? weightedMax :
154+
weightedMaxEMA = updatedAt == 0 ? weightedMax :
130155
weightedMaxEMA * historyPercentage + (1 - historyPercentage) * weightedMax;
131156
}
132157

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
*/
3838
@Slf4j
3939
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
40-
private static final double MAX_RESOURCE_USAGE = 1.0d;
4140
// Maintain this list to reduce object creation.
4241
private final ArrayList<String> bestBrokers;
4342
private final Set<String> noLoadDataBrokers;
@@ -52,6 +51,7 @@ private double getMaxResourceUsageWithWeight(final String broker, final BrokerLo
5251
final ServiceConfiguration conf, boolean debugMode) {
5352
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
5453
final var maxUsageWithWeight = brokerLoadData.getWeightedMaxEMA();
54+
5555
if (maxUsageWithWeight > overloadThreshold) {
5656
log.warn(
5757
"Broker {} is overloaded, max resource usage with weight percentage: {}%, "
@@ -128,8 +128,7 @@ public Optional<String> select(List<String> candidates, ServiceUnitId bundleToAs
128128
log.warn("There is no broker load data for broker:{}. Skipping this broker. Phase two", broker);
129129
continue;
130130
}
131-
Double avgResUsageObj = brokerLoadDataOptional.get().getWeightedMaxEMA();
132-
double avgResUsage = avgResUsageObj == null ? MAX_RESOURCE_USAGE : avgResUsageObj.doubleValue();
131+
double avgResUsage = brokerLoadDataOptional.get().getWeightedMaxEMA();
133132
if ((avgResUsage + diffThreshold <= avgUsage && !noLoadDataBrokers.contains(broker))) {
134133
bestBrokers.add(broker);
135134
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLoadDataTest.java

+87-40
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.data;
2020

21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2123
import static org.testng.Assert.assertEquals;
2224

2325
import org.apache.pulsar.broker.ServiceConfiguration;
2426
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
27+
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
2528
import org.testng.annotations.Test;
2629

2730
/**
@@ -32,31 +35,75 @@
3235
public class BrokerLoadDataTest {
3336

3437
@Test
35-
public void testMaxResourceUsage() {
38+
public void testUpdateBySystemResourceUsage() {
39+
40+
ServiceConfiguration conf = new ServiceConfiguration();
41+
conf.setLoadBalancerCPUResourceWeight(0.5);
42+
conf.setLoadBalancerMemoryResourceWeight(0.5);
43+
conf.setLoadBalancerDirectMemoryResourceWeight(0.5);
44+
conf.setLoadBalancerBandwithInResourceWeight(0.5);
45+
conf.setLoadBalancerBandwithOutResourceWeight(0.5);
46+
conf.setLoadBalancerHistoryResourcePercentage(0.75);
47+
3648
BrokerLoadData data = new BrokerLoadData();
37-
data.setCpu(new ResourceUsage(1.0, 100.0));
38-
data.setMemory(new ResourceUsage(800.0, 200.0));
39-
data.setDirectMemory(new ResourceUsage(2.0, 100.0));
40-
data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
41-
data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
42-
43-
double epsilon = 0.00001;
44-
double weight = 0.5;
45-
// skips memory usage
46-
assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
47-
48-
assertEquals(
49-
data.getMaxResourceUsageWithWeight(
50-
weight, weight, weight, weight, weight), 2.0, epsilon);
51-
assertEquals(
52-
data.getWeightedMaxEMA(), null);
49+
50+
long now = System.currentTimeMillis();
51+
SystemResourceUsage usage1 = new SystemResourceUsage();
52+
var cpu = new ResourceUsage(1.0, 100.0);
53+
var memory = new ResourceUsage(800.0, 200.0);
54+
var directMemory= new ResourceUsage(2.0, 100.0);
55+
var bandwidthIn= new ResourceUsage(3.0, 100.0);
56+
var bandwidthOut= new ResourceUsage(4.0, 100.0);
57+
usage1.setCpu(cpu);
58+
usage1.setMemory(memory);
59+
usage1.setDirectMemory(directMemory);
60+
usage1.setBandwidthIn(bandwidthIn);
61+
usage1.setBandwidthOut(bandwidthOut);
62+
data.update(usage1, 1,2,3,4, conf);
63+
64+
assertEquals(data.getCpu(), cpu);
65+
assertEquals(data.getMemory(), memory);
66+
assertEquals(data.getDirectMemory(), directMemory);
67+
assertEquals(data.getBandwidthIn(), bandwidthIn);
68+
assertEquals(data.getBandwidthOut(), bandwidthOut);
69+
assertEquals(data.getMsgThroughputIn(), 1.0);
70+
assertEquals(data.getMsgThroughputOut(), 2.0);
71+
assertEquals(data.getMsgRateIn(), 3.0);
72+
assertEquals(data.getMsgRateOut(), 4.0);
73+
assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage
74+
assertEquals(data.getWeightedMaxEMA(), 2);
75+
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
76+
77+
now = System.currentTimeMillis();
78+
SystemResourceUsage usage2 = new SystemResourceUsage();
79+
cpu = new ResourceUsage(300.0, 100.0);
80+
memory = new ResourceUsage(200.0, 200.0);
81+
directMemory= new ResourceUsage(2.0, 100.0);
82+
bandwidthIn= new ResourceUsage(3.0, 100.0);
83+
bandwidthOut= new ResourceUsage(4.0, 100.0);
84+
usage2.setCpu(cpu);
85+
usage2.setMemory(memory);
86+
usage2.setDirectMemory(directMemory);
87+
usage2.setBandwidthIn(bandwidthIn);
88+
usage2.setBandwidthOut(bandwidthOut);
89+
data.update(usage2, 5,6,7,8, conf);
90+
91+
assertEquals(data.getCpu(), cpu);
92+
assertEquals(data.getMemory(), memory);
93+
assertEquals(data.getDirectMemory(), directMemory);
94+
assertEquals(data.getBandwidthIn(), bandwidthIn);
95+
assertEquals(data.getBandwidthOut(), bandwidthOut);
96+
assertEquals(data.getMsgThroughputIn(), 5.0);
97+
assertEquals(data.getMsgThroughputOut(), 6.0);
98+
assertEquals(data.getMsgRateIn(), 7.0);
99+
assertEquals(data.getMsgRateOut(), 8.0);
100+
assertEquals(data.getMaxResourceUsage(), 3.0);
101+
assertEquals(data.getWeightedMaxEMA(), 1.875);
102+
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
53103
}
54104

55105
@Test
56-
public void testWeightedMaxEMA() {
57-
BrokerLoadData data = new BrokerLoadData();
58-
assertEquals(
59-
data.getWeightedMaxEMA(), null);
106+
public void testUpdateByBrokerLoadData() {
60107
ServiceConfiguration conf = new ServiceConfiguration();
61108
conf.setLoadBalancerCPUResourceWeight(0.5);
62109
conf.setLoadBalancerMemoryResourceWeight(0.5);
@@ -65,25 +112,25 @@ public void testWeightedMaxEMA() {
65112
conf.setLoadBalancerBandwithOutResourceWeight(0.5);
66113
conf.setLoadBalancerHistoryResourcePercentage(0.75);
67114

68-
BrokerLoadData data2 = new BrokerLoadData();
69-
data2.setCpu(new ResourceUsage(1.0, 100.0));
70-
data2.setMemory(new ResourceUsage(800.0, 200.0));
71-
data2.setDirectMemory(new ResourceUsage(2.0, 100.0));
72-
data2.setBandwidthIn(new ResourceUsage(3.0, 100.0));
73-
data2.setBandwidthOut(new ResourceUsage(4.0, 100.0));
74-
data.update(data2, conf);
75-
assertEquals(
76-
data.getWeightedMaxEMA(), 2);
77-
78-
BrokerLoadData data3 = new BrokerLoadData();
79-
data3.setCpu(new ResourceUsage(300.0, 100.0));
80-
data3.setMemory(new ResourceUsage(200.0, 200.0));
81-
data3.setDirectMemory(new ResourceUsage(2.0, 100.0));
82-
data3.setBandwidthIn(new ResourceUsage(3.0, 100.0));
83-
data3.setBandwidthOut(new ResourceUsage(4.0, 100.0));
84-
data.update(data3, conf);
85-
assertEquals(
86-
data.getWeightedMaxEMA(), 1.875);
115+
BrokerLoadData data = new BrokerLoadData();
116+
BrokerLoadData other = new BrokerLoadData();
117+
118+
SystemResourceUsage usage1 = new SystemResourceUsage();
119+
var cpu = new ResourceUsage(1.0, 100.0);
120+
var memory = new ResourceUsage(800.0, 200.0);
121+
var directMemory= new ResourceUsage(2.0, 100.0);
122+
var bandwidthIn= new ResourceUsage(3.0, 100.0);
123+
var bandwidthOut= new ResourceUsage(4.0, 100.0);
124+
usage1.setCpu(cpu);
125+
usage1.setMemory(memory);
126+
usage1.setDirectMemory(directMemory);
127+
usage1.setBandwidthIn(bandwidthIn);
128+
usage1.setBandwidthOut(bandwidthOut);
129+
other.update(usage1, 1,2,3,4, conf);
130+
data.update(other);
87131

132+
assertEquals(data, other);
88133
}
134+
135+
89136
}

0 commit comments

Comments
 (0)