Skip to content

Commit d619eb9

Browse files
committed
fixed test failure
1 parent 7e157e8 commit d619eb9

File tree

4 files changed

+75
-22
lines changed

4 files changed

+75
-22
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.reporter;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import java.util.concurrent.TimeUnit;
2325
import lombok.Getter;
2426
import lombok.extern.slf4j.Slf4j;
@@ -61,7 +63,9 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,
6163

6264
private final BrokerLoadData lastData;
6365

64-
private volatile long lastTombstonedAt;
66+
private final ScheduledExecutorService executor;
67+
68+
private long lastTombstonedAt;
6569

6670
private long tombstoneDelayInMillis;
6771

@@ -71,6 +75,7 @@ public BrokerLoadDataReporter(PulsarService pulsar,
7175
this.brokerLoadDataStore = brokerLoadDataStore;
7276
this.lookupServiceAddress = lookupServiceAddress;
7377
this.pulsar = pulsar;
78+
this.executor = pulsar.getLoadManagerExecutor();
7479
this.conf = this.pulsar.getConfiguration();
7580
if (SystemUtils.IS_OS_LINUX) {
7681
brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
@@ -176,7 +181,8 @@ protected double percentChange(final double oldValue, final double newValue) {
176181
return 100 * Math.abs((oldValue - newValue) / oldValue);
177182
}
178183

179-
private void tombstone() {
184+
@VisibleForTesting
185+
protected void tombstone() {
180186
var now = System.currentTimeMillis();
181187
if (now - lastTombstonedAt < tombstoneDelayInMillis) {
182188
return;
@@ -201,7 +207,7 @@ private void tombstone() {
201207

202208
@Override
203209
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
204-
this.pulsar.getLoadManagerExecutor().execute(() -> {
210+
executor.execute(() -> {
205211
ServiceUnitState state = ServiceUnitStateData.state(data);
206212
switch (state) {
207213
case Releasing, Splitting -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.extensions.reporter;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import lombok.extern.slf4j.Slf4j;
2325
import org.apache.commons.lang3.StringUtils;
2426
import org.apache.pulsar.broker.PulsarService;
@@ -46,15 +48,18 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa
4648

4749
private final TopKBundles topKBundles;
4850

51+
private final ScheduledExecutorService executor;
52+
4953
private long lastBundleStatsUpdatedAt;
5054

5155
private long lastTombstonedAt;
52-
private volatile long tombstoneDelayInMillis;
56+
private long tombstoneDelayInMillis;
5357

5458
public TopBundleLoadDataReporter(PulsarService pulsar,
5559
String lookupServiceAddress,
5660
LoadDataStore<TopBundlesLoadData> bundleLoadDataStore) {
5761
this.pulsar = pulsar;
62+
this.executor = pulsar.getLoadManagerExecutor();
5863
this.lookupServiceAddress = lookupServiceAddress;
5964
this.bundleLoadDataStore = bundleLoadDataStore;
6065
this.lastBundleStatsUpdatedAt = 0;
@@ -97,7 +102,8 @@ public CompletableFuture<Void> reportAsync(boolean force) {
97102
}
98103
}
99104

100-
private void tombstone() {
105+
@VisibleForTesting
106+
protected void tombstone() {
101107
var now = System.currentTimeMillis();
102108
if (now - lastTombstonedAt < tombstoneDelayInMillis) {
103109
return;
@@ -110,7 +116,7 @@ private void tombstone() {
110116
log.error("Failed to clean broker load data.");
111117
lastTombstonedAt = lastSuccessfulTombstonedAt;
112118
} else {
113-
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
119+
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
114120
if (debug) {
115121
log.info("Cleaned broker load data.");
116122
}
@@ -121,7 +127,7 @@ private void tombstone() {
121127

122128
@Override
123129
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
124-
this.pulsar.getLoadManagerExecutor().execute(() -> {
130+
executor.execute(() -> {
125131
ServiceUnitState state = ServiceUnitStateData.state(data);
126132
switch (state) {
127133
case Releasing, Splitting -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java

+30-8
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
import static org.mockito.ArgumentMatchers.eq;
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.spy;
2627
import static org.mockito.Mockito.times;
2728
import static org.mockito.Mockito.verify;
2829
import static org.testng.Assert.assertEquals;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.Executors;
32+
import java.util.concurrent.ScheduledExecutorService;
3133
import java.util.concurrent.TimeUnit;
3234
import org.apache.commons.lang.reflect.FieldUtils;
3335
import org.apache.pulsar.broker.PulsarService;
@@ -40,11 +42,13 @@
4042
import org.apache.pulsar.broker.service.BrokerService;
4143
import org.apache.pulsar.broker.service.PulsarStats;
4244
import org.apache.pulsar.broker.stats.BrokerStats;
45+
import org.apache.pulsar.client.util.ExecutorProvider;
4346
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
4447
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
4548
import org.mockito.MockedStatic;
4649
import org.mockito.Mockito;
4750
import org.testcontainers.shaded.org.awaitility.Awaitility;
51+
import org.testng.annotations.AfterMethod;
4852
import org.testng.annotations.BeforeMethod;
4953
import org.testng.annotations.Test;
5054

@@ -59,17 +63,22 @@ public class BrokerLoadDataReporterTest {
5963
SystemResourceUsage usage;
6064
String broker = "broker1";
6165
String bundle = "bundle1";
66+
ScheduledExecutorService executor;
6267

6368
@BeforeMethod
6469
void setup() {
6570
config = new ServiceConfiguration();
71+
config.setLoadBalancerDebugModeEnabled(true);
6672
pulsar = mock(PulsarService.class);
6773
store = mock(LoadDataStore.class);
6874
brokerService = mock(BrokerService.class);
6975
pulsarStats = mock(PulsarStats.class);
7076
doReturn(brokerService).when(pulsar).getBrokerService();
7177
doReturn(config).when(pulsar).getConfiguration();
72-
doReturn(Executors.newSingleThreadScheduledExecutor()).when(pulsar).getLoadManagerExecutor();
78+
executor = Executors
79+
.newSingleThreadScheduledExecutor(new
80+
ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
81+
doReturn(executor).when(pulsar).getLoadManagerExecutor();
7382
doReturn(pulsarStats).when(brokerService).getPulsarStats();
7483
brokerStats = new BrokerStats(0);
7584
brokerStats.topics = 6;
@@ -81,6 +90,7 @@ void setup() {
8190
doReturn(pulsarStats).when(brokerService).getPulsarStats();
8291
doReturn(brokerStats).when(pulsarStats).getBrokerStats();
8392
doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any());
93+
doReturn(CompletableFuture.completedFuture(null)).when(store).removeAsync(any());
8494

8595
usage = new SystemResourceUsage();
8696
usage.setCpu(new ResourceUsage(1.0, 100.0));
@@ -90,6 +100,11 @@ void setup() {
90100
usage.setBandwidthOut(new ResourceUsage(4.0, 100.0));
91101
}
92102

103+
@AfterMethod
104+
void shutdown(){
105+
executor.shutdown();
106+
}
107+
93108
public void testGenerate() throws IllegalAccessException {
94109
try (MockedStatic<LoadManagerShared> mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) {
95110
mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage);
@@ -132,47 +147,54 @@ public void testReport() throws IllegalAccessException {
132147
}
133148

134149
@Test
135-
public void testTombstone() throws IllegalAccessException {
150+
public void testTombstone() throws IllegalAccessException, InterruptedException {
136151

137-
var target = new BrokerLoadDataReporter(pulsar, broker, store);
152+
var target = spy(new BrokerLoadDataReporter(pulsar, broker, store));
138153

139154
target.handleEvent(bundle,
140155
new ServiceUnitStateData(ServiceUnitState.Assigning, broker, VERSION_ID_INIT), null);
141156
verify(store, times(0)).removeAsync(eq(broker));
157+
verify(target, times(0)).tombstone();
142158

143159
target.handleEvent(bundle,
144160
new ServiceUnitStateData(ServiceUnitState.Deleted, broker, VERSION_ID_INIT), null);
145161
verify(store, times(0)).removeAsync(eq(broker));
162+
verify(target, times(0)).tombstone();
146163

147164

148165
target.handleEvent(bundle,
149166
new ServiceUnitStateData(ServiceUnitState.Init, broker, VERSION_ID_INIT), null);
150167
verify(store, times(0)).removeAsync(eq(broker));
168+
verify(target, times(0)).tombstone();
151169

152170
target.handleEvent(bundle,
153171
new ServiceUnitStateData(ServiceUnitState.Free, broker, VERSION_ID_INIT), null);
154172
verify(store, times(0)).removeAsync(eq(broker));
173+
verify(target, times(0)).tombstone();
155174

156175
target.handleEvent(bundle,
157176
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
158177
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
178+
verify(target, times(1)).tombstone();
159179
verify(store, times(1)).removeAsync(eq(broker));
160180
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
161181
assertEquals(localData, new BrokerLoadData());
162182
});
163183

164-
{
165-
target.handleEvent(bundle,
166-
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
184+
target.handleEvent(bundle,
185+
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
186+
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
187+
verify(target, times(2)).tombstone();
167188
verify(store, times(1)).removeAsync(eq(broker));
168189
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
169190
assertEquals(localData, new BrokerLoadData());
170-
}
191+
});
171192

172193
FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true);
173194
target.handleEvent(bundle,
174195
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, VERSION_ID_INIT), null);
175196
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
197+
verify(target, times(3)).tombstone();
176198
verify(store, times(2)).removeAsync(eq(broker));
177199
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
178200
assertEquals(localData, new BrokerLoadData());
@@ -181,10 +203,10 @@ public void testTombstone() throws IllegalAccessException {
181203
target.handleEvent(bundle,
182204
new ServiceUnitStateData(ServiceUnitState.Owned, broker, VERSION_ID_INIT), null);
183205
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
206+
verify(target, times(4)).tombstone();
184207
verify(store, times(3)).removeAsync(eq(broker));
185208
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
186209
assertEquals(localData, new BrokerLoadData());
187210
});
188-
189211
}
190212
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.ArgumentMatchers.eq;
2424
import static org.mockito.Mockito.doReturn;
2525
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.spy;
2627
import static org.mockito.Mockito.times;
2728
import static org.mockito.Mockito.verify;
2829
import static org.testng.Assert.assertEquals;
@@ -32,6 +33,7 @@
3233
import java.util.Optional;
3334
import java.util.concurrent.CompletableFuture;
3435
import java.util.concurrent.Executors;
36+
import java.util.concurrent.ScheduledExecutorService;
3537
import java.util.concurrent.TimeUnit;
3638
import org.apache.commons.lang.reflect.FieldUtils;
3739
import org.apache.pulsar.broker.PulsarService;
@@ -50,6 +52,7 @@
5052
import org.apache.pulsar.client.util.ExecutorProvider;
5153
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
5254
import org.testcontainers.shaded.org.awaitility.Awaitility;
55+
import org.testng.annotations.AfterMethod;
5356
import org.testng.annotations.BeforeMethod;
5457
import org.testng.annotations.Test;
5558

@@ -68,6 +71,7 @@ public class TopBundleLoadDataReporterTest {
6871
String bundle2 = "my-tenant/my-namespace2/0x00000000_0x0FFFFFFF";
6972
String bundle = bundle1;
7073
String broker = "broker-1";
74+
ScheduledExecutorService executor;
7175

7276
@BeforeMethod
7377
void setup() throws MetadataStoreException {
@@ -81,15 +85,16 @@ void setup() throws MetadataStoreException {
8185
isolationPolicyResources = mock(NamespaceResources.IsolationPolicyResources.class);
8286
var namespaceResources = mock(NamespaceResources.class);
8387
localPoliciesResources = mock(LocalPoliciesResources.class);
88+
executor = Executors
89+
.newSingleThreadScheduledExecutor(new
90+
ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
8491

8592
doReturn(brokerService).when(pulsar).getBrokerService();
8693
doReturn(config).when(pulsar).getConfiguration();
8794
doReturn(pulsarStats).when(brokerService).getPulsarStats();
8895
doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any());
89-
doReturn(Executors
90-
.newSingleThreadScheduledExecutor(new
91-
ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager")))
92-
.when(pulsar).getLoadManagerExecutor();
96+
doReturn(CompletableFuture.completedFuture(null)).when(store).removeAsync(any());
97+
doReturn(executor).when(pulsar).getLoadManagerExecutor();
9398

9499
doReturn(pulsarResources).when(pulsar).getPulsarResources();
95100
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
@@ -109,6 +114,11 @@ void setup() throws MetadataStoreException {
109114
doReturn(bundleStats).when(brokerService).getBundleStats();
110115
}
111116

117+
@AfterMethod
118+
void shutdown(){
119+
executor.shutdown();
120+
}
121+
112122
public void testZeroUpdatedAt() {
113123
doReturn(0l).when(pulsarStats).getUpdatedAt();
114124
var target = new TopBundleLoadDataReporter(pulsar, "", store);
@@ -165,47 +175,56 @@ public void testReport(){
165175
@Test
166176
public void testTombstone() throws IllegalAccessException {
167177

168-
var target = new TopBundleLoadDataReporter(pulsar, broker, store);
178+
var target = spy(new TopBundleLoadDataReporter(pulsar, broker, store));
169179

170180
target.handleEvent(bundle,
171181
new ServiceUnitStateData(ServiceUnitState.Assigning, broker, VERSION_ID_INIT), null);
172182
verify(store, times(0)).removeAsync(eq(broker));
183+
verify(target, times(0)).tombstone();
173184

174185
target.handleEvent(bundle,
175186
new ServiceUnitStateData(ServiceUnitState.Deleted, broker, VERSION_ID_INIT), null);
176187
verify(store, times(0)).removeAsync(eq(broker));
188+
verify(target, times(0)).tombstone();
177189

178190

179191
target.handleEvent(bundle,
180192
new ServiceUnitStateData(ServiceUnitState.Init, broker, VERSION_ID_INIT), null);
181193
verify(store, times(0)).removeAsync(eq(broker));
194+
verify(target, times(0)).tombstone();
182195

183196
target.handleEvent(bundle,
184197
new ServiceUnitStateData(ServiceUnitState.Free, broker, VERSION_ID_INIT), null);
185198
verify(store, times(0)).removeAsync(eq(broker));
199+
verify(target, times(0)).tombstone();
186200

187201
target.handleEvent(bundle,
188202
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
189203
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
204+
verify(target, times(1)).tombstone();
190205
verify(store, times(1)).removeAsync(eq(broker));
191206
});
192207

193208
target.handleEvent(bundle,
194209
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
195-
verify(store, times( 1)).removeAsync(eq(broker));
210+
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
211+
verify(target, times(2)).tombstone();
212+
verify(store, times(1)).removeAsync(eq(broker));
213+
});
196214

197215
FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true);
198216
target.handleEvent(bundle,
199217
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, VERSION_ID_INIT), null);
200218
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
219+
verify(target, times(3)).tombstone();
201220
verify(store, times(2)).removeAsync(eq(broker));
202221
});
203222

204223
target.handleEvent(bundle,
205224
new ServiceUnitStateData(ServiceUnitState.Owned, broker, VERSION_ID_INIT), null);
206225
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
226+
verify(target, times(4)).tombstone();
207227
verify(store, times(3)).removeAsync(eq(broker));
208228
});
209-
210229
}
211230
}

0 commit comments

Comments
 (0)