Skip to content

Commit 115ca33

Browse files
committed
Handle heartbeat namespace in ExtensibleLoadManager
1 parent 45d882a commit 115ca33

File tree

4 files changed

+131
-37
lines changed

4 files changed

+131
-37
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+83-36
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.ScheduledFuture;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicReference;
40+
import java.util.function.Function;
4041
import lombok.Getter;
4142
import lombok.extern.slf4j.Slf4j;
4243
import org.apache.pulsar.broker.PulsarServerException;
@@ -79,6 +80,7 @@
7980
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
8081
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
8182
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
83+
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
8284
import org.apache.pulsar.client.admin.PulsarAdminException;
8385
import org.apache.pulsar.common.naming.NamespaceBundle;
8486
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -342,56 +344,101 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
342344

343345
final String bundle = serviceUnit.toString();
344346

345-
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
347+
return computeIfAbsent(bundle, k -> {
346348
final CompletableFuture<Optional<String>> owner;
347349
// Assign the bundle to channel owner if is internal topic, to avoid circular references.
348350
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
349351
owner = serviceUnitStateChannel.getChannelOwnerAsync();
350352
} else {
351-
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
352-
// If the bundle not assign yet, select and publish assign event to channel.
353-
if (broker.isEmpty()) {
354-
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
355-
if (brokerOpt.isPresent()) {
356-
assignCounter.incrementSuccess();
357-
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
358-
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
359-
.thenApply(Optional::of);
360-
} else {
361-
throw new IllegalStateException(
362-
"Failed to select the new owner broker for bundle: " + bundle);
363-
}
364-
});
353+
owner = getOwnerAsync(serviceUnit, bundle, false);
354+
}
355+
return getBrokerLookupData(owner, bundle);
356+
});
357+
}
358+
359+
private CompletableFuture<Optional<String>> getOwnerAsync(
360+
ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
361+
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
362+
// If the bundle not assign yet, select and publish assign event to channel.
363+
if (broker.isEmpty()) {
364+
CompletableFuture<Optional<String>> selectedBroker;
365+
if (ownByLocalBrokerIfAbsent){
366+
String brokerId = this.brokerRegistry.getBrokerId();
367+
selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
368+
} else {
369+
selectedBroker = this.selectAsync(serviceUnit);
370+
}
371+
return selectedBroker.thenCompose(brokerOpt -> {
372+
if (brokerOpt.isPresent()) {
373+
assignCounter.incrementSuccess();
374+
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
375+
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
376+
.thenApply(Optional::of);
377+
} else {
378+
throw new IllegalStateException(
379+
"Failed to select the new owner broker for bundle: " + bundle);
365380
}
366-
assignCounter.incrementSkip();
367-
// Already assigned, return it.
368-
return CompletableFuture.completedFuture(broker);
369381
});
370382
}
383+
assignCounter.incrementSkip();
384+
// Already assigned, return it.
385+
return CompletableFuture.completedFuture(broker);
386+
});
387+
}
371388

