Skip to content

Commit

Permalink
[fix][broker] Support lookup options for extensible load manager (apa…
Browse files Browse the repository at this point in the history
…che#22487)

(cherry picked from commit 7fe92ac)
(cherry picked from commit d0c075f)
(cherry picked from commit 2b84dff)
Demogorgon314 authored and srinath-ctds committed Jul 1, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent fb72821 commit 64682a7
Showing 13 changed files with 175 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
@@ -63,7 +64,7 @@ public interface LoadManager {
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
throw new UnsupportedOperationException();
}

Original file line number Diff line number Diff line change
@@ -60,9 +60,12 @@ public interface ExtensibleLoadManager extends Closeable {
* (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
* So the topic is optional.
* @param serviceUnit service unit (e.g. bundle).
* @param options The lookup options.
* @return The broker lookup data.
*/
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit,
LookupOptions options);

/**
* Check the incoming service unit is owned by the current broker.
Original file line number Diff line number Diff line change
@@ -86,6 +86,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -451,7 +452,8 @@ public void initialize(PulsarService pulsar) {

@Override
public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit) {
ServiceUnitId serviceUnit,
LookupOptions options) {

final String bundle = serviceUnit.toString();

@@ -465,7 +467,7 @@ public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnit
if (candidateBrokerId != null) {
return CompletableFuture.completedFuture(Optional.of(candidateBrokerId));
}
return getOrSelectOwnerAsync(serviceUnit, bundle).thenApply(Optional::ofNullable);
return getOrSelectOwnerAsync(serviceUnit, bundle, options).thenApply(Optional::ofNullable);
});
}
return getBrokerLookupData(owner, bundle);
@@ -478,18 +480,18 @@ private CompletableFuture<String> getHeartbeatOrSLAMonitorBrokerId(ServiceUnitId
}

private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUnit,
String bundle) {
String bundle,
LookupOptions options) {
return serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
// If the bundle not assign yet, select and publish assign event to channel.
if (broker.isEmpty()) {
return this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
return this.selectAsync(serviceUnit, Collections.emptySet(), options).thenCompose(brokerOpt -> {
if (brokerOpt.isPresent()) {
assignCounter.incrementSuccess();
log.info("Selected new owner broker: {} for bundle: {}.", brokerOpt.get(), bundle);
return serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get());
}
throw new IllegalStateException(
"Failed to select the new owner broker for bundle: " + bundle);
return CompletableFuture.completedFuture(null);
});
}
assignCounter.incrementSkip();
@@ -503,22 +505,19 @@ private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
String bundle) {
return owner.thenCompose(broker -> {
if (broker.isEmpty()) {
String errorMsg = String.format(
"Failed to get or assign the owner for bundle:%s", bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(broker.get());
}).thenCompose(broker -> this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
return CompletableFuture.completedFuture(Optional.empty());
}
return CompletableFuture.completedFuture(brokerLookupData);
}));
return this.getBrokerRegistry().lookupAsync(broker.get()).thenCompose(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
"Failed to lookup broker:%s for bundle:%s, the broker has not been registered.",
broker, bundle);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
return CompletableFuture.completedFuture(brokerLookupData);
});
});
}

/**
@@ -531,7 +530,7 @@ private CompletableFuture<Optional<BrokerLookupData>> getBrokerLookupData(
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle namespaceBundle) {
log.info("Try acquiring ownership for bundle: {} - {}.", namespaceBundle, brokerRegistry.getBrokerId());
final String bundle = namespaceBundle.toString();
return assign(Optional.empty(), namespaceBundle)
return assign(Optional.empty(), namespaceBundle, LookupOptions.builder().readOnly(false).build())
.thenApply(brokerLookupData -> {
if (brokerLookupData.isEmpty()) {
String errorMsg = String.format(
@@ -564,12 +563,12 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
}
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
return selectAsync(bundle, Collections.emptySet());
}

public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
Set<String> excludeBrokerSet) {
Set<String> excludeBrokerSet,
LookupOptions options) {
if (options.isReadOnly()) {
return CompletableFuture.completedFuture(Optional.empty());
}
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenComposeAsync(availableBrokers -> {
Original file line number Diff line number Diff line change
@@ -28,10 +28,11 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;

public class ExtensibleLoadManagerWrapper implements LoadManager {
@@ -62,9 +63,15 @@ public boolean isCentralized() {

@Override
public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
return loadManager.assign(topic, bundle)
.thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
Optional<ServiceUnitId> topic, ServiceUnitId bundle, LookupOptions options) {
return loadManager.assign(topic, bundle, options)
.thenApply(lookupData -> lookupData.map(data -> {
try {
return data.toLookupResult(options);
} catch (PulsarServerException ex) {
throw FutureUtil.wrapToCompletionException(ex);
}
}));
}

@Override
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
@@ -1413,7 +1414,8 @@ private synchronized void doCleanup(String broker) {
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker))
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
Set.of(inactiveBroker), LookupOptions.builder().build())
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
Original file line number Diff line number Diff line change
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.data;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
@@ -79,7 +82,19 @@ public long getStartTimestamp() {
return this.startTimestamp;
}

public LookupResult toLookupResult() {
public LookupResult toLookupResult(LookupOptions options) throws PulsarServerException {
if (options.hasAdvertisedListenerName()) {
AdvertisedListener listener = advertisedListeners.get(options.getAdvertisedListenerName());
if (listener == null) {
throw new PulsarServerException("the broker do not have "
+ options.getAdvertisedListenerName() + " listener");
}
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
return new LookupResult(webServiceUrl, webServiceUrlTls,
url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString(), LookupResult.Type.BrokerUrl, false);
}
return new LookupResult(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
LookupResult.Type.BrokerUrl, false);
}
Original file line number Diff line number Diff line change
@@ -195,7 +195,7 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
return CompletableFuture.completedFuture(optResult);
}
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle, options);
} else {
// TODO: Add unit tests cover it.
return findBrokerServiceUrl(bundle, options);
@@ -311,7 +311,7 @@ private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable Serv
}
CompletableFuture<Optional<LookupResult>> future =
ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle, options) :
findBrokerServiceUrl(bundle, options);

return future.thenApply(lookupResult -> {
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -61,7 +62,8 @@ protected String getLoadManagerClassName() {

protected String selectBroker(ServiceUnitId serviceUnit, Object loadManager) {
try {
return ((ExtensibleLoadManagerImpl) loadManager).assign(Optional.empty(), serviceUnit).get()
return ((ExtensibleLoadManagerImpl) loadManager)
.assign(Optional.empty(), serviceUnit, LookupOptions.builder().build()).get()
.get().getPulsarServiceUrl();
} catch (Throwable e) {
throw new RuntimeException(e);
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -69,6 +71,8 @@ protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setWebServicePortTls(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
return conf;
}

Loading

0 comments on commit 64682a7

Please sign in to comment.