Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Add large topic count filter #19613

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
@@ -132,6 +133,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ public class BrokerLoadData {
private double msgRateIn; // messages/sec
private double msgRateOut; // messages/sec
private int bundleCount;
private int topics;

// Load data features computed from the above resources.
private double maxResourceUsage; // max of resource usages
@@ -113,13 +114,15 @@ public void update(final SystemResourceUsage usage,
double msgRateIn,
double msgRateOut,
int bundleCount,
int topics,
ServiceConfiguration conf) {
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
this.msgThroughputIn = msgThroughputIn;
this.msgThroughputOut = msgThroughputOut;
this.msgRateIn = msgRateIn;
this.msgRateOut = msgRateOut;
this.bundleCount = bundleCount;
this.topics = topics;
updateFeatures(conf);
updatedAt = System.currentTimeMillis();
}
@@ -137,6 +140,7 @@ public void update(final BrokerLoadData other) {
msgRateIn = other.msgRateIn;
msgRateOut = other.msgRateOut;
bundleCount = other.bundleCount;
topics = other.topics;
weightedMaxEMA = other.weightedMaxEMA;
maxResourceUsage = other.maxResourceUsage;
updatedAt = other.updatedAt;
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;

public class BrokerMaxTopicCountFilter implements BrokerFilter {

public static final String FILTER_NAME = "broker_max_topic_count_filter";

@Override
public String name() {
return FILTER_NAME;
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
LoadManagerContext context) throws BrokerFilterException {
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
// TODO: The broker load data might be delayed, so the max topic check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
});
return brokers;
}
}
Original file line number Diff line number Diff line change
@@ -83,8 +83,8 @@ public BrokerLoadData generateLoadData() {
brokerStats.msgRateIn,
brokerStats.msgRateOut,
brokerStats.bundleCount,
brokerStats.topics,
pulsar.getConfiguration());

}
return this.localData;
}
Original file line number Diff line number Diff line change
@@ -132,6 +132,7 @@ public synchronized void updateStats(
k -> new NamespaceBundleStats());
currentBundleStats.reset();
currentBundleStats.topics = topics.size();
brokerStats.topics += topics.size();

topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle));

