Skip to content

Commit e5e853b

Browse files
Demogorgon314Technoboy-
authored andcommitted
[fix][broker] Handle heartbeat namespace in ExtensibleLoadManager (#20551)
1 parent 9dce020 commit e5e853b

File tree

4 files changed

+161
-42
lines changed

4 files changed

+161
-42
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+81-36
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.ScheduledFuture;
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.atomic.AtomicReference;
41+
import java.util.function.Function;
4142
import java.util.stream.Collectors;
4243
import lombok.Getter;
4344
import lombok.extern.slf4j.Slf4j;
@@ -84,6 +85,7 @@
8485
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
8586
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
8687
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
88+
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
8789
import org.apache.pulsar.client.admin.PulsarAdminException;
8890
import org.apache.pulsar.common.naming.NamespaceBundle;
8991
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -365,56 +367,99 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
365367

366368
final String bundle = serviceUnit.toString();
367369

368-
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
370+
return dedupeLookupRequest(bundle, k -> {
369371
final CompletableFuture<Optional<String>> owner;
370372
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
371373
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
372374
owner = serviceUnitStateChannel.getChannelOwnerAsync();
373375
} else {
374-
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
375-
// If the bundle not assign yet, select and publish assign event to channel.
376-
if (broker.isEmpty()) {
377-
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
378-
if (brokerOpt.isPresent()) {
379-
assignCounter.incrementSuccess();
380-
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
381-
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
382-
.thenApply(Optional::of);
383-
} else {
384-
throw new IllegalStateException(
385-
"Failed to select the new owner broker for bundle: " + bundle);
386-
}
387-
});
376+
owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable);
377+
}
378+
return getBrokerLookupData(owner, bundle);
379+
});
380+
}
381+
382+
private CompletableFuture<String> getOwnerAsync(
383+
ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
384+
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
385+
// If the bundle not assign yet, select and publish assign event to channel.
386+
if (broker.isEmpty()) {
387+
CompletableFuture<Optional<String>> selectedBroker;
388+
if (ownByLocalBrokerIfAbsent) {
389+
String brokerId = this.brokerRegistry.getBrokerId();
390+
selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
391+
} else {
392+
selectedBroker = this.selectAsync(serviceUnit);
393+
}
394+
return selectedBroker.thenCompose(brokerOpt -> {
395+
if (brokerOpt.isPresent()) {
396+
assignCounter.incrementSuccess();
397+
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
398+
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
388399
}
389-
assignCounter.incrementSkip();
390-
// Already assigned, return it.
391-
return CompletableFuture.completedFuture(broker);
400+
throw new IllegalStateException(
401+
"Failed to select the new owner broker for bundle: " + bundle);
392402
});
393403
}
404+
assignCounter.incrementSkip();
405+
// Already assigned, return it.
406+
return CompletableFuture.completedFuture(broker.get());
407+
});
408+
}
394409

395-
return owner.thenCompose(broker -> {
396-
if (broker.isEmpty()) {
397-
String errorMsg = String.format(
398-
"Failed to get or assign the owner for bundle:%s", bundle);
399-
log.error(errorMsg);
400-
throw new IllegalStateException(errorMsg);
401-
}
402-
return CompletableFuture.completedFuture(broker.get());
403-
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
404-
if (brokerLookupData.isEmpty()) {
405-
String errorMsg = String.format(
406-
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
407-
log.error(errorMsg);
408-
throw new IllegalStateException(errorMsg);
409-
}
410-
return CompletableFuture.completedFuture(brokerLookupData);
411-
}));
410+
private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
411+
CompletableFuture<Optional<String>> owner,
412+
String bundle) {
413+
return owner.thenCompose(broker -> {
414+
if (broker.isEmpty()) {
415+
String errorMsg = String.format(
416+
"Failed to get or assign the owner for bundle:%s", bundle);
417+
log.error(errorMsg);
418+
throw new IllegalStateException(errorMsg);
419+
}
420+
return CompletableFuture.completedFuture(broker.get());
421+
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
422+
if (brokerLookupData.isEmpty()) {
423+
String errorMsg = String.format(
424+
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
425+
log.error(errorMsg);
426+
throw new IllegalStateException(errorMsg);
427+
}
428+
return CompletableFuture.completedFuture(brokerLookupData);
429+
}));
430+
}
431+
432+
/**
433+
* Method to get the current owner of the <code>NamespaceBundle</code>
434+
* or set the local broker as the owner if absent.
435+
*
436+
* @param namespaceBundle the <code>NamespaceBundle</code>
437+
* @return The ephemeral node data showing the current ownership info in <code>ServiceUnitStateChannel</code>
438+
*/
439+
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
440+
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
441+
final String bundle = namespaceBundle.toString();
442+
return dedupeLookupRequest(bundle, k -> {
443+
final CompletableFuture<String> owner =
444+
this.getOwnerAsync(namespaceBundle, bundle, true);
445+
return getBrokerLookupData(owner.thenApply(Optional::ofNullable), bundle);
446+
}).thenApply(brokerLookupData -> {
447+
if (brokerLookupData.isEmpty()) {
448+
throw new IllegalStateException(
449+
"Failed to get the broker lookup data for bundle: " + bundle);
450+
}
451+
return brokerLookupData.get().toNamespaceEphemeralData();
412452
});
453+
}
454+
455+
private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
456+
String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
457+
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
413458
future.whenComplete((r, t) -> {
414459
if (t != null) {
415460
assignCounter.incrementFailure();
416461
}
417-
lookupRequests.remove(bundle);
462+
lookupRequests.remove(key);
418463
}
419464
);
420465
return future;

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+18
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,11 @@ private synchronized void doCleanup(String broker) {
11921192
log.info("Started ownership cleanup for the inactive broker:{}", broker);
11931193
int orphanServiceUnitCleanupCnt = 0;
11941194
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
1195+
String heartbeatNamespace =
1196+
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
1197+
.toString();
1198+
String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
1199+
pulsar.getConfiguration()).toString();
11951200

