diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index aef0f2e49c6c6..bb7bee7e04722 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -568,6 +568,14 @@
test-jar
test
+
+
+ ${project.groupId}
+ pulsar-metadata
+ ${project.version}
+ test-jar
+ test
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index adcb504c699a5..dc8f38390a4fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -454,6 +454,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
+ LOG.warn("Leader election service isn't initialized yet. "
+ + "Returning empty result to lookup. NamespaceBundle[{}]",
+ bundle);
lookupFuture.complete(Optional.empty());
return;
}
@@ -480,23 +483,45 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
- } else if (!this.loadManager.get().isCentralized()
- || pulsar.getLeaderElectionService().isLeader()
- || !currentLeader.isPresent()
-
+ } else {
+ LoadManager loadManager = this.loadManager.get();
+ boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
+ if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
- || !isBrokerActive(currentLeader.get().getServiceUrl())
- ) {
- Optional availableBroker = getLeastLoadedFromLoadManager(bundle);
- if (!availableBroker.isPresent()) {
- lookupFuture.complete(Optional.empty());
- return;
+ boolean leaderBrokerActive = currentLeader.isPresent()
+ && isBrokerActive(currentLeader.get().getServiceUrl());
+ if (!leaderBrokerActive) {
+ makeLoadManagerDecisionOnThisBroker = true;
+ if (!currentLeader.isPresent()) {
+ LOG.warn(
+ "The information about the current leader broker wasn't available. "
+ + "Handling load manager decisions in a decentralized way. "
+ + "NamespaceBundle[{}]",
+ bundle);
+ } else {
+ LOG.warn(
+ "The current leader broker {} isn't active. "
+ + "Handling load manager decisions in a decentralized way. "
+ + "NamespaceBundle[{}]",
+ currentLeader.get(), bundle);
+ }
+ }
+ }
+ if (makeLoadManagerDecisionOnThisBroker) {
+ Optional availableBroker = getLeastLoadedFromLoadManager(bundle);
+ if (!availableBroker.isPresent()) {
+ LOG.warn("Load manager didn't return any available broker. "
+ + "Returning empty result to lookup. NamespaceBundle[{}]",
+ bundle);
+ lookupFuture.complete(Optional.empty());
+ return;
+ }
+ candidateBroker = availableBroker.get();
+ authoritativeRedirect = true;
+ } else {
+ // forward to leader broker to make assignment
+ candidateBroker = currentLeader.get().getServiceUrl();
}
- candidateBroker = availableBroker.get();
- authoritativeRedirect = true;
- } else {
- // forward to leader broker to make assignment
- candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
@@ -577,19 +602,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
protected CompletableFuture createLookupResult(String candidateBroker, boolean authoritativeRedirect,
- final String advertisedListenerName)
- throws Exception {
+ final String advertisedListenerName) {
CompletableFuture lookupFuture = new CompletableFuture<>();
try {
- checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
- URI uri = new URI(candidateBroker);
- String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
- uri.getPort());
+ checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
+ String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);
localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
- LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
+ LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
@@ -622,22 +644,36 @@ protected CompletableFuture createLookupResult(String candidateBro
}
private boolean isBrokerActive(String candidateBroker) {
- List brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
- for (String brokerHostPort : brokers) {
- if (candidateBroker.equals("http://" + brokerHostPort)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
- }
- return true;
+ String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+ Set availableBrokers = getAvailableBrokers();
+ if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
+ return true;
+ } else {
+ LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
+ candidateBroker, candidateBrokerHostAndPort,
+ availableBrokers.stream().collect(Collectors.joining(",")));
+ return false;
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker not found for SLA Monitoring Namespace {}",
- candidateBroker + ":" + config.getWebServicePort());
+ private static String parseHostAndPort(String candidateBroker) {
+ int uriSeparatorPos = candidateBroker.indexOf("://");
+ if (uriSeparatorPos == -1) {
+ throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
+ }
+ String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
+ return candidateBrokerHostAndPort;
+ }
+
+ private Set getAvailableBrokers() {
+ try {
+ return loadManager.get().getAvailableBrokers();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- return false;
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 9fbcf17a78b8c..586e3b39aec86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -31,6 +31,7 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -46,6 +47,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -64,6 +66,7 @@ public class NamespaceBundleFactory {
private final PulsarService pulsar;
private final MetadataCache policiesCache;
+ private final Duration maxRetryDuration = Duration.ofSeconds(10);
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
@@ -90,22 +93,27 @@ private CompletableFuture loadBundles(NamespaceName namespace,
}
CompletableFuture future = new CompletableFuture<>();
+ doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
+ return future;
+ }
+
+ private void doLoadBundles(NamespaceName namespace, CompletableFuture future,
+ Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {
-
if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
- future.completeExceptionally(e);
+ handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
- future.completeExceptionally(ex);
+ handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
return null;
});
}
@@ -113,7 +121,23 @@ private CompletableFuture loadBundles(NamespaceName namespace,
future.completeExceptionally(ex);
return null;
});
- return future;
+ }
+
+ private void handleLoadBundlesRetry(NamespaceName namespace,
+ CompletableFuture future,
+ Backoff backoff, long retryDeadline, Throwable e) {
+ if (e instanceof Error || System.nanoTime() > retryDeadline) {
+ future.completeExceptionally(e);
+ } else {
+ LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
+ long retryDelay = backoff.next();
+ pulsar.getExecutor().schedule(() ->
+ doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private static Backoff createBackoff() {
+ return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}
private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index f4d106da67f0b..c00ae8cd0d39d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -27,6 +27,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
@@ -82,13 +84,13 @@ protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws
}
@Override
- protected ZKMetadataStore createLocalMetadataStore() {
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}
@Override
- protected ZKMetadataStore createConfigurationMetadataStore() {
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index a3f7166cace8e..c50b477b5d313 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -60,6 +60,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -245,6 +247,11 @@ protected final void internalCleanup() throws Exception {
}
bkExecutor = null;
}
+ onCleanup();
+ }
+
+ protected void onCleanup() {
+
}
protected abstract void setup() throws Exception;
@@ -332,11 +339,11 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}
}
- protected ZKMetadataStore createLocalMetadataStore() {
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}
- protected ZKMetadataStore createConfigurationMetadataStore() {
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
index 0045dddeb8161..462b640c17511 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -20,15 +20,68 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
+@Slf4j
@Test(groups = "broker")
public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+ @Override
+ protected int numberOfAdditionalBrokers() {
+ return 9;
+ }
+
+ TestZKServer testZKServer;
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ testZKServer = new TestZKServer();
+ }
+
+ @Override
+ protected void onCleanup() {
+ super.onCleanup();
+ if (testZKServer != null) {
+ try {
+ testZKServer.close();
+ } catch (Exception e) {
+ log.error("Error in stopping ZK server", e);
+ }
+ }
+ }
+
+ @Override
+ protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
+
+ @Override
+ protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+ return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+ }
@Test
public void shouldElectOneLeader() {
@@ -68,4 +121,48 @@ public void shouldAllBrokersBeAbleToGetTheLeader() {
}
});
}
+
+ @Test
+ public void shouldProvideConsistentAnswerToTopicLookups()
+ throws PulsarAdminException, ExecutionException, InterruptedException {
+ String topicNameBase = "persistent://public/default/lookuptest" + UUID.randomUUID() + "-";
+ List topicNames = IntStream.range(0, 500).mapToObj(i -> topicNameBase + i)
+ .collect(Collectors.toList());
+ List allAdmins = getAllAdmins();
+ @Cleanup("shutdown")
+ ExecutorService executorService = Executors.newFixedThreadPool(allAdmins.size());
+ List>> resultFutures = new ArrayList<>();
+ String leaderBrokerUrl = admin.brokers().getLeaderBroker().getServiceUrl();
+ log.info("LEADER is {}", leaderBrokerUrl);
+ // use Phaser to increase the chances of a race condition by triggering all threads once
+ // they are waiting just before the lookupTopic call
+ final Phaser phaser = new Phaser(1);
+ for (PulsarAdmin brokerAdmin : allAdmins) {
+ if (!leaderBrokerUrl.equals(brokerAdmin.getServiceUrl())) {
+ phaser.register();
+ log.info("Doing lookup to broker {}", brokerAdmin.getServiceUrl());
+ resultFutures.add(executorService.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ return topicNames.stream().map(topicName -> {
+ try {
+ return brokerAdmin.lookups().lookupTopic(topicName);
+ } catch (PulsarAdminException e) {
+ log.error("Error looking up topic {} in {}", topicName, brokerAdmin.getServiceUrl());
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }));
+ }
+ }
+ phaser.arriveAndAwaitAdvance();
+ List firstResult = null;
+ for (Future> resultFuture : resultFutures) {
+ List result = resultFuture.get();
+ if (firstResult == null) {
+ firstResult = result;
+ } else {
+ assertEquals(result, firstResult, "The lookup results weren't consistent.");
+ }
+ }
+ }
}
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 7ce5a6b778a6a..c98d9514797a7 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -77,6 +77,17 @@
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
com.github.spotbugs
spotbugs-maven-plugin