Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Handle heartbeat namespace in ExtensibleLoadManager #20551

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
Expand Down Expand Up @@ -365,56 +367,99 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit

final String bundle = serviceUnit.toString();

CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(bundle, k -> {
return dedupeLookupRequest(bundle, k -> {
final CompletableFuture<Optional<String>> owner;
// Assign the bundle to the channel owner if the topic is internal, to avoid circular references.
if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
owner = serviceUnitStateChannel.getChannelOwnerAsync();
} else {
owner = serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
.thenApply(Optional::of);
} else {
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
}
});
owner = getOwnerAsync(serviceUnit, bundle, false).thenApply(Optional::ofNullable);
}
return getBrokerLookupData(owner, bundle);
});
}

/**
 * Resolves the owner broker of a bundle, assigning one when the bundle has no owner yet.
 *
 * <p>The current owner is read from {@code serviceUnitStateChannel}. When an owner is present it is
 * returned as-is and the skip counter is incremented. When it is absent, a broker is chosen and an
 * assign event for it is published to the channel.
 *
 * @param serviceUnit the service unit handed to {@code selectAsync} when a broker has to be selected
 * @param bundle the bundle name used to query the channel and to publish the assign event
 * @param ownByLocalBrokerIfAbsent if {@code true}, an unowned bundle is assigned to the broker id
 *                                 reported by {@code brokerRegistry} instead of running
 *                                 {@code selectAsync}
 * @return a future holding the existing owner, or the result of {@code publishAssignEventAsync} for
 *         the newly chosen broker; the lambda throws {@link IllegalStateException} when no broker
 *         could be selected
 */
private CompletableFuture<String> getOwnerAsync(
        ServiceUnitId serviceUnit, String bundle, boolean ownByLocalBrokerIfAbsent) {
    return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
        // If the bundle is not assigned yet, select a broker and publish an assign event to the channel.
        if (broker.isEmpty()) {
            CompletableFuture<Optional<String>> selectedBroker;
            if (ownByLocalBrokerIfAbsent) {
                String brokerId = this.brokerRegistry.getBrokerId();
                selectedBroker = CompletableFuture.completedFuture(Optional.of(brokerId));
            } else {
                selectedBroker = this.selectAsync(serviceUnit);
            }
            return selectedBroker.thenCompose(brokerOpt -> {
                if (brokerOpt.isPresent()) {
                    assignCounter.incrementSuccess();
                    log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
                    return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
                }
                // No candidate broker: fail the lookup instead of returning an empty owner.
                throw new IllegalStateException(
                        "Failed to select the new owner broker for bundle: " + bundle);
            });
        }
        assignCounter.incrementSkip();
        // Already assigned, return it.
        return CompletableFuture.completedFuture(broker.get());
    });
}

return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, same idea as @mattisonchao.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serviceUnitStateChannel.getChannelOwnerAsync() will return Optional, so we can change this method.

CompletableFuture<Optional<String>> owner,
String bundle) {
return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to look up a broker registry:%s for bundle:%s", broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
}

/**
 * Looks up the current owner of the <code>NamespaceBundle</code>, making the local broker
 * the owner when the bundle has none.
 *
 * <p>The request goes through <code>dedupeLookupRequest</code>, keyed by the bundle name, and resolves
 * the owner via <code>getOwnerAsync</code> with <code>ownByLocalBrokerIfAbsent</code> set to true.
 *
 * @param namespaceBundle the <code>NamespaceBundle</code> to acquire
 * @return a future holding the owner's broker lookup data converted to <code>NamespaceEphemeralData</code>;
 *         the conversion step throws <code>IllegalStateException</code> when the lookup data is empty
 */
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
    log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
    final String bundle = namespaceBundle.toString();

    CompletableFuture<Optional<BrokerLookupData>> lookup = dedupeLookupRequest(bundle, key -> {
        // Claim the bundle for this broker when nobody owns it yet.
        CompletableFuture<Optional<String>> ownerOpt =
                this.getOwnerAsync(namespaceBundle, bundle, true).thenApply(Optional::ofNullable);
        return getBrokerLookupData(ownerOpt, bundle);
    });

    return lookup.thenApply(lookupData -> lookupData
            .orElseThrow(() -> new IllegalStateException(
                    "Failed to get the broker lookup data for bundle: " + bundle))
            .toNamespaceEphemeralData());
}

private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
String key, Function<String, CompletableFuture<Optional<BrokerLookupData>>> provider) {
CompletableFuture<Optional<BrokerLookupData>> future = lookupRequests.computeIfAbsent(key, provider);
future.whenComplete((r, t) -> {
if (t != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(bundle);
lookupRequests.remove(key);
}
);
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,11 @@ private synchronized void doCleanup(String broker) {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
String heartbeatNamespace =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
.toString();
String heartbeatNamespaceV2 = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(),
pulsar.getConfiguration()).toString();

Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
Expand All @@ -1202,6 +1207,19 @@ private synchronized void doCleanup(String broker) {
if (isActiveState(state)) {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else if (serviceUnit.startsWith(heartbeatNamespace)
|| serviceUnit.startsWith(heartbeatNamespaceV2)) {
// Do not override ownership of heartbeat namespace bundles; tombstone them instead.
log.info("Skip override heartbeat namespace bundle"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, "
+ "stateData:{}, cleanupErrorCnt:{}.",
serviceUnit, stateData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
}
});
} else {
overrideOwnership(serviceUnit, stateData, broker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,14 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
// all pre-registered namespace is assumed to have bundles disabled
nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
final NamespaceEphemeralData otherData;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
} else {
otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
}

if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
if (nsFullBundle != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -1038,19 +1041,48 @@ public void testListTopic() throws Exception {
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exception {
NamespaceName heartbeatNamespacePulsar1V1 =
NamespaceService.getHeartbeatNamespace(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());
NamespaceName heartbeatNamespacePulsar1V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar1.getAdvertisedAddress(), pulsar1.getConfiguration());

NamespaceName heartbeatNamespacePulsar2V1 =
NamespaceService.getHeartbeatNamespace(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getAdvertisedAddress(), pulsar2.getConfiguration());

NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V2);

NamespaceBundle bundle3 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V1);
NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V2);

Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
// the heartbeat namespace bundles are owned by pulsar1
assertEquals(ownedServiceUnitsByPulsar1.size(), 2);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
assertEquals(ownedServiceUnitsByPulsar2.size(), 2);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
assertTrue(ownedNamespacesByPulsar1.isEmpty());
assertTrue(ownedNamespacesByPulsar2.isEmpty());
assertEquals(ownedNamespacesByPulsar1.size(), 2);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
assertEquals(ownedNamespacesByPulsar2.size(), 2);
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));

String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
Expand Down Expand Up @@ -1083,6 +1115,23 @@ private void assertOwnedServiceUnits(
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

/**
 * Verifies that {@code primaryLoadManager.tryAcquiringOwnership} on a bundle of a freshly created
 * namespace returns ownership data whose native URL is pulsar1's broker service URL.
 *
 * <p>The namespace is removed in a {@code finally} block so that a failed lookup or assertion does
 * not leave it behind for other tests.
 */
@Test(timeOut = 30 * 1000)
public void testTryAcquiringOwnership()
        throws PulsarAdminException, ExecutionException, InterruptedException {
    final String namespace = "public/testTryAcquiringOwnership";
    admin.namespaces().createNamespace(namespace, 1);
    try {
        String topic = "persistent://" + namespace + "/test";
        NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
        NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
        assertEquals(namespaceEphemeralData.getNativeUrl(), pulsar1.getBrokerServiceUrl());
    } finally {
        // Force-delete even on failure so the namespace does not leak into later tests.
        admin.namespaces().deleteNamespace(namespace, true);
    }
}

/**
 * Runs the broker healthcheck through the admin client with {@code TopicVersion.V2}.
 *
 * <p>The test has no assertions: it passes when the call returns without throwing
 * within the 30-second timeout.
 * NOTE(review): presumably this covers the V2 heartbeat namespace handled by this change — confirm.
 */
@Test(timeOut = 30 * 1000)
public void testHealthcheck() throws PulsarAdminException {
admin.brokers().healthcheck(TopicVersion.V2);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down