Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit aaec1c8

Browse files
committedMar 5, 2023
Reuse LoadManagerShared
1 parent 6ca9b9b commit aaec1c8

File tree

1 file changed

+14
-105
lines changed

1 file changed

+14
-105
lines changed
 

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java

+14-105
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,13 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.policies;
2020

2121
import io.netty.util.concurrent.FastThreadLocal;
22-
import java.net.MalformedURLException;
23-
import java.net.URL;
2422
import java.util.HashSet;
2523
import java.util.Map;
2624
import java.util.Set;
2725
import lombok.extern.slf4j.Slf4j;
2826
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
27+
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
2928
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
30-
import org.apache.pulsar.common.naming.NamespaceBundle;
31-
import org.apache.pulsar.common.naming.NamespaceName;
3229
import org.apache.pulsar.common.naming.ServiceUnitId;
3330

3431
@Slf4j
@@ -40,22 +37,6 @@ public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
4037
this.policies = policies;
4138
}
4239

43-
// Cache for primary brokers according to policies.
44-
private static final FastThreadLocal<Set<String>> localPrimariesCache = new FastThreadLocal<>() {
45-
@Override
46-
protected Set<String> initialValue() {
47-
return new HashSet<>();
48-
}
49-
};
50-
51-
// Cache for shard brokers according to policies.
52-
private static final FastThreadLocal<Set<String>> localSecondaryCache = new FastThreadLocal<>() {
53-
@Override
54-
protected Set<String> initialValue() {
55-
return new HashSet<>();
56-
}
57-
};
58-
5940
private static final FastThreadLocal<Set<String>> localBrokerCandidateCache = new FastThreadLocal<>() {
6041
@Override
6142
protected Set<String> initialValue() {
@@ -65,94 +46,22 @@ protected Set<String> initialValue() {
6546

6647
public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> availableBrokers,
6748
ServiceUnitId serviceUnit) {
68-
Set<String> primariesCache = localPrimariesCache.get();
69-
primariesCache.clear();
70-
71-
Set<String> secondaryCache = localSecondaryCache.get();
72-
secondaryCache.clear();
73-
74-
NamespaceName namespace = serviceUnit.getNamespaceObject();
75-
boolean isIsolationPoliciesPresent = policies.areIsolationPoliciesPresent(namespace);
76-
boolean isNonPersistentTopic = serviceUnit instanceof NamespaceBundle
77-
&& ((NamespaceBundle) serviceUnit).hasNonPersistentTopic();
78-
if (isIsolationPoliciesPresent) {
79-
log.debug("Isolation Policies Present for namespace - [{}]", namespace.toString());
80-
}
81-
82-
availableBrokers.forEach((broker, lookupData) -> {
83-
final String brokerUrlString = String.format("http://%s", broker);
84-
URL brokerUrl;
85-
try {
86-
brokerUrl = new URL(brokerUrlString);
87-
} catch (MalformedURLException e) {
88-
log.error("Unable to parse brokerUrl from ResourceUnitId", e);
89-
return;
90-
}
91-
// todo: in future check if the resource unit has resources to take the namespace
92-
if (isIsolationPoliciesPresent) {
93-
// note: serviceUnitID is namespace name and ResourceID is brokerName
94-
if (policies.isPrimaryBroker(namespace, brokerUrl.getHost())) {
95-
primariesCache.add(broker);
96-
if (log.isDebugEnabled()) {
97-
log.debug("Added Primary Broker - [{}] as possible Candidates for"
98-
+ " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
99-
}
100-
} else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
101-
secondaryCache.add(broker);
102-
if (log.isDebugEnabled()) {
103-
log.debug(
104-
"Added Shared Broker - [{}] as possible "
105-
+ "Candidates for namespace - [{}] with policies",
106-
brokerUrl.getHost(), namespace);
107-
}
108-
} else {
109-
if (log.isDebugEnabled()) {
110-
log.debug("Skipping Broker - [{}] not primary broker and not shared" + " for namespace - [{}] ",
111-
brokerUrl.getHost(), namespace);
49+
Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
50+
brokerCandidateCache.clear();
51+
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
52+
availableBrokers.keySet(), new LoadManagerShared.BrokerTopicLoadingPredicate() {
53+
@Override
54+
public boolean isEnablePersistentTopics(String brokerUrl) {
55+
BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
56+
return lookupData != null && lookupData.persistentTopicsEnabled();
11257
}
11358

114-
}
115-
} else {
116-
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
117-
if (isNonPersistentTopic && !lookupData.nonPersistentTopicsEnabled()) {
118-
if (log.isDebugEnabled()) {
119-
log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
120-
brokerUrl.getHost(), namespace.toString());
59+
@Override
60+
public boolean isEnableNonPersistentTopics(String brokerUrl) {
61+
BrokerLookupData lookupData = availableBrokers.get(brokerUrl.replace("http://", ""));
62+
return lookupData != null && lookupData.nonPersistentTopicsEnabled();
12163
}
122-
} else if (!isNonPersistentTopic && !lookupData.persistentTopicsEnabled()) {
123-
// persistent topic can be assigned to only brokers that enabled for persistent-topic
124-
if (log.isDebugEnabled()) {
125-
log.debug("Filter broker- [{}] because broker only supports persistent namespace - [{}]",
126-
brokerUrl.getHost(), namespace.toString());
127-
}
128-
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
129-
secondaryCache.add(broker);
130-
if (log.isDebugEnabled()) {
131-
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
132-
brokerUrl.getHost(), namespace.toString());
133-
}
134-
}
135-
}
136-
});
137-
138-
Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
139-
brokerCandidateCache.clear();
140-
if (isIsolationPoliciesPresent) {
141-
brokerCandidateCache.addAll(primariesCache);
142-
if (policies.shouldFailoverToSecondaries(namespace, primariesCache.size())) {
143-
log.debug(
144-
"Not enough of primaries [{}] available for namespace - [{}], "
145-
+ "adding shared [{}] as possible candidate owners",
146-
primariesCache.size(), namespace, secondaryCache.size());
147-
brokerCandidateCache.addAll(secondaryCache);
148-
}
149-
} else {
150-
log.debug(
151-
"Policies not present for namespace - [{}] so only "
152-
+ "considering shared [{}] brokers for possible owner",
153-
namespace.toString(), secondaryCache.size());
154-
brokerCandidateCache.addAll(secondaryCache);
155-
}
64+
});
15665
return brokerCandidateCache;
15766
}
15867

0 commit comments

Comments
 (0)
Please sign in to comment.