11961201
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
11971202
for (var etr : tableview.entrySet()) {
@@ -1202,6 +1207,19 @@ private synchronized void doCleanup(String broker) {
12021207
if (isActiveState(state)) {
12031208
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
12041209
orphanSystemServiceUnits.put(serviceUnit, stateData);
1210+
} else if (serviceUnit.startsWith(heartbeatNamespace)
1211+
|| serviceUnit.startsWith(heartbeatNamespaceV2)) {
1212+
// Skip the heartbeat namespace
1213+
log.info("Skip override heartbeat namespace bundle"
1214+
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
1215+
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
1216+
if (e != null) {
1217+
log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
1218+
+ "stateData:{}, cleanupErrorCnt:{}.",
1219+
serviceUnit, stateData,
1220+
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
1221+
}
1222+
});
12051223
} else {
12061224
overrideOwnership(serviceUnit, stateData, broker);
12071225
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,14 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
367367
// all pre-registered namespace is assumed to have bundles disabled
368368
nsFullBundle = bundleFactory.getFullBundle(nsname);
369369
// v2 namespace will always use full bundle object
370-
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
370+
final NamespaceEphemeralData otherData;
371+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
372+
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
373+
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
374+
} else {
375+
otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
376+
}
377+
371378
if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
372379
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
373380
if (nsFullBundle != null) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+54-5
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,16 @@
8989
import org.apache.pulsar.broker.namespace.LookupOptions;
9090
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
9191
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
92+
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
93+
import org.apache.pulsar.broker.namespace.NamespaceService;
9294
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
9395
import org.apache.pulsar.client.admin.PulsarAdminException;
9496
import org.apache.pulsar.client.impl.TableViewImpl;
9597
import org.apache.pulsar.common.naming.NamespaceBundle;
9698
import org.apache.pulsar.common.naming.NamespaceName;
9799
import org.apache.pulsar.common.naming.ServiceUnitId;
98100
import org.apache.pulsar.common.naming.TopicName;
101+
import org.apache.pulsar.common.naming.TopicVersion;
99102
import org.apache.pulsar.common.policies.data.BrokerAssignment;
100103
import org.apache.pulsar.common.policies.data.BundlesData;
101104
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -1038,19 +1041,48 @@ public void testListTopic() throws Exception {
10381041
}
10391042

10401043
@Test(timeOut = 30 * 1000)
1041-
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
1044+
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
1045+
NamespaceName heartbeatNamespacePulsar1V1 =
1046+
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
1047+
NamespaceName heartbeatNamespacePulsar1V2 =
1048+
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
1049+
1050+
NamespaceName heartbeatNamespacePulsar2V1 =
1051+
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
1052+
NamespaceName heartbeatNamespacePulsar2V2 =
1053+
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
1054+
1055+
NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
1056+
.getFullBundle(heartbeatNamespacePulsar1V1);
1057+
NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
1058+
.getFullBundle(heartbeatNamespacePulsar1V2);
1059+
1060+
NamespaceBundle bundle3 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
1061+
.getFullBundle(heartbeatNamespacePulsar2V1);
1062+
NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
1063+
.getFullBundle(heartbeatNamespacePulsar2V2);
1064+
10421065
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
10431066
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
1044-
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
1067+
// heartbeat namespace bundle will own by pulsar1
1068+
assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
1069+
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
1070+
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
10451071
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
10461072
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
1047-
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
1073+
assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
1074+
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
1075+
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
10481076
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
10491077
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
10501078
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
10511079
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
1052-
assertTrue(ownedNamespacesByPulsar1.isEmpty());
1053-
assertTrue(ownedNamespacesByPulsar2.isEmpty());
1080+
assertEquals(ownedNamespacesByPulsar1.size(), 2);
1081+
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
1082+
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
1083+
assertEquals(ownedNamespacesByPulsar2.size(), 2);
1084+
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
1085+
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
10541086

10551087
String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
10561088
admin.topics().createPartitionedTopic(topic, 1);
@@ -1083,6 +1115,23 @@ private void assertOwnedServiceUnits(
10831115
assertEquals(status.broker_assignment, BrokerAssignment.shared);
10841116
}
10851117

1118+
@Test(timeOut = 30 * 1000)
1119+
public void testTryAcquiringOwnership()
1120+
throws PulsarAdminException, ExecutionException, InterruptedException {
1121+
final String namespace = "public/testTryAcquiringOwnership";
1122+
admin.namespaces().createNamespace(namespace, 1);
1123+
String topic = "persistent://" + namespace + "/test";
1124+
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
1125+
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
1126+
assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
1127+
admin.namespaces().deleteNamespace(namespace, true);
1128+
}
1129+
1130+
@Test(timeOut = 30 * 1000)
1131+
public void testHealthcheck() throws PulsarAdminException {
1132+
admin.brokers().healthcheck(TopicVersion.V2);
1133+
}
1134+
10861135
private static abstract class MockBrokerFilter implements BrokerFilter {
10871136

10881137
@Override

0 commit comments

Comments
 (0)