@@ -280,4 +281,4 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
}
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
public class BrokerStats extends NamespaceStats {

public int bundleCount;
public int topics;
public BrokerStats(int ratePeriodInSeconds) {
super(ratePeriodInSeconds);
}
@@ -29,5 +30,6 @@ public BrokerStats(int ratePeriodInSeconds) {
public void reset() {
super.reset();
bundleCount = 0;
topics = 0;
}
}
Original file line number Diff line number Diff line change
@@ -330,7 +330,7 @@ public void testGetMetrics() throws Exception {
usage.setDirectMemory(directMemory);
usage.setBandwidthIn(bandwidthIn);
usage.setBandwidthOut(bandwidthOut);
loadData.update(usage, 1, 2, 3, 4, 5, conf);
loadData.update(usage, 1, 2, 3, 4, 5, 6, conf);
brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
}
{
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ public void testUpdateBySystemResourceUsage() {
usage1.setDirectMemory(directMemory);
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
data.update(usage1, 1, 2, 3, 4, 5, conf);
data.update(usage1, 1, 2, 3, 4, 5, 6, conf);

assertEquals(data.getCpu(), cpu);
assertEquals(data.getMemory(), memory);
@@ -71,6 +71,7 @@ public void testUpdateBySystemResourceUsage() {
assertEquals(data.getMsgRateIn(), 3.0);
assertEquals(data.getMsgRateOut(), 4.0);
assertEquals(data.getBundleCount(), 5);
assertEquals(data.getTopics(), 6);
assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage
assertEquals(data.getWeightedMaxEMA(), 2);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
@@ -87,7 +88,7 @@ public void testUpdateBySystemResourceUsage() {
usage2.setDirectMemory(directMemory);
usage2.setBandwidthIn(bandwidthIn);
usage2.setBandwidthOut(bandwidthOut);
data.update(usage2, 5, 6, 7, 8, 9, conf);
data.update(usage2, 5, 6, 7, 8, 9, 10, conf);

assertEquals(data.getCpu(), cpu);
assertEquals(data.getMemory(), memory);
@@ -99,6 +100,7 @@ public void testUpdateBySystemResourceUsage() {
assertEquals(data.getMsgRateIn(), 7.0);
assertEquals(data.getMsgRateOut(), 8.0);
assertEquals(data.getBundleCount(), 9);
assertEquals(data.getTopics(), 10);
assertEquals(data.getMaxResourceUsage(), 3.0);
assertEquals(data.getWeightedMaxEMA(), 1.875);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
@@ -137,7 +139,7 @@ public void testUpdateByBrokerLoadData() {
usage1.setDirectMemory(directMemory);
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
other.update(usage1, 1, 2, 3, 4, 5, conf);
other.update(usage1, 1, 2, 3, 4, 5, 6, conf);
data.update(other);

assertEquals(data, other);
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

public class BrokerFilterTestBase {

public LoadManagerContext getContext() {
LoadManagerContext mockContext = mock(LoadManagerContext.class);
ServiceConfiguration configuration = new ServiceConfiguration();
var brokerLoadDataStore = new LoadDataStore<BrokerLoadData>() {
Map<String, BrokerLoadData> map = new HashMap<>();
@Override
public void close() throws IOException {

}

@Override
public CompletableFuture<Void> pushAsync(String key, BrokerLoadData loadData) {
map.put(key, loadData);
return null;
}

@Override
public CompletableFuture<Void> removeAsync(String key) {
return null;
}

@Override
public Optional<BrokerLoadData> get(String key) {
var val = map.get(key);
if (val == null) {
return Optional.empty();
}
return Optional.of(val);
}

@Override
public void forEach(BiConsumer<String, BrokerLoadData> action) {

}

@Override
public Set<Map.Entry<String, BrokerLoadData>> entrySet() {
return map.entrySet();
}

@Override
public int size() {
return map.size();
}
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
doReturn(brokerLoadDataStore).when(mockContext).brokerLoadDataStore();
return mockContext;
}

public BrokerLookupData getLookupData() {
return getLookupData("3.0.0");
}

public BrokerLookupData getLookupData(String version) {
String webServiceUrl = "http://localhost:8080";
String webServiceUrlTls = "https://localhoss:8081";
String pulsarServiceUrl = "pulsar://localhost:6650";
String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
Map<String, String> protocols = new HashMap<>(){{
put("kafka", "9092");
}};
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.assertEquals;

/**
* Unit test for {@link BrokerMaxTopicCountFilter}.
*/
@Test(groups = "broker")
public class BrokerMaxTopicCountFilterTest extends BrokerFilterTestBase {

@Test
public void test() throws IllegalAccessException, BrokerFilterException {
LoadManagerContext context = getContext();
LoadDataStore<BrokerLoadData> store = context.brokerLoadDataStore();
BrokerLoadData maxTopicLoadData = new BrokerLoadData();
FieldUtils.writeDeclaredField(maxTopicLoadData, "topics",
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics(), true);
BrokerLoadData exceedMaxTopicLoadData = new BrokerLoadData();
FieldUtils.writeDeclaredField(exceedMaxTopicLoadData, "topics",
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics() * 2, true);
store.pushAsync("broker1", maxTopicLoadData);
store.pushAsync("broker2", new BrokerLoadData());
store.pushAsync("broker3", exceedMaxTopicLoadData);

BrokerMaxTopicCountFilter filter = new BrokerMaxTopicCountFilter();
Map<String, BrokerLookupData> originalBrokers = Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData(),
"broker4", getLookupData()
);
Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(originalBrokers), context);
assertEquals(result, Map.of(
"broker2", getLookupData(),
"broker4", getLookupData()
));
}

}
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@@ -30,14 +29,13 @@
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.testng.annotations.Test;

/**
* Unit test for {@link BrokerVersionFilter}.
*/
@Test(groups = "broker")
public class BrokerVersionFilterTest {
public class BrokerVersionFilterTest extends BrokerFilterTestBase {


@Test
@@ -115,26 +113,4 @@ public void testInvalidVersionString() throws BrokerFilterException {
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext());
}

public LoadManagerContext getContext() {
LoadManagerContext mockContext = mock(LoadManagerContext.class);
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
return mockContext;
}

public BrokerLookupData getLookupData(String version) {
String webServiceUrl = "http://localhost:8080";
String webServiceUrlTls = "https://localhoss:8081";
String pulsarServiceUrl = "pulsar://localhost:6650";
String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
Map<String, String> protocols = new HashMap<>(){{
put("kafka", "9092");
}};
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
}
}
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ void setup() {
doReturn(Executors.newSingleThreadScheduledExecutor()).when(pulsar).getLoadManagerExecutor();
doReturn(pulsarStats).when(brokerService).getPulsarStats();
brokerStats = new BrokerStats(0);
brokerStats.topics = 6;
brokerStats.bundleCount = 5;
brokerStats.msgRateIn = 3;
brokerStats.msgRateOut = 4;
@@ -88,7 +89,7 @@ public void testGenerate() throws IllegalAccessException {
doReturn(0l).when(pulsarStats).getUpdatedAt();
var target = new BrokerLoadDataReporter(pulsar, "", store);
var expected = new BrokerLoadData();
expected.update(usage, 1, 2, 3, 4, 5, config);
expected.update(usage, 1, 2, 3, 4, 5, 6, config);
FieldUtils.writeDeclaredField(expected, "updatedAt", 0l, true);
var actual = target.generateLoadData();
FieldUtils.writeDeclaredField(actual, "updatedAt", 0l, true);
@@ -103,7 +104,7 @@ public void testReport() throws IllegalAccessException {
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
localData.setReportedAt(System.currentTimeMillis());
var lastData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "lastData", true);
lastData.update(usage, 1, 2, 3, 4, 5, config);
lastData.update(usage, 1, 2, 3, 4, 5, 6, config);
target.reportAsync(false);
verify(store, times(0)).pushAsync(any(), any());

@@ -117,7 +118,7 @@ public void testReport() throws IllegalAccessException {
target.reportAsync(false);
verify(store, times(2)).pushAsync(eq("broker-1"), any());

lastData.update(usage, 10000, 2, 3, 4, 5, config);
lastData.update(usage, 10000, 2, 3, 4, 5, 6, config);
target.reportAsync(false);
verify(store, times(3)).pushAsync(eq("broker-1"), any());
}
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ public BrokerLoadData getCpuLoad(LoadManagerContext ctx, int load) {
usage1.setDirectMemory(directMemory);
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
loadData.update(usage1, 1,2,3,4,5,
loadData.update(usage1, 1,2,3,4,5,6,
ctx.brokerConfiguration());
return loadData;
}
Original file line number Diff line number Diff line change
@@ -168,7 +168,7 @@ public void testNoLoadDataBrokers() {
private BrokerLoadData createBrokerData(LoadManagerContext ctx, double usage, double limit) {
var brokerLoadData = new BrokerLoadData();
SystemResourceUsage usages = createUsage(usage, limit);
brokerLoadData.update(usages, 1, 1, 1, 1, 1,
brokerLoadData.update(usages, 1, 1, 1, 1, 1, 1,
ctx.brokerConfiguration());
return brokerLoadData;
}
@@ -185,7 +185,7 @@ private SystemResourceUsage createUsage(double usage, double limit) {

private void updateLoad(LoadManagerContext ctx, String broker, double usage) {
ctx.brokerLoadDataStore().get(broker).get().update(createUsage(usage, 100.0),
1, 1, 1, 1, 1, ctx.brokerConfiguration());
1, 1, 1, 1, 1, 1, ctx.brokerConfiguration());
}

public static LoadManagerContext getContext() {