Skip to content

Commit 32ba747

Browse files
Demogorgon314Technoboy-
authored andcommitted
[improve][broker] Emit the namespace bundle listener event on extensible load manager (#20525)
1 parent 99b7b1c commit 32ba747

File tree

3 files changed

+93
-19
lines changed

3 files changed

+93
-19
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+3
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
703703
stateChangeListeners.notify(serviceUnit, data, null);
704704
if (isTargetBroker(data.dstBroker())) {
705705
log(null, serviceUnit, data, null);
706+
pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit));
706707
lastOwnEventHandledAt = System.currentTimeMillis();
707708
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
708709
closeServiceUnit(serviceUnit);
@@ -841,6 +842,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
841842
.whenComplete((__, ex) -> {
842843
// clean up topics that failed to unload from the broker ownership cache
843844
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
845+
pulsar.getNamespaceService().onNamespaceBundleUnload(bundle);
844846
double unloadBundleTime = TimeUnit.NANOSECONDS
845847
.toMillis((System.nanoTime() - startTime));
846848
if (ex != null) {
@@ -912,6 +914,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
912914
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
913915
log.info("Successfully split {} parent namespace-bundle to {} in {} ms",
914916
parentBundle, childBundles, splitBundleTime);
917+
namespaceService.onNamespaceBundleSplit(parentBundle);
915918
completionFuture.complete(null);
916919
})
917920
.exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1202,13 +1202,13 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
12021202
return future.thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
12031203
}
12041204

1205-
protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
1205+
public void onNamespaceBundleOwned(NamespaceBundle bundle) {
12061206
for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
12071207
notifyNamespaceBundleOwnershipListener(bundle, bundleOwnedListener);
12081208
}
12091209
}
12101210

