Skip to content

Commit 5f44f74

Browse files
committed
Add extensible load manager impl
1 parent 826a293 commit 5f44f74

15 files changed

+900
-54
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+52-5
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.commons.lang3.StringUtils.isNotBlank;
2424
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
25+
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
2526
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
2627
import com.google.common.annotations.VisibleForTesting;
28+
import com.google.common.collect.Sets;
2729
import io.netty.channel.ChannelInitializer;
2830
import io.netty.channel.EventLoopGroup;
2931
import io.netty.channel.socket.SocketChannel;
@@ -87,14 +89,17 @@
8789
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
8890
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
8991
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
92+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
9093
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
9194
import org.apache.pulsar.broker.namespace.NamespaceService;
9295
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
9396
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
9497
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
9598
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
9699
import org.apache.pulsar.broker.resources.ClusterResources;
100+
import org.apache.pulsar.broker.resources.NamespaceResources;
97101
import org.apache.pulsar.broker.resources.PulsarResources;
102+
import org.apache.pulsar.broker.resources.TenantResources;
98103
import org.apache.pulsar.broker.rest.Topics;
99104
import org.apache.pulsar.broker.service.BrokerService;
100105
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
@@ -137,8 +142,11 @@
137142
import org.apache.pulsar.common.naming.NamespaceBundle;
138143
import org.apache.pulsar.common.naming.NamespaceName;
139144
import org.apache.pulsar.common.naming.TopicName;
145+
import org.apache.pulsar.common.policies.data.ClusterData;
140146
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
141147
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
148+
import org.apache.pulsar.common.policies.data.Policies;
149+
import org.apache.pulsar.common.policies.data.TenantInfo;
142150
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
143151
import org.apache.pulsar.common.util.FutureUtil;
144152
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
@@ -794,6 +802,12 @@ public void start() throws PulsarServerException {
794802
}
795803
brokerService.start();
796804

805+
if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
806+
// Init system namespace for extensible load manager
807+
this.createNamespaceIfNotExists(this.getConfiguration().getClusterName(),
808+
SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE);
809+
}
810+
797811
// Load additional servlets
798812
this.brokerAdditionalServlets = AdditionalServlets.load(config);
799813

@@ -825,6 +839,11 @@ public void start() throws PulsarServerException {
825839
this.webSocketService.setLocalCluster(clusterData);
826840
}
827841

842+
// By starting the Load manager service, the broker will also become visible
843+
// to the rest of the broker by creating the registration z-node. This needs
844+
// to be done only when the broker is fully operative.
845+
this.startLoadManagementService();
846+
828847
// Initialize namespace service, after service url assigned. Should init zk and refresh self owner info.
829848
this.nsService.initialize();
830849

@@ -866,11 +885,6 @@ public void start() throws PulsarServerException {
866885

867886
this.metricsGenerator = new MetricsGenerator(this);
868887

869-
// By starting the Load manager service, the broker will also become visible
870-
// to the rest of the broker by creating the registration z-node. This needs
871-
// to be done only when the broker is fully operative.
872-
this.startLoadManagementService();
873-
874888
// Initialize the message protocol handlers.
875889
// start the protocol handlers only after the broker is ready,
876890
// so that the protocol handlers can access broker service properly.
@@ -925,6 +939,36 @@ public void start() throws PulsarServerException {
925939
}
926940
}
927941

942+
protected void createNamespaceIfNotExists(String cluster, String publicTenant, NamespaceName ns) throws Exception {
943+
ClusterResources cr = this.getPulsarResources().getClusterResources();
944+
TenantResources tr = this.getPulsarResources().getTenantResources();
945+
NamespaceResources nsr = this.getPulsarResources().getNamespaceResources();
946+
947+
if (!cr.clusterExists(cluster)) {
948+
cr.createCluster(cluster,
949+
ClusterData.builder()
950+
.serviceUrl(this.getWebServiceAddress())
951+
.serviceUrlTls(this.getWebServiceAddressTls())
952+
.brokerServiceUrl(this.getBrokerServiceUrl())
953+
.brokerServiceUrlTls(this.getBrokerServiceUrlTls())
954+
.build());
955+
}
956+
957+
if (!tr.tenantExists(publicTenant)) {
958+
tr.createTenant(publicTenant,
959+
TenantInfo.builder()
960+
.adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
961+
.allowedClusters(Sets.newHashSet(cluster))
962+
.build());
963+
}
964+
965+
if (!nsr.namespaceExists(ns)) {
966+
Policies nsp = new Policies();
967+
nsp.replication_clusters = Collections.singleton(config.getClusterName());
968+
nsr.createPolicies(ns, nsp);
969+
}
970+
}
971+
928972
private synchronized void createMetricsServlet() {
929973
this.metricsServlet = new PulsarPrometheusMetricsServlet(
930974
this, config.isExposeTopicLevelMetricsInPrometheus(),
@@ -1085,6 +1129,9 @@ protected void closeLocalMetadataStore() throws Exception {
10851129
}
10861130

10871131
protected void startLeaderElectionService() {
1132+
if (this.loadManager.get() instanceof ExtensibleLoadManagerWrapper) {
1133+
return;
1134+
}
10881135
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
10891136
state -> {
10901137
if (state == LeaderElectionState.Leading) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java

+17
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@
2525
import org.apache.pulsar.broker.PulsarServerException;
2626
import org.apache.pulsar.broker.PulsarService;
2727
import org.apache.pulsar.broker.ServiceConfiguration;
28+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
29+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
2830
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
2931
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
32+
import org.apache.pulsar.broker.lookup.LookupResult;
3033
import org.apache.pulsar.common.naming.ServiceUnitId;
3134
import org.apache.pulsar.common.stats.Metrics;
3235
import org.apache.pulsar.common.util.Reflections;
@@ -58,6 +61,15 @@ public interface LoadManager {
5861
*/
5962
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
6063

64+
default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
65+
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
66+
return null;
67+
}
68+
69+
default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
70+
return null;
71+
}
72+
6173
/**
6274
* Generate the load report.
6375
*/
@@ -143,6 +155,11 @@ static LoadManager create(final PulsarService pulsar) {
143155
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
144156
casted.initialize(pulsar);
145157
return casted;
158+
} else if (loadManagerInstance instanceof ExtensibleLoadManagerImpl) {
159+
final LoadManager casted =
160+
new ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
161+
casted.initialize(pulsar);
162+
return casted;
146163
}
147164
} catch (Exception e) {
148165
LOG.warn("Error when trying to create load manager: ", e);

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public BrokerRegistryImpl(PulsarService pulsar) {
8282
this.listeners = new ArrayList<>();
8383
this.brokerId = pulsar.getLookupServiceAddress();
8484
this.brokerLookupData = new BrokerLookupData(
85-
pulsar.getSafeWebServiceAddress(),
85+
pulsar.getWebServiceAddress(),
8686
pulsar.getWebServiceAddressTls(),
8787
pulsar.getBrokerServiceUrl(),
8888
pulsar.getBrokerServiceUrlTls(),

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java

+9
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable {
6464
*/
6565
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
6666

67+
/**
68+
* Check the incoming service unit is owned by the current broker.
69+
*
70+
* @param topic The optional topic, some method won't provide topic var in this param.
71+
* @param serviceUnit The service unit (e.g. bundle).
72+
* @return The broker lookup data.
73+
*/
74+
CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
75+
6776
/**
6877
* Close the load manager.
6978
*

0 commit comments

Comments
 (0)