Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Add large topic count filter #19613

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.LargeTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
*/
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new LargeTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class BrokerLoadData {
private double msgRateIn; // messages/sec
private double msgRateOut; // messages/sec
private int bundleCount;
private int topics;

// Load data features computed from the above resources.
private double maxResourceUsage; // max of resource usages
Expand Down Expand Up @@ -113,13 +114,15 @@ public void update(final SystemResourceUsage usage,
double msgRateIn,
double msgRateOut,
int bundleCount,
int topics,
ServiceConfiguration conf) {
updateSystemResourceUsage(usage.cpu, usage.memory, usage.directMemory, usage.bandwidthIn, usage.bandwidthOut);
this.msgThroughputIn = msgThroughputIn;
this.msgThroughputOut = msgThroughputOut;
this.msgRateIn = msgRateIn;
this.msgRateOut = msgRateOut;
this.bundleCount = bundleCount;
this.topics = topics;
updateFeatures(conf);
updatedAt = System.currentTimeMillis();
}
Expand All @@ -137,6 +140,7 @@ public void update(final BrokerLoadData other) {
msgRateIn = other.msgRateIn;
msgRateOut = other.msgRateOut;
bundleCount = other.bundleCount;
topics = other.topics;
weightedMaxEMA = other.weightedMaxEMA;
maxResourceUsage = other.maxResourceUsage;
updatedAt = other.updatedAt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;

public class LargeTopicCountFilter implements BrokerFilter {

public static final String FILTER_NAME = "large_topic_count_filter";

@Override
public String name() {
return FILTER_NAME;
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
LoadManagerContext context) throws BrokerFilterException {
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
// TODO: The broker load data might be delayed, so the max topic check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
});
return brokers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public BrokerLoadData generateLoadData() {
brokerStats.msgRateIn,
brokerStats.msgRateOut,
brokerStats.bundleCount,
brokerStats.topics,
pulsar.getConfiguration());

}
return this.localData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public synchronized void updateStats(
k -> new NamespaceBundleStats());
currentBundleStats.reset();
currentBundleStats.topics = topics.size();
brokerStats.topics += topics.size();

topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle));

Expand Down Expand Up @@ -280,4 +281,4 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class BrokerStats extends NamespaceStats {

public int bundleCount;
public int topics;
public BrokerStats(int ratePeriodInSeconds) {
super(ratePeriodInSeconds);
}
Expand All @@ -29,5 +30,6 @@ public BrokerStats(int ratePeriodInSeconds) {
public void reset() {
super.reset();
bundleCount = 0;
topics = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public void testGetMetrics() throws Exception {
usage.setDirectMemory(directMemory);
usage.setBandwidthIn(bandwidthIn);
usage.setBandwidthOut(bandwidthOut);
loadData.update(usage, 1, 2, 3, 4, 5, conf);
loadData.update(usage, 1, 2, 3, 4, 5, 6, conf);
brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
}
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testUpdateBySystemResourceUsage() {
usage1.setDirectMemory(directMemory);
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
data.update(usage1, 1, 2, 3, 4, 5, conf);
data.update(usage1, 1, 2, 3, 4, 5, 6, conf);

assertEquals(data.getCpu(), cpu);
assertEquals(data.getMemory(), memory);
Expand All @@ -71,6 +71,7 @@ public void testUpdateBySystemResourceUsage() {
assertEquals(data.getMsgRateIn(), 3.0);
assertEquals(data.getMsgRateOut(), 4.0);
assertEquals(data.getBundleCount(), 5);
assertEquals(data.getTopics(), 6);
assertEquals(data.getMaxResourceUsage(), 0.04); // skips memory usage
assertEquals(data.getWeightedMaxEMA(), 2);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
Expand All @@ -87,7 +88,7 @@ public void testUpdateBySystemResourceUsage() {
usage2.setDirectMemory(directMemory);
usage2.setBandwidthIn(bandwidthIn);
usage2.setBandwidthOut(bandwidthOut);
data.update(usage2, 5, 6, 7, 8, 9, conf);
data.update(usage2, 5, 6, 7, 8, 9, 10, conf);

assertEquals(data.getCpu(), cpu);
assertEquals(data.getMemory(), memory);
Expand All @@ -99,6 +100,7 @@ public void testUpdateBySystemResourceUsage() {
assertEquals(data.getMsgRateIn(), 7.0);
assertEquals(data.getMsgRateOut(), 8.0);
assertEquals(data.getBundleCount(), 9);
assertEquals(data.getTopics(), 10);
assertEquals(data.getMaxResourceUsage(), 3.0);
assertEquals(data.getWeightedMaxEMA(), 1.875);
assertThat(data.getUpdatedAt(), greaterThanOrEqualTo(now));
Expand Down Expand Up @@ -137,7 +139,7 @@ public void testUpdateByBrokerLoadData() {
usage1.setDirectMemory(directMemory);
usage1.setBandwidthIn(bandwidthIn);
usage1.setBandwidthOut(bandwidthOut);
other.update(usage1, 1, 2, 3, 4, 5, conf);
other.update(usage1, 1, 2, 3, 4, 5, 6, conf);
data.update(other);

assertEquals(data, other);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

public class BrokerFilterTestBase {

public LoadManagerContext getContext() {
LoadManagerContext mockContext = mock(LoadManagerContext.class);
ServiceConfiguration configuration = new ServiceConfiguration();
var brokerLoadDataStore = new LoadDataStore<BrokerLoadData>() {
Map<String, BrokerLoadData> map = new HashMap<>();
@Override
public void close() throws IOException {

}

@Override
public CompletableFuture<Void> pushAsync(String key, BrokerLoadData loadData) {
map.put(key, loadData);
return null;
}

@Override
public CompletableFuture<Void> removeAsync(String key) {
return null;
}

@Override
public Optional<BrokerLoadData> get(String key) {
var val = map.get(key);
if (val == null) {
return Optional.empty();
}
return Optional.of(val);
}

@Override
public void forEach(BiConsumer<String, BrokerLoadData> action) {

}

@Override
public Set<Map.Entry<String, BrokerLoadData>> entrySet() {
return map.entrySet();
}

@Override
public int size() {
return map.size();
}
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
doReturn(brokerLoadDataStore).when(mockContext).brokerLoadDataStore();
return mockContext;
}

public BrokerLookupData getLookupData() {
return getLookupData("3.0.0");
}

public BrokerLookupData getLookupData(String version) {
String webServiceUrl = "http://localhost:8080";
String webServiceUrlTls = "https://localhoss:8081";
String pulsarServiceUrl = "pulsar://localhost:6650";
String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
Map<String, String> protocols = new HashMap<>(){{
put("kafka", "9092");
}};
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand All @@ -30,14 +29,13 @@
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.testng.annotations.Test;

/**
* Unit test for {@link BrokerVersionFilter}.
*/
@Test(groups = "broker")
public class BrokerVersionFilterTest {
public class BrokerVersionFilterTest extends BrokerFilterTestBase {


@Test
Expand Down Expand Up @@ -115,26 +113,4 @@ public void testInvalidVersionString() throws BrokerFilterException {
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
brokerVersionFilter.filter(new HashMap<>(originalBrokers), getContext());
}

public LoadManagerContext getContext() {
LoadManagerContext mockContext = mock(LoadManagerContext.class);
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
return mockContext;
}

public BrokerLookupData getLookupData(String version) {
String webServiceUrl = "http://localhost:8080";
String webServiceUrlTls = "https://localhoss:8081";
String pulsarServiceUrl = "pulsar://localhost:6650";
String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
Map<String, String> protocols = new HashMap<>(){{
put("kafka", "9092");
}};
return new BrokerLookupData(
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
pulsarServiceUrlTls, advertisedListeners, protocols, true, true, version);
}
}
Loading