1211-
protected void onNamespaceBundleUnload(NamespaceBundle bundle) {
1211+
public void onNamespaceBundleUnload(NamespaceBundle bundle) {
12121212
for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
12131213
try {
12141214
if (bundleOwnedListener.test(bundle)) {
@@ -1220,7 +1220,7 @@ protected void onNamespaceBundleUnload(NamespaceBundle bundle) {
12201220
}
12211221
}
12221222

1223-
protected void onNamespaceBundleSplit(NamespaceBundle bundle) {
1223+
public void onNamespaceBundleSplit(NamespaceBundle bundle) {
12241224
for (NamespaceBundleSplitListener bundleSplitListener : bundleSplitListeners) {
12251225
try {
12261226
if (bundleSplitListener.test(bundle)) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+87-16
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@
8787
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
8888
import org.apache.pulsar.broker.lookup.LookupResult;
8989
import org.apache.pulsar.broker.namespace.LookupOptions;
90+
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
91+
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
9092
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
9193
import org.apache.pulsar.client.admin.PulsarAdminException;
9294
import org.apache.pulsar.client.impl.TableViewImpl;
@@ -126,6 +128,8 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
126128
private ServiceUnitStateChannelImpl channel1;
127129
private ServiceUnitStateChannelImpl channel2;
128130

131+
private final String defaultTestNamespace = "public/test";
132+
129133
@BeforeClass
130134
@Override
131135
public void setup() throws Exception {
@@ -136,6 +140,7 @@ public void setup() throws Exception {
136140
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
137141
conf.setLoadBalancerSheddingEnabled(false);
138142
conf.setLoadBalancerDebugModeEnabled(true);
143+
conf.setTopicLevelPoliciesEnabled(false);
139144
super.internalSetup(conf);
140145
pulsar1 = pulsar;
141146
ServiceConfiguration defaultConf = getDefaultConf();
@@ -144,6 +149,7 @@ public void setup() throws Exception {
144149
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
145150
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
146151
defaultConf.setLoadBalancerSheddingEnabled(false);
152+
defaultConf.setTopicLevelPoliciesEnabled(false);
147153
additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
148154
pulsar2 = additionalPulsarTestContext.getPulsarService();
149155

@@ -159,6 +165,10 @@ public void setup() throws Exception {
159165
admin.namespaces().createNamespace("public/default");
160166
admin.namespaces().setNamespaceReplicationClusters("public/default",
161167
Sets.newHashSet(this.conf.getClusterName()));
168+
169+
admin.namespaces().createNamespace(defaultTestNamespace);
170+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
171+
Sets.newHashSet(this.conf.getClusterName()));
162172
}
163173

164174
@Override
@@ -172,7 +182,7 @@ protected void cleanup() throws Exception {
172182

173183
@BeforeMethod(alwaysRun = true)
174184
protected void initializeState() throws PulsarAdminException {
175-
admin.namespaces().unload("public/default");
185+
admin.namespaces().unload(defaultTestNamespace);
176186
reset(primaryLoadManager, secondaryLoadManager);
177187
}
178188

@@ -196,7 +206,7 @@ public void testAssignInternalTopic() throws Exception {
196206

197207
@Test
198208
public void testAssign() throws Exception {
199-
TopicName topicName = TopicName.get("test-assign");
209+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-assign");
200210
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
201211
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
202212
assertTrue(brokerLookupData.isPresent());
@@ -221,7 +231,8 @@ public void testAssign() throws Exception {
221231

222232
@Test
223233
public void testCheckOwnershipAsync() throws Exception {
224-
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test-check-ownership")).get();
234+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-check-ownership");
235+
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
225236
// 1. The bundle is never assigned.
226237
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
227238
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
@@ -241,7 +252,7 @@ public void testCheckOwnershipAsync() throws Exception {
241252

242253
@Test
243254
public void testFilter() throws Exception {
244-
TopicName topicName = TopicName.get("test-filter");
255+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter");
245256
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
246257

247258
doReturn(List.of(new BrokerFilter() {
@@ -267,7 +278,7 @@ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> broker
267278

268279
@Test
269280
public void testFilterHasException() throws Exception {
270-
TopicName topicName = TopicName.get("test-filter-has-exception");
281+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
271282
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
272283

273284
doReturn(List.of(new MockBrokerFilter() {
@@ -286,20 +297,58 @@ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> broker
286297

287298
@Test(timeOut = 30 * 1000)
288299
public void testUnloadAdminAPI() throws Exception {
289-
TopicName topicName = TopicName.get("test-unload");
300+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
290301
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
291302

303+
AtomicInteger onloadCount = new AtomicInteger(0);
304+
AtomicInteger unloadCount = new AtomicInteger(0);
305+
306+
NamespaceBundleOwnershipListener listener = new NamespaceBundleOwnershipListener() {
307+
@Override
308+
public void onLoad(NamespaceBundle bundle) {
309+
onloadCount.incrementAndGet();
310+
}
311+
312+
@Override
313+
public void unLoad(NamespaceBundle bundle) {
314+
unloadCount.incrementAndGet();
315+
}
316+
317+
@Override
318+
public boolean test(NamespaceBundle namespaceBundle) {
319+
return namespaceBundle.equals(bundle);
320+
}
321+
};
322+
pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(listener);
323+
pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(listener);
292324
String broker = admin.lookups().lookupTopic(topicName.toString());
293325
log.info("Assign the bundle {} to {}", bundle, broker);
294326

295327
checkOwnershipState(broker, bundle);
328+
Awaitility.await().untilAsserted(() -> {
329+
assertEquals(onloadCount.get(), 1);
330+
assertEquals(unloadCount.get(), 0);
331+
});
332+
296333
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange());
297334
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
298335
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get());
336+
Awaitility.await().untilAsserted(() -> {
337+
assertEquals(onloadCount.get(), 1);
338+
assertEquals(unloadCount.get(), 1);
339+
});
299340

300341
broker = admin.lookups().lookupTopic(topicName.toString());
301342
log.info("Assign the bundle {} to {}", bundle, broker);
302343

344+
String finalBroker = broker;
345+
Awaitility.await().untilAsserted(() -> {
346+
checkOwnershipState(finalBroker, bundle);
347+
assertEquals(onloadCount.get(), 2);
348+
assertEquals(unloadCount.get(), 1);
349+
});
350+
351+
303352
String dstBrokerUrl = pulsar1.getLookupServiceAddress();
304353
String dstBrokerServiceUrl;
305354
if (broker.equals(pulsar1.getBrokerServiceUrl())) {
@@ -311,6 +360,10 @@ public void testUnloadAdminAPI() throws Exception {
311360
checkOwnershipState(broker, bundle);
312361

313362
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl);
363+
Awaitility.await().untilAsserted(() -> {
364+
assertEquals(onloadCount.get(), 3);
365+
assertEquals(unloadCount.get(), 2);
366+
});
314367

315368
assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl);
316369

@@ -338,7 +391,7 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)
338391

339392
@Test(timeOut = 30 * 1000)
340393
public void testSplitBundleAdminAPI() throws Exception {
341-
String namespace = "public/default";
394+
String namespace = defaultTestNamespace;
342395
String topic = "persistent://" + namespace + "/test-split";
343396
admin.topics().createPartitionedTopic(topic, 10);
344397
BundlesData bundles = admin.namespaces().getBundles(namespace);
@@ -347,6 +400,23 @@ public void testSplitBundleAdminAPI() throws Exception {
347400

348401
String firstBundle = bundleRanges.get(0) + "_" + bundleRanges.get(1);
349402

403+
AtomicInteger splitCount = new AtomicInteger(0);
404+
NamespaceBundleSplitListener namespaceBundleSplitListener = new NamespaceBundleSplitListener() {
405+
@Override
406+
public void onSplit(NamespaceBundle bundle) {
407+
splitCount.incrementAndGet();
408+
}
409+
410+
@Override
411+
public boolean test(NamespaceBundle namespaceBundle) {
412+
return namespaceBundle
413+
.toString()
414+
.equals(String.format(namespace + "/0x%08x_0x%08x", bundleRanges.get(0), bundleRanges.get(1)));
415+
}
416+
};
417+
pulsar1.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
418+
pulsar2.getNamespaceService().addNamespaceBundleSplitListener(namespaceBundleSplitListener);
419+
350420
long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2;
351421

352422
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);
@@ -359,6 +429,7 @@ public void testSplitBundleAdminAPI() throws Exception {
359429
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
360430
assertTrue(bundlesData.getBoundaries().contains(midBundle));
361431
assertTrue(bundlesData.getBoundaries().contains(highBundle));
432+
assertEquals(splitCount.get(), 1);
362433

363434
// Test split bundle with invalid bundle range.
364435
try {
@@ -371,7 +442,7 @@ public void testSplitBundleAdminAPI() throws Exception {
371442

372443
@Test(timeOut = 30 * 1000)
373444
public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
374-
String namespace = "public/default";
445+
String namespace = defaultTestNamespace;
375446
String topic = "persistent://" + namespace + "/test-split-with-specific-position";
376447
admin.topics().createPartitionedTopic(topic, 10);
377448
BundlesData bundles = admin.namespaces().getBundles(namespace);
@@ -398,7 +469,9 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
398469
}
399470
@Test(timeOut = 30 * 1000)
400471
public void testDeleteNamespaceBundle() throws Exception {
401-
TopicName topicName = TopicName.get("test-delete-namespace-bundle");
472+
final String namespace = "public/testDeleteNamespaceBundle";
473+
admin.namespaces().createNamespace(namespace, 3);
474+
TopicName topicName = TopicName.get(namespace + "/test-delete-namespace-bundle");
402475
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
403476

404477
String broker = admin.lookups().lookupTopic(topicName.toString());
@@ -447,7 +520,7 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception {
447520

448521
@Test
449522
public void testMoreThenOneFilter() throws Exception {
450-
TopicName topicName = TopicName.get("test-filter-has-exception");
523+
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
451524
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
452525

453526
String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
@@ -485,7 +558,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
485558
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
486559
// start pulsar3 with old load manager
487560
var pulsar3 = additionalPulsarTestContext.getPulsarService();
488-
String topic = "persistent://public/default/test";
561+
String topic = "persistent://" + defaultTestNamespace + "/test";
489562

490563
String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
491564
assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
@@ -594,7 +667,7 @@ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Except
594667
restartBroker();
595668
pulsar1 = pulsar;
596669
setPrimaryLoadManager();
597-
admin.namespaces().setNamespaceReplicationClusters("public/default",
670+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
598671
Sets.newHashSet(this.conf.getClusterName()));
599672

600673
var serviceUnitStateChannelPrimaryNew =
@@ -614,10 +687,7 @@ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Except
614687
}
615688

616689
@Test
617-
public void testRoleChange()
618-
throws Exception {
619-
620-
690+
public void testRoleChange() throws Exception {
621691
var topBundlesLoadDataStorePrimary = (LoadDataStore<TopBundlesLoadData>)
622692
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true);
623693
var topBundlesLoadDataStorePrimarySpy = spy(topBundlesLoadDataStorePrimary);
@@ -962,6 +1032,7 @@ public void testListTopic() throws Exception {
9621032

9631033
List<String> list = admin.topics().getList(namespace);
9641034
assertEquals(list.size(), 6);
1035+
admin.namespaces().deleteNamespace(namespace, true);
9651036
}
9661037

9671038
private static abstract class MockBrokerFilter implements BrokerFilter {

0 commit comments

Comments
 (0)