Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix and improve topic ownership assignment #13069

Merged
merged 14 commits into from
Dec 3, 2021
Merged
8 changes: 8 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
// The leader election service was not initialized yet. This can happen because the broker service is
// initialized first and it might start receiving lookup requests before the leader election service is
// fully initialized.
LOG.warn("Leader election service isn't initialized yet. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
Expand All @@ -480,23 +483,45 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
|| pulsar.getLeaderElectionService().isLeader()
|| !currentLeader.isPresent()

} else {
LoadManager loadManager = this.loadManager.get();
boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
if (!makeLoadManagerDecisionOnThisBroker) {
// If leader is not active, fallback to pick the least loaded from current broker loadmanager
|| !isBrokerActive(currentLeader.get().getServiceUrl())
) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
boolean leaderBrokerActive = currentLeader.isPresent()
&& isBrokerActive(currentLeader.get().getServiceUrl());
if (!leaderBrokerActive) {
makeLoadManagerDecisionOnThisBroker = true;
if (!currentLeader.isPresent()) {
LOG.warn(
"The information about the current leader broker wasn't available. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
bundle);
} else {
LOG.warn(
"The current leader broker {} isn't active. "
+ "Handling load manager decisions in a decentralized way. "
+ "NamespaceBundle[{}]",
currentLeader.get(), bundle);
}
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
candidateBroker = availableBroker.get();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
candidateBroker = currentLeader.get().getServiceUrl();
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -577,19 +602,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName)
throws Exception {
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
String path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + parseHostAndPort(candidateBroker);

localBrokerDataCache.get(path).thenAccept(reportData -> {
if (reportData.isPresent()) {
LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
LocalBrokerData lookupData = reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
Expand Down Expand Up @@ -622,22 +644,36 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
}

private boolean isBrokerActive(String candidateBroker) {
List<String> brokers = pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();

for (String brokerHostPort : brokers) {
if (candidateBroker.equals("http://" + brokerHostPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
}
return true;
String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
Set<String> availableBrokers = getAvailableBrokers();
if (availableBrokers.contains(candidateBrokerHostAndPort)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Broker {} ({}) is available for.", candidateBroker, candidateBrokerHostAndPort);
}
return true;
} else {
LOG.warn("Broker {} ({}) couldn't be found in available brokers {}",
candidateBroker, candidateBrokerHostAndPort,
availableBrokers.stream().collect(Collectors.joining(",")));
return false;
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Broker not found for SLA Monitoring Namespace {}",
candidateBroker + ":" + config.getWebServicePort());
private static String parseHostAndPort(String candidateBroker) {
int uriSeparatorPos = candidateBroker.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + candidateBroker + "' isn't an URI.");
}
String candidateBrokerHostAndPort = candidateBroker.substring(uriSeparatorPos + 3);
return candidateBrokerHostAndPort;
}

private Set<String> getAvailableBrokers() {
try {
return loadManager.get().getAvailableBrokers();
} catch (Exception e) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
throw new RuntimeException(e);
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -49,6 +50,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -67,6 +69,7 @@ public class NamespaceBundleFactory {

private final PulsarService pulsar;
private final MetadataCache<Policies> policiesCache;
private final Duration maxRetryDuration = Duration.ofSeconds(10);

public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;
Expand All @@ -93,30 +96,51 @@ private CompletableFuture<NamespaceBundles> loadBundles(NamespaceName namespace,
}

CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
doLoadBundles(namespace, future, createBackoff(), System.nanoTime() + maxRetryDuration.toNanos());
return future;
}

private void doLoadBundles(NamespaceName namespace, CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result -> {

if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(), result.get().getStat().getVersion()));
} catch (IOException e) {
future.completeExceptionally(e);
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
future.completeExceptionally(ex);
handleLoadBundlesRetry(namespace, future, backoff, retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}

private void handleLoadBundlesRetry(NamespaceName namespace,
CompletableFuture<NamespaceBundles> future,
Backoff backoff, long retryDeadline, Throwable e) {
if (e instanceof Error || System.nanoTime() > retryDeadline) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
future.completeExceptionally(e);
} else {
LOG.warn("Error loading bundle for {}. Retrying exception", namespace, e);
long retryDelay = backoff.next();
pulsar.getExecutor().schedule(() ->
doLoadBundles(namespace, future, backoff, retryDeadline), retryDelay, TimeUnit.MILLISECONDS);
}
}

private static Backoff createBackoff() {
return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
}

private NamespaceBundles readBundles(NamespaceName namespace, LocalPolicies localPolicies, long version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -82,13 +84,13 @@ protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws
}

@Override
protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}

@Override
protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
Expand Down Expand Up @@ -246,6 +247,11 @@ protected final void internalCleanup() throws Exception {
}
bkExecutor = null;
}
onCleanup();
}

protected void onCleanup() {

}

protected abstract void setup() throws Exception;
Expand Down Expand Up @@ -332,11 +338,11 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}
}

protected ZKMetadataStore createLocalMetadataStore() {
protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeper);
}

protected ZKMetadataStore createConfigurationMetadataStore() {
protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return new ZKMetadataStore(mockZooKeeperGlobal);
}

Expand Down
Loading