372-
return owner.thenCompose(broker -> {
373-
if (broker.isEmpty()) {
374-
String errorMsg = String.format(
375-
"Failed to get or assign the owner for bundle:%s", bundle);
376-
log.error(errorMsg);
377-
throw new IllegalStateException(errorMsg);
378-
}
379-
return CompletableFuture.completedFuture(broker.get());
380-
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
381-
if (brokerLookupData.isEmpty()) {
382-
String errorMsg = String.format(
383-
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
384-
log.error(errorMsg);
385-
throw new IllegalStateException(errorMsg);
386-
}
387-
return CompletableFuture.completedFuture(brokerLookupData);
388-
}));
389+
private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
390+
CompletableFuture<Optional<String>> owner,
391+
String bundle) {
392+
return owner.thenCompose(broker -> {
393+
if (broker.isEmpty()) {
394+
String errorMsg = String.format(
395+
"Failed to get or assign the owner for bundle:%s", bundle);
396+
log.error(errorMsg);
397+
throw new IllegalStateException(errorMsg);
398+
}
399+
return CompletableFuture.completedFuture(broker.get());
400+
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
401+
if (brokerLookupData.isEmpty()) {
402+
String errorMsg = String.format(
403+
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
404+
log.error(errorMsg);
405+
throw new IllegalStateException(errorMsg);
406+
}
407+
return CompletableFuture.completedFuture(brokerLookupData);
408+
}));
409+
}
410+
411+
/**
412+
* Method to get the current owner of the <code>NamespaceBundle</code>
413+
* or set the local broker as the owner if absent.
414+
*
415+
* @param namespaceBundle the <code>NamespaceBundle</code>
416+
* @return The ephemeral node data showing the current ownership info in <code>ServiceUnitStateChannel</code>
417+
*/
418+
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
419+
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
420+
final String bundle = namespaceBundle.toString();
421+
return computeIfAbsent(bundle, k -> {
422+
final CompletableFuture<Optional<String>> owner =
423+
this.getOwnerAsync(namespaceBundle, bundle, true);
424+
return getBrokerLookupData(owner, bundle);
425+
}).thenApply(brokerLookupData -> {
426+
if (brokerLookupData.isEmpty()) {
427+
throw new IllegalStateException(
428+
"Failed to get the broker lookup data for bundle: " + bundle);
429+
}
430+
return brokerLookupData.get().toNamespaceEphemeralData();
389431
});
432+
}
433+
434+
private CompletableFuture<Optional<BrokerLookupData>> computeIfAbsent(
435+
String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
436+
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
390437
future.whenComplete((r, t) -> {
391438
if (t != null) {
392439
assignCounter.incrementFailure();
393440
}
394-
lookupRequests.remove(bundle);
441+
lookupRequests.remove(key);
395442
}
396443
);
397444
return future;

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+20
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,11 @@ private synchronized void doCleanup(String broker) {
11971197
log.info("Started ownership cleanup for the inactive broker:{}", broker);
11981198
int orphanServiceUnitCleanupCnt = 0;
11991199
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
1200+
String heartbeatNamespace =
1201+
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
1202+
.toString();
1203+
String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
1204+
pulsar.getConfiguration()).toString();
12001205

12011206
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
12021207
for (var etr : tableview.entrySet()) {
@@ -1208,6 +1213,21 @@ private synchronized void doCleanup(String broker) {
12081213
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
12091214
orphanSystemServiceUnits.put(serviceUnit, stateData);
12101215
} else {
1216+
if (serviceUnit.startsWith(heartbeatNamespace)
1217+
|| serviceUnit.startsWith(heartbeatNamespaceV2)) {
1218+
// Skip the heartbeat namespace
1219+
log.info("Found semi-terminal states to tombstone"
1220+
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
1221+
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
1222+
if (e != null) {
1223+
log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
1224+
+ "stateData:{}, cleanupErrorCnt:{}.",
1225+
serviceUnit, stateData,
1226+
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
1227+
}
1228+
});
1229+
continue;
1230+
}
12111231
overrideOwnership(serviceUnit, stateData, broker);
12121232
}
12131233
orphanServiceUnitCleanupCnt++;

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,14 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
367367
// all pre-registered namespace is assumed to have bundles disabled
368368
nsFullBundle = bundleFactory.getFullBundle(nsname);
369369
// v2 namespace will always use full bundle object
370-
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
370+
final NamespaceEphemeralData otherData;
371+
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
372+
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
373+
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
374+
} else {
375+
otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
376+
}
377+
371378
if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
372379
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
373380
if (nsFullBundle != null) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,15 @@
8989
import org.apache.pulsar.broker.namespace.LookupOptions;
9090
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
9191
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
92+
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
9293
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
9394
import org.apache.pulsar.client.admin.PulsarAdminException;
9495
import org.apache.pulsar.client.impl.TableViewImpl;
9596
import org.apache.pulsar.common.naming.NamespaceBundle;
9697
import org.apache.pulsar.common.naming.NamespaceName;
9798
import org.apache.pulsar.common.naming.ServiceUnitId;
9899
import org.apache.pulsar.common.naming.TopicName;
100+
import org.apache.pulsar.common.naming.TopicVersion;
99101
import org.apache.pulsar.common.policies.data.BundlesData;
100102
import org.apache.pulsar.common.policies.data.ClusterData;
101103
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1035,6 +1037,24 @@ public void testListTopic() throws Exception {
10351037
admin.namespaces().deleteNamespace(namespace, true);
10361038
}
10371039

1040+
@Test(timeOut = 30 * 1000)
1041+
public void testTryAcquiringOwnership()
1042+
throws PulsarAdminException, ExecutionException, InterruptedException {
1043+
final String namespace = "public/testTryAcquiringOwnership";
1044+
admin.namespaces().createNamespace(namespace, 3);
1045+
String topic = "persistent://" + namespace + "/test";
1046+
admin.topics().createNonPartitionedTopic(topic);
1047+
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
1048+
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
1049+
assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
1050+
admin.namespaces().deleteNamespace(namespace, true);
1051+
}
1052+
1053+
@Test(timeOut = 30 * 1000)
1054+
public void testHealthcheck() throws PulsarAdminException {
1055+
admin.brokers().healthcheck(TopicVersion.V2);
1056+
}
1057+
10381058
private static abstract class MockBrokerFilter implements BrokerFilter {
10391059

10401060
@Override

0 commit comments

Comments
 (0)