Skip to content

Commit

Permalink
[Broker] Fix and improve topic ownership assignment (apache#13069)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Dec 3, 2021
1 parent 7cc29cb commit e1fbccd
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
LOG.warn("Leader election service isn't initialized yet. Returning empty result to lookup.");
lookupFuture.complete(Optional.empty());
return;
}
Expand All @@ -484,23 +485,40 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
|| pulsar.getLeaderElectionService().isLeader()
|| !currentLeader.isPresent()

} else {
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
|| !isBrokerActive(currentLeader.get().getServiceUrl())
) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
boolean leaderBrokerActive = currentLeader.isPresent()
&& isBrokerActive(currentLeader.get().getServiceUrl());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (!currentLeader.isPresent()) {
LOG.warn(
"The information about the current leader broker wasn't available. "
+ "Handling load manager decisions in a decentralized way for {}",
bundle);
} else {
LOG.warn(
"The current leader broker {} isn't active. "
+ "Handling load manager decisions in a decentralized way for {}",
currentLeader.get(), bundle);
}
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -583,19 +601,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName)
throws Exception {
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);

localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
Expand Down Expand Up @@ -627,22 +642,36 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
}

private boolean isBrokerActive(String candidateBroker) {
List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();

for (String brokerHostPort : brokers) {
if (candidateBroker.equals("http://" + brokerHostPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
}
return true;
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
return true;
} else {
LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
candidateBroker, candidateBrokerHostAndPort,
availableBrokers.stream().collect(Collectors.joining(",")));
return false;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
candidateBroker + ":" + config.getWebServicePort());
private String parseHostAndPort(String candidateBroker) {
int uriSeparatorPos = candidateBroker.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
}
String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
return candidateBrokerHostAndPort;
}

private Set<String> getAvailableBrokers() {
try {
return loadManager.get().getAvailableBrokers();
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,35 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
@Override
protected int numberOfAdditionalBrokers() {
return 9;
}

@Test
public void shouldElectOneLeader() {
Expand Down Expand Up @@ -68,4 +88,49 @@ public void shouldAllBrokersBeAbleToGetTheLeader() {
}
});
}

@Test
public void shouldProvideConsistentAnswerToTopicLookups()
throws PulsarAdminException, ExecutionException, InterruptedException {
String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
List<String> topicNames = IntStream.range(0, 50).mapToObj(i -> topicNameBase + i)
.collect(Collectors.toList());
List<PulsarAdmin> allAdmins = getAllAdmins();
@Cleanup("shutdown")
ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
List<Future<List<String>>> resultFutures = new ArrayList<>();
String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
log.info("LEADER is {}", leaderBrokerUrl);
// use Phaser to increase the chances of a race condition by triggering all threads once
// they are waiting just before the lookupTopic call
final Phaser phaser = new Phaser(1);
for (PulsarAdmin brokerAdmin : allAdmins) {
if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
phaser.register();
log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
resultFutures.add(executorService.submit(() -> {
phaser.arriveAndAwaitAdvance();
return topicNames.stream().map(topicName -> {
try {
return brokerAdmin.lookups().lookupTopic(topicName);
} catch (PulsarAdminException e) {
log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}));
}
}
phaser.arriveAndAwaitAdvance();
List<String> firstResult = null;
for (Future<List<String>> resultFuture : resultFutures) {
List<String> result = resultFuture.get();
log.info("LOOKUP RESULT {}", result);
if (firstResult == null) {
firstResult = result;
} else {
assertEquals(result, firstResult, "The lookup results weren't consistent.");
}
}
}
}

0 comments on commit e1fbccd

Please sign in to comment.