Skip to content

Commit 9404c68

Browse files
committed
Support broker isolation policy
1 parent e0b50c9 commit 9404c68

File tree

11 files changed

+804
-38
lines changed

11 files changed

+804
-38
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
4242
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
4343
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
44+
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
4445
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
4546
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
4647
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
@@ -132,6 +133,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
132133
*/
133134
public ExtensibleLoadManagerImpl() {
134135
this.brokerFilterPipeline = new ArrayList<>();
136+
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
135137
this.brokerFilterPipeline.add(new BrokerVersionFilter());
136138
// TODO: Make brokerSelectionStrategy configurable.
137139
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
@@ -208,6 +210,7 @@ public void start() throws PulsarServerException {
208210
public void initialize(PulsarService pulsar) {
209211
this.pulsar = pulsar;
210212
this.conf = pulsar.getConfiguration();
213+
this.brokerFilterPipeline.forEach(brokerFilter -> brokerFilter.initialize(pulsar));
211214
}
212215

213216
@Override
@@ -279,11 +282,13 @@ private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle) {
279282
List<BrokerFilter> filterPipeline = getBrokerFilterPipeline();
280283
for (final BrokerFilter filter : filterPipeline) {
281284
try {
282-
filter.filter(availableBrokerCandidates, context);
285+
filter.filter(availableBrokerCandidates, bundle, context);
286+
// Preserve the filter successes result.
287+
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
283288
} catch (BrokerFilterException e) {
284289
// TODO: We may need to revisit this error case.
285290
log.error("Failed to filter out brokers.", e);
286-
availableBrokerCandidates = availableBrokers;
291+
availableBrokerCandidates = new HashMap<>(availableBrokers);
287292
}
288293
}
289294
if (availableBrokerCandidates.isEmpty()) {
@@ -326,6 +331,7 @@ public void close() throws PulsarServerException {
326331
this.brokerLoadDataStore.close();
327332
this.topBundlesLoadDataStore.close();
328333
this.unloadScheduler.close();
334+
this.brokerFilterPipeline.clear();
329335
} catch (IOException ex) {
330336
throw new PulsarServerException(ex);
331337
} finally {

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.filter;
2020

2121
import java.util.Map;
22+
import org.apache.pulsar.broker.PulsarService;
2223
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
2324
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
2425
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
26+
import org.apache.pulsar.common.naming.ServiceUnitId;
2527

2628
/**
2729
* Filter out unqualified Brokers, which are not entered into LoadBalancer for decision-making.
@@ -33,14 +35,22 @@ public interface BrokerFilter {
3335
*/
3436
String name();
3537

38+
/**
39+
* Initialize this broker filter using the given pulsar service.
40+
*/
41+
void initialize(PulsarService pulsar);
42+
3643
/**
3744
* Filter out unqualified brokers based on implementation.
3845
*
3946
* @param brokers The full broker and lookup data.
47+
* @param serviceUnit The current serviceUnit.
4048
* @param context The load manager context.
4149
* @return Filtered broker list.
4250
*/
43-
Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers, LoadManagerContext context)
51+
Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
52+
ServiceUnitId serviceUnit,
53+
LoadManagerContext context)
4454
throws BrokerFilterException;
4555

4656
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.filter;
20+
21+
import java.util.Map;
22+
import java.util.Set;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.pulsar.broker.PulsarService;
25+
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
26+
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
27+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
28+
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
29+
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
30+
import org.apache.pulsar.common.naming.ServiceUnitId;
31+
32+
33+
@Slf4j
34+
public class BrokerIsolationPoliciesFilter implements BrokerFilter {
35+
36+
public static final String FILTER_NAME = "broker_isolation_policies_filter";
37+
38+
private IsolationPoliciesHelper isolationPoliciesHelper;
39+
40+
@Override
41+
public String name() {
42+
return FILTER_NAME;
43+
}
44+
45+
@Override
46+
public void initialize(PulsarService pulsar) {
47+
this.isolationPoliciesHelper = new IsolationPoliciesHelper(new SimpleResourceAllocationPolicies(pulsar));
48+
}
49+
50+
@Override
51+
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> availableBrokers,
52+
ServiceUnitId serviceUnit,
53+
LoadManagerContext context)
54+
throws BrokerFilterException {
55+
Set<String> brokerCandidateCache =
56+
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, serviceUnit);
57+
availableBrokers.keySet().retainAll(brokerCandidateCache);
58+
return availableBrokers;
59+
}
60+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import java.util.Iterator;
2323
import java.util.Map;
2424
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.pulsar.broker.PulsarService;
2526
import org.apache.pulsar.broker.ServiceConfiguration;
2627
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
2728
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
2829
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
2930
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
31+
import org.apache.pulsar.common.naming.ServiceUnitId;
3032

3133
/**
3234
* Filter by broker version.
@@ -45,7 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
4547
*
4648
*/
4749
@Override
48-
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers, LoadManagerContext context)
50+
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
51+
ServiceUnitId serviceUnit,
52+
LoadManagerContext context)
4953
throws BrokerFilterException {
5054
ServiceConfiguration conf = context.brokerConfiguration();
5155
if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
@@ -144,4 +148,9 @@ public Version getLatestVersionNumber(Map<String, BrokerLookupData> brokerMap)
144148
public String name() {
145149
return FILTER_NAME;
146150
}
151+
152+
@Override
153+
public void initialize(PulsarService pulsar) {
154+
// No-op
155+
}
147156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.policies;
20+
21+
import io.netty.util.concurrent.FastThreadLocal;
22+
import java.net.MalformedURLException;
23+
import java.net.URL;
24+
import java.util.HashSet;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
29+
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
30+
import org.apache.pulsar.common.naming.NamespaceBundle;
31+
import org.apache.pulsar.common.naming.NamespaceName;
32+
import org.apache.pulsar.common.naming.ServiceUnitId;
33+
34+
@Slf4j
35+
public class IsolationPoliciesHelper {
36+
37+
private final SimpleResourceAllocationPolicies policies;
38+
39+
public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
40+
this.policies = policies;
41+
}
42+
43+
// Cache for primary brokers according to policies.
44+
private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<>() {
45+
@Override
46+
protected Set<String> initialValue() {
47+
return new HashSet<>();
48+
}
49+
};
50+
51+
// Cache for shard brokers according to policies.
52+
private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<>() {
53+
@Override
54+
protected Set<String> initialValue() {
55+
return new HashSet<>();
56+
}
57+
};
58+
59+
private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
60+
@Override
61+
protected Set<String> initialValue() {
62+
return new HashSet<>();
63+
}
64+
};
65+
66+
public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,
67+
ServiceUnitId serviceUnit) {
68+
Set<String> primariesCache = localPrimariesCache.get();
69+
primariesCache.clear();
70+
71+
Set<String> secondaryCache = localSecondaryCache.get();
72+
secondaryCache.clear();
73+
74+
NamespaceName namespace = serviceUnit.getNamespaceObject();
75+
boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
76+
boolean isNonPersistentTopic = serviceUnit instanceof NamespaceBundle
77+
&& ((NamespaceBundle) serviceUnit).hasNonPersistentTopic();
78+
if (isIsolationPoliciesPresent) {
79+
log.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
80+
}
81+
82+
availableBrokers.forEach((broker, lookupData) -> {
83+
final String brokerUrlString = String.format("http://%s", broker);
84+
URL brokerUrl;
85+
try {
86+
brokerUrl = new URL(brokerUrlString);
87+
} catch (MalformedURLException e) {
88+
log.error("Unable to parse brokerUrl from ResourceUnitId", e);
89+
return;
90+
}
91+
// todo: in future check if the resource unit has resources to take the namespace
92+
if (isIsolationPoliciesPresent) {
93+
// note: serviceUnitID is namespace name and ResourceID is brokerName
94+
if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
95+
primariesCache.add(broker);
96+
if (log.isDebugEnabled()) {
97+
log.debug("Added Primary Broker - [{}] as possible Candidates for"
98+
+ " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
99+
}
100+
} else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
101+
secondaryCache.add(broker);
102+
if (log.isDebugEnabled()) {
103+
log.debug(
104+
"Added Shared Broker - [{}] as possible "
105+
+ "Candidates for namespace - [{}] with policies",
106+
brokerUrl.getHost(), namespace);
107+
}
108+
} else {
109+
if (log.isDebugEnabled()) {
110+
log.debug("Skipping Broker - [{}] not primary broker and not shared" + " for namespace - [{}] ",
111+
brokerUrl.getHost(), namespace);
112+
}
113+
114+
}
115+
} else {
116+
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
117+
if (isNonPersistentTopic && !lookupData.nonPersistentTopicsEnabled()) {
118+
if (log.isDebugEnabled()) {
119+
log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
120+
brokerUrl.getHost(), namespace.toString());
121+
}
122+
} else if (!isNonPersistentTopic && !lookupData.persistentTopicsEnabled()) {
123+
// persistent topic can be assigned to only brokers that enabled for persistent-topic
124+
if (log.isDebugEnabled()) {
125+
log.debug("Filter broker- [{}] because broker only supports persistent namespace - [{}]",
126+
brokerUrl.getHost(), namespace.toString());
127+
}
128+
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
129+
secondaryCache.add(broker);
130+
if (log.isDebugEnabled()) {
131+
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
132+
brokerUrl.getHost(), namespace.toString());
133+
}
134+
}
135+
}
136+
});
137+
138+
Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
139+
brokerCandidateCache.clear();
140+
if (isIsolationPoliciesPresent) {
141+
brokerCandidateCache.addAll(primariesCache);
142+
if (policies.shouldFailoverToSecondaries(namespace, primariesCache.size())) {
143+
log.debug(
144+
"Not enough of primaries [{}] available for namespace - [{}], "
145+
+ "adding shared [{}] as possible candidate owners",
146+
primariesCache.size(), namespace, secondaryCache.size());
147+
brokerCandidateCache.addAll(secondaryCache);
148+
}
149+
} else {
150+
log.debug(
151+
"Policies not present for namespace - [{}] so only "
152+
+ "considering shared [{}] brokers for possible owner",
153+
namespace.toString(), secondaryCache.size());
154+
brokerCandidateCache.addAll(secondaryCache);
155+
}
156+
return brokerCandidateCache;
157+
}
158+
159+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.policies;

0 commit comments

Comments
 (0)