Skip to content

Commit 011c7dd

Browse files
Demogorgon314lifepuzzlefun
authored andcommitted
[improve][broker] PIP-192: Define new load manager base interfaces (apache#18084)
Master Issue: apache#16691 ### Motivation We will start raising PRs to implement PIP-192, apache#16691 ### Modifications The PR adds base classes for the new broker load balance project and does not integrate with the existing load balance logic. This PR should not impact the existing broker load balance behavior. For the pip-192 project, this PR * defines the base interface under `org.apache.pulsar.broker.loadbalance.extensible` package. * defines this `BrokerRegistry` public interface and its expected behaviors. * defines `BrokerFilter` interfaces. * defines `LoadDataReporter` interfaces. * defines `NamespaceBundleSplitStrategy` interfaces. * defines `LoadManagerScheduler` interfaces. * defines `NamespaceUnloadStrategy` interfaces. * defines `LoadDataStore` interfaces. * defines `ExtensibleLoadManager` interfaces. * defines `LoadManagerContext` interfaces. * defines `BrokerLoadData` and `BrokerLookupData` data classes.
1 parent a87e961 commit 011c7dd

23 files changed

+1067
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import java.util.List;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.function.BiConsumer;
25+
import org.apache.pulsar.broker.PulsarServerException;
26+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
27+
import org.apache.pulsar.metadata.api.MetadataStoreException;
28+
import org.apache.pulsar.metadata.api.NotificationType;
29+
30+
/**
31+
* Responsible for registering the current Broker lookup info to
32+
* the distributed store (e.g. Zookeeper) for broker discovery.
33+
*/
34+
public interface BrokerRegistry extends AutoCloseable {
35+
36+
/**
37+
* Start broker registry.
38+
*/
39+
void start() throws PulsarServerException;
40+
41+
/**
42+
* Register local broker to metadata store.
43+
*/
44+
void register() throws MetadataStoreException;
45+
46+
/**
47+
* Unregister the broker.
48+
*
49+
* Same as {@link org.apache.pulsar.broker.loadbalance.ModularLoadManager#disableBroker()}
50+
*/
51+
void unregister() throws MetadataStoreException;
52+
53+
/**
54+
* Get the current broker lookup service address.
55+
*
56+
* @return The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
57+
*/
58+
String getLookupServiceAddress();
59+
60+
/**
61+
* Async get available brokers.
62+
*
63+
* @return The brokers service url list.
64+
*/
65+
CompletableFuture<List<String>> getAvailableBrokersAsync();
66+
67+
/**
68+
* Get the broker lookup data.
69+
*
70+
* @param broker The service url without the protocol prefix, 'http://'. e.g. broker-xyz:port
71+
*/
72+
CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker);
73+
74+
/**
75+
* For each the broker lookup data.
76+
* The key is lookupServiceAddress{@link BrokerRegistry#getLookupServiceAddress()}
77+
*/
78+
void forEach(BiConsumer<String, BrokerLookupData> action);
79+
80+
/**
81+
* Listen the broker register change.
82+
*
83+
* @param listener Key is lookup service address{@link BrokerRegistry#getLookupServiceAddress()}
84+
* Value is notification type.
85+
*/
86+
void listen(BiConsumer<String, NotificationType> listener);
87+
88+
/**
89+
* Close the broker registry.
90+
*
91+
* @throws PulsarServerException if it fails to close the broker registry.
92+
*/
93+
@Override
94+
void close() throws PulsarServerException;
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import java.io.Closeable;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import org.apache.pulsar.broker.PulsarServerException;
25+
import org.apache.pulsar.broker.PulsarService;
26+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
27+
import org.apache.pulsar.broker.namespace.LookupOptions;
28+
import org.apache.pulsar.broker.namespace.NamespaceService;
29+
import org.apache.pulsar.common.naming.NamespaceBundle;
30+
import org.apache.pulsar.common.naming.ServiceUnitId;
31+
32+
/**
33+
* Find the appropriate broker for service unit (e.g. bundle) through different load balancer Implementation.
34+
*/
35+
public interface ExtensibleLoadManager extends Closeable {
36+
37+
/**
38+
* Start the extensible load manager.
39+
*
40+
* 1. Start the broker registry.
41+
* 2. Register self to registry.
42+
* 3. Start the load data store.
43+
* 4. Init the load manager context.
44+
* 5. Start load data reporter.
45+
* 6. Start the namespace unload scheduler.
46+
* 7. Start the namespace split scheduler.
47+
* 8. Listen the broker up or down, so we can split immediately.
48+
*/
49+
void start() throws PulsarServerException;
50+
51+
/**
52+
* Initialize this load manager using the given pulsar service.
53+
*/
54+
void initialize(PulsarService pulsar);
55+
56+
/**
57+
* The incoming service unit (e.g. bundle) selects the appropriate broker through strategies.
58+
*
59+
* @param topic The optional topic, some method won't provide topic var in this param
60+
* (e.g. {@link NamespaceService#internalGetWebServiceUrl(NamespaceBundle, LookupOptions)}),
61+
* So the topic is optional.
62+
* @param serviceUnit service unit (e.g. bundle).
63+
* @return The broker lookup data.
64+
*/
65+
CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
66+
67+
/**
68+
* Close the load manager.
69+
*
70+
* @throws PulsarServerException if it fails to stop the load manager.
71+
*/
72+
void close() throws PulsarServerException;
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import org.apache.pulsar.broker.ServiceConfiguration;
22+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
23+
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
24+
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
25+
26+
/**
27+
* The filter and load balance context, use for delivering context between filter, scheduler and strategy.
28+
*/
29+
public interface LoadManagerContext {
30+
31+
/**
32+
* The broker configuration.
33+
*/
34+
ServiceConfiguration brokerConfiguration();
35+
36+
/**
37+
* Broker load data store, each component use the context to access the load data store.
38+
*/
39+
LoadDataStore<BrokerLoadData> brokerLoadDataStore();
40+
41+
/**
42+
* Top bundle load data store.
43+
*/
44+
LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore();
45+
46+
/**
47+
* The broker register.
48+
*/
49+
BrokerRegistry brokerRegistry();
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.data;
20+
21+
import lombok.Data;
22+
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
23+
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
24+
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
25+
26+
/**
27+
* Contains all the data that is maintained locally on each broker.
28+
*
29+
* Migrate from {@link org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData}.
30+
* And removed the lookup data, see {@link BrokerLookupData}
31+
*/
32+
@Data
33+
public class BrokerLoadData {
34+
35+
// Most recently available system resource usage.
36+
private ResourceUsage cpu;
37+
private ResourceUsage memory;
38+
private ResourceUsage directMemory;
39+
40+
private ResourceUsage bandwidthIn;
41+
private ResourceUsage bandwidthOut;
42+
43+
// Message data from the most recent namespace bundle stats.
44+
private double msgThroughputIn;
45+
private ResourceUsage msgThroughputInUsage;
46+
private double msgThroughputOut;
47+
private ResourceUsage msgThroughputOutUsage;
48+
private double msgRateIn;
49+
private double msgRateOut;
50+
51+
public BrokerLoadData() {
52+
cpu = new ResourceUsage();
53+
memory = new ResourceUsage();
54+
directMemory = new ResourceUsage();
55+
bandwidthIn = new ResourceUsage();
56+
bandwidthOut = new ResourceUsage();
57+
msgThroughputInUsage = new ResourceUsage();
58+
msgThroughputOutUsage = new ResourceUsage();
59+
}
60+
61+
/**
62+
* Using the system resource usage and bundle stats acquired from the Pulsar client, update this LocalBrokerData.
63+
*
64+
* @param systemResourceUsage
65+
* System resource usage (cpu, memory, and direct memory).
66+
*/
67+
public void update(final SystemResourceUsage systemResourceUsage) {
68+
updateSystemResourceUsage(systemResourceUsage);
69+
}
70+
71+
/**
72+
* Using another LocalBrokerData, update this.
73+
*
74+
* @param other
75+
* LocalBrokerData to update from.
76+
*/
77+
public void update(final BrokerLoadData other) {
78+
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
79+
}
80+
81+
// Set the cpu, memory, and direct memory to that of the new system resource usage data.
82+
private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
83+
updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
84+
systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
85+
}
86+
87+
// Update resource usage given each individual usage.
88+
private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
89+
final ResourceUsage directMemory, final ResourceUsage bandwidthIn,
90+
final ResourceUsage bandwidthOut) {
91+
this.cpu = cpu;
92+
this.memory = memory;
93+
this.directMemory = directMemory;
94+
this.bandwidthIn = bandwidthIn;
95+
this.bandwidthOut = bandwidthOut;
96+
}
97+
98+
public double getMaxResourceUsage() {
99+
return LocalBrokerData.max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
100+
bandwidthOut.percentUsage()) / 100;
101+
}
102+
103+
public double getMaxResourceUsageWithWeight(final double cpuWeight, final double memoryWeight,
104+
final double directMemoryWeight, final double bandwidthInWeight,
105+
final double bandwidthOutWeight) {
106+
return LocalBrokerData.max(cpu.percentUsage() * cpuWeight, memory.percentUsage() * memoryWeight,
107+
directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
108+
bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
109+
}
110+
111+
}

0 commit comments

Comments
 (0)