Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Support broker isolation policy #19592

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
Expand Down Expand Up @@ -134,6 +135,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
Expand Down Expand Up @@ -210,6 +212,7 @@ public void start() throws PulsarServerException {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
}

@Override
Expand Down Expand Up @@ -272,7 +275,6 @@ private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
// TODO: Support isolation policies
LoadManagerContext context = this.getContext();

Map<String, BrokerLookupData> availableBrokerCandidates = new HashMap<>(availableBrokers);
Expand All @@ -281,11 +283,13 @@ private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
filter.filter(availableBrokerCandidates, context);
filter.filter(availableBrokerCandidates, bundle, context);
// Preserve the filter successes result.
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
availableBrokerCandidates = availableBrokers;
availableBrokerCandidates = new HashMap<>(availableBrokers);
}
}
if (availableBrokerCandidates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making.
Expand All @@ -33,14 +35,22 @@ public interface BrokerFilter {
*/
String name();

/**
* Initialize this broker filter using the given pulsar service.
*/
void initialize(PulsarService pulsar);

/**
* Filter out unqualified brokers based on implementation.
*
* @param brokers The full broker and lookup data.
* @param serviceUnit The current serviceUnit.
* @param context The load manager context.
* @return Filtered broker list.
*/
Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers, LoadManagerContext context)
Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context)
throws BrokerFilterException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;


@Slf4j
public class BrokerIsolationPoliciesFilter implements BrokerFilter {

public static final String FILTER_NAME = "broker_isolation_policies_filter";

private IsolationPoliciesHelper isolationPoliciesHelper;

@Override
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> availableBrokers,
ServiceUnitId serviceUnit,
LoadManagerContext context)
throws BrokerFilterException {
Set<String> brokerCandidateCache =
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, serviceUnit);
availableBrokers.keySet().retainAll(brokerCandidateCache);
return availableBrokers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;

public class BrokerMaxTopicCountFilter implements BrokerFilter {

Expand All @@ -34,8 +36,14 @@ public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}

@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context) throws BrokerFilterException {
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Filter by broker version.
Expand All @@ -45,7 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
*
*/
@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers, LoadManagerContext context)
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context)
throws BrokerFilterException {
ServiceConfiguration conf = context.brokerConfiguration();
if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
Expand Down Expand Up @@ -144,4 +148,9 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.policies;

import io.netty.util.concurrent.FastThreadLocal;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.ServiceUnitId;

@Slf4j
public class IsolationPoliciesHelper {

private final SimpleResourceAllocationPolicies policies;

public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
this.policies = policies;
}

private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
@Override
protected Set<String> initialValue() {
return new HashSet<>();
}
};

public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,
Copy link
Contributor

@heesung-sn heesung-sn Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to consider the isolation policy here in this shedding strategy. However, it might be better if we just do not automatically unload/transfer bundles that configure any isolation policy or anti-affinity group.

Reasoning: These bundles configure limited sets of brokers to transfer/unload, which is not an ideal target to move around regarding load balance. (Hopefully, not all of the top k loaded bundles comply with isolation policy) It would be better to move other bundles that don't comply with any policies. "Fix bundles that have hard limits but move other free ones."

For this reason, we could probably filter out those bundles with isolation or anti-affinity group policy in the TopKBundles, just like we filter out the system namespace bundles. WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is harmful to keep this logic here. I can add those bundle filtering logic(filtering out bundles with isolation and anti-affinity-group in topk bundles) in my next PR(anti-affinity group support).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's keep the logic here temporarily, since your proposal will change the original behavior. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. sounds good to me.

ServiceUnitId serviceUnit) {
Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
availableBrokers.keySet(), new LoadManagerShared.BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
return lookupData != null && lookupData.persistentTopicsEnabled();
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
return lookupData != null && lookupData.nonPersistentTopicsEnabled();
}
});
return brokerCandidateCache;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.policies;
Loading