Skip to content

Commit

Permalink
[Broker] Fix and improve topic ownership assignment (#13069)
Browse files Browse the repository at this point in the history
* Add warning log message when leader broker isn't available

* Add more logging about load manager decisions

* Use cached information for available brokers

* Reproduce lookup race issue

* Use java.util.concurrent.Phaser to increase the chances of a race

* Address review feedback

* Increase concurrency of test case to reproduce race conditions

* Use real Zookeeper server in MultiBrokerLeaderElectionTest

* Add retry with backoff to loading namespace bundles

* Add more topics to test

* Address review comment

* Fix checkstyle

* Improve logging

* Address review comments

(cherry picked from commit 537dee1)
  • Loading branch information
lhotari committed Dec 3, 2021
1 parent a40eda0 commit b95c6f3
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 42 deletions.
8 changes: 8 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
LOG.warn("Leader election service isn't initialized yet. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
Expand All @@ -480,23 +483,45 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
|| pulsar.getLeaderElectionService().isLeader()
|| !currentLeader.isPresent()

} else {
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
|| !isBrokerActive(currentLeader.get().getServiceUrl())
) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
boolean leaderBrokerActive = currentLeader.isPresent()
&& isBrokerActive(currentLeader.get().getServiceUrl());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (!currentLeader.isPresent()) {
LOG.warn(
"The information about the current leader broker wasn't available. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
bundle);
} else {
LOG.warn(
"The current leader broker {} isn't active. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
currentLeader.get(), bundle);
}
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -577,19 +602,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName)
throws Exception {
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);

localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
Expand Down Expand Up @@ -622,22 +644,36 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
}

private boolean isBrokerActive(String candidateBroker) {
List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();

for (String brokerHostPort : brokers) {
if (candidateBroker.equals("http://" + brokerHostPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
}
return true;
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
return true;
} else {
LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
candidateBroker, candidateBrokerHostAndPort,
availableBrokers.stream().collect(Collectors.joining(",")));
return false;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
candidateBroker + ":" + config.getWebServicePort());
private static String parseHostAndPort(String candidateBroker) {
int uriSeparatorPos = candidateBroker.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
}
String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
return candidateBrokerHostAndPort;
}

private Set<String> getAvailableBrokers() {
try {
return loadManager.get().getAvailableBrokers();
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -46,6 +47,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -64,6 +66,7 @@ public class NamespaceBundleFactory {

private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
private final Duration maxRetryDuration = Duration.ofSeconds(10);

public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
Expand All @@ -90,30 +93,51 @@ private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace,
}

CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
return future;
}

private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {

if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
future.completeExceptionally(e);
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
future.completeExceptionally(ex);
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}

private void handleLoadBundlesRetry(NamespaceName namespace,
CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline, Throwable e) {
if (e instanceof Error || System.nanoTime() > retryDeadline) {
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
long retryDelay = backoff.next();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
}
}

private static Backoff createBackoff() {
return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}

private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -82,13 +84,13 @@ protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws
}

@Override
protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}

@Override
protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -245,6 +247,11 @@ protected final void internalCleanup() throws Exception {
}
bkExecutor = null;
}
onCleanup();
}

protected void onCleanup() {

}

protected abstract void setup() throws Exception;
Expand Down Expand Up @@ -332,11 +339,11 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}
}

protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}

protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}

Expand Down
Loading

0 comments on commit b95c6f3

Please sign in to comment.