Skip to content

Commit

Permalink
[fix][broker] Skip topic.close during unloading if the topic future f…
Browse files Browse the repository at this point in the history
…ails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (apache#22379)

(cherry picked from commit 3eb3b1c)
  • Loading branch information
heesung-sn committed Apr 2, 2024
1 parent e6501bd commit e762e33
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.manager;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
Expand Down Expand Up @@ -88,14 +91,27 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
return;
}

if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
}
this.complete(serviceUnit, t);
return;
}
ServiceUnitState state = ServiceUnitStateData.state(data);

if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
}

switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
default -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2277,9 +2277,18 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
: CompletableFuture.completedFuture(null))
.exceptionally(e -> {
if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
&& e.getMessage().contains("Please redo the lookup")) {
log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
return null;
}
throw FutureUtil.wrapToCompletionException(e);
}));
}
});

if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,17 @@ protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}

LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(optionUrl -> optionUrl.isPresent());

return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}

protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand All @@ -63,13 +62,10 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -236,6 +237,32 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
assertTrue(brokerLookupData.isPresent());
}

@Test(timeOut = 30 * 1000)
public void testUnloadUponTopicLookupFailure() throws Exception {
TopicName topicName =
TopicName.get("public/test/testUnloadUponTopicLookupFailure");
NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
primaryLoadManager.assign(Optional.empty(), bundle).get();

CompletableFuture future1 = new CompletableFuture();
CompletableFuture future2 = new CompletableFuture();
try {
pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
future1.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
future2.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
});
admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());
} finally {
pulsar1.getBrokerService().getTopics().remove(topicName.toString());
pulsar2.getBrokerService().getTopics().remove(topicName.toString());
}
}


@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,53 +94,59 @@ public void testTimeout() throws IllegalAccessException {
public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
UnloadManager manager = new UnloadManager(counter);
String dstBroker = "broker-2";
String srcBroker = "broker-1";
String bundle = "bundle-1";
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin);
new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin);
CompletableFuture<Void> future =
manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);

assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
"bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
bundle, unloadDecision, 5, TimeUnit.SECONDS);
inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequestMap.size(), 0);
future.get();

future.get();
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
}

Expand All @@ -158,7 +164,7 @@ public void testFailedStage() throws IllegalAccessException {
assertEquals(inFlightUnloadRequestMap.size(), 1);

manager.handleEvent("bundle-1",
new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT),
new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT),
new IllegalStateException("Failed stage."));

try {
Expand Down

0 comments on commit e762e33

Please sign in to comment.