Skip to content

Commit 5d34dad

Browse files
Demogorgon314poorbarcode
authored andcommitted
[fix][broker] Fix broker load manager class filter NPE (#20350)
PIP: #16691 ### Motivation When upgrading the pulsar version and changing the pulsar load manager to `ExtensibleLoadManagerImpl` it might cause NPE. The root cause is the old version of pulsar does not contain the `loadManagerClassName` field. ``` 2023-05-18T05:42:50,557+0000 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:51345] connected with role=[pulsarinstance-v3-0-n@test.dev](mailto:pulsarinstance-v3-0-n@test.dev) using authMethod=token, clientVersion=Pulsar Go 0.9.0, clientProtocolVersion=18, proxyVersion=null 2023-05-18T05:42:50,558+0000 [pulsar-io-4-1] WARN org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup [pulsarinstance-v3-0-n@test.dev](mailto:pulsarinstance-v3-0-n@test.dev) for topic persistent://xxx with error java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke “String.equals(Object)” because the return value of “org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()” is null at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?] at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1194) ~[?:?] at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?] at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?] at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?] at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:191) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1] ``` ### Modifications * Add null check when using`getLoadManagerClassName`. * Add test to cover this case. * Add `RedirectManager` unit test. (cherry picked from commit b7f0004)
1 parent 4cee3c3 commit 5d34dad

File tree

6 files changed

+139
-8
lines changed

6 files changed

+139
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.filter;
2020

2121
import java.util.Map;
22+
import java.util.Objects;
2223
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
2324
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
2425
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
@@ -43,7 +44,9 @@ public Map<String, BrokerLookupData> filter(
4344
}
4445
brokers.entrySet().removeIf(entry -> {
4546
BrokerLookupData v = entry.getValue();
46-
return !v.getLoadManagerClassName().equals(context.brokerConfiguration().getLoadManagerClassName());
47+
// The load manager class name can be null if the cluster has old version of broker.
48+
return !Objects.equals(v.getLoadManagerClassName(),
49+
context.brokerConfiguration().getLoadManagerClassName());
4750
});
4851
return brokers;
4952
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManager.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.pulsar.broker.loadbalance.extensions.manager;
2020

2121
import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
22+
import com.google.common.annotations.VisibleForTesting;
2223
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Optional;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +50,12 @@ public RedirectManager(PulsarService pulsar) {
4850
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
4951
}
5052

53+
@VisibleForTesting
54+
public RedirectManager(PulsarService pulsar, LockManager<BrokerLookupData> brokerLookupDataLockManager) {
55+
this.pulsar = pulsar;
56+
this.brokerLookupDataLockManager = brokerLookupDataLockManager;
57+
}
58+
5159
public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
5260
return brokerLookupDataLockManager.listLocks(LOADBALANCE_BROKERS_ROOT).thenCompose(availableBrokers -> {
5361
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
@@ -69,7 +77,7 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup
6977

7078
public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
7179
String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
72-
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
80+
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
7381
return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
7482
if (lookupDataMap.isEmpty()) {
7583
String errorMsg = "No available broker found.";
@@ -89,17 +97,18 @@ public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync()
8997
log.warn(errorMsg);
9098
throw new IllegalStateException(errorMsg);
9199
}
92-
if (latestServiceLookupData.get().getLoadManagerClassName().equals(currentLMClassName)) {
100+
101+
if (Objects.equals(latestServiceLookupData.get().getLoadManagerClassName(), currentLMClassName)) {
93102
if (debug) {
94-
log.info("We don't need to redirect, current load manager class name: {}",
103+
log.info("No need to redirect, current load manager class name: {}",
95104
currentLMClassName);
96105
}
97106
return Optional.empty();
98107
}
99108
var serviceLookupDataObj = latestServiceLookupData.get();
100109
var candidateBrokers = new ArrayList<ServiceLookupData>();
101110
lookupDataMap.forEach((key, value) -> {
102-
if (value.getLoadManagerClassName().equals(serviceLookupDataObj.getLoadManagerClassName())) {
111+
if (Objects.equals(value.getLoadManagerClassName(), serviceLookupDataObj.getLoadManagerClassName())) {
103112
candidateBrokers.add(value);
104113
}
105114
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilter.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.loadbalance.impl;
2020

21+
import java.util.Objects;
2122
import java.util.Set;
2223
import org.apache.pulsar.broker.ServiceConfiguration;
2324
import org.apache.pulsar.broker.loadbalance.BrokerFilter;
@@ -32,8 +33,8 @@ public void filter(Set<String> brokers, BundleData bundleToAssign,
3233
LoadData loadData,
3334
ServiceConfiguration conf) throws BrokerFilterException {
3435
loadData.getBrokerData().forEach((key, value) -> {
35-
if (!value.getLocalData().getLoadManagerClassName()
36-
.equals(conf.getLoadManagerClassName())) {
36+
// The load manager class name can be null if the cluster has old version of broker.
37+
if (!Objects.equals(value.getLocalData().getLoadManagerClassName(), conf.getLoadManagerClassName())) {
3738
brokers.remove(key);
3839
}
3940
});

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public void test() throws BrokerFilterException {
4444
"broker1", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
4545
"broker2", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
4646
"broker3", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
47-
"broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName())
47+
"broker4", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
48+
"broker5", getLookupData("3.0.0", null)
4849
);
4950

5051
Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(originalBrokers), null, context);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.manager;
20+
21+
import static org.mockito.Mockito.doReturn;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.spy;
24+
import static org.mockito.Mockito.when;
25+
import static org.testng.Assert.assertFalse;
26+
import static org.testng.Assert.assertTrue;
27+
28+
import org.apache.pulsar.broker.PulsarService;
29+
import org.apache.pulsar.broker.ServiceConfiguration;
30+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
31+
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
32+
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
33+
import org.apache.pulsar.broker.lookup.LookupResult;
34+
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
35+
import org.testng.annotations.Test;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.Optional;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.ExecutionException;
41+
42+
43+
/**
44+
* Unit test {@link RedirectManager}.
45+
*/
46+
public class RedirectManagerTest {
47+
48+
@Test
49+
public void testFindRedirectLookupResultAsync() throws ExecutionException, InterruptedException {
50+
PulsarService pulsar = mock(PulsarService.class);
51+
ServiceConfiguration configuration = new ServiceConfiguration();
52+
when(pulsar.getConfiguration()).thenReturn(configuration);
53+
RedirectManager redirectManager = spy(new RedirectManager(pulsar, null));
54+
55+
configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
56+
configuration.setLoadBalancerDebugModeEnabled(true);
57+
58+
// Test 1: No load manager class name found.
59+
doReturn(CompletableFuture.completedFuture(
60+
new HashMap<>(){{
61+
put("broker-1", getLookupData("broker-1", null, 10));
62+
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
63+
}}
64+
)).when(redirectManager).getAvailableBrokerLookupDataAsync();
65+
66+
// Should redirect to broker-1, since broker-1 has the latest load manager, even though the class name is null.
67+
Optional<LookupResult> lookupResult = redirectManager.findRedirectLookupResultAsync().get();
68+
assertTrue(lookupResult.isPresent());
69+
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
70+
71+
// Test 2: Should redirect to broker-1, since the latest broker are using ExtensibleLoadManagerImpl
72+
doReturn(CompletableFuture.completedFuture(
73+
new HashMap<>(){{
74+
put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
75+
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 1));
76+
}}
77+
)).when(redirectManager).getAvailableBrokerLookupDataAsync();
78+
79+
lookupResult = redirectManager.findRedirectLookupResultAsync().get();
80+
assertTrue(lookupResult.isPresent());
81+
assertTrue(lookupResult.get().getLookupData().getBrokerUrl().contains("broker-1"));
82+
83+
84+
// Test 3: Should not redirect, since current broker are using ModularLoadManagerImpl
85+
doReturn(CompletableFuture.completedFuture(
86+
new HashMap<>(){{
87+
put("broker-1", getLookupData("broker-1", ExtensibleLoadManagerImpl.class.getName(), 10));
88+
put("broker-2", getLookupData("broker-2", ModularLoadManagerImpl.class.getName(), 100));
89+
}}
90+
)).when(redirectManager).getAvailableBrokerLookupDataAsync();
91+
92+
lookupResult = redirectManager.findRedirectLookupResultAsync().get();
93+
assertFalse(lookupResult.isPresent());
94+
}
95+
96+
97+
public BrokerLookupData getLookupData(String broker, String loadManagerClassName, long startTimeStamp) {
98+
String webServiceUrl = "http://" + broker + ":8080";
99+
String webServiceUrlTls = "https://" + broker + ":8081";
100+
String pulsarServiceUrl = "pulsar://" + broker + ":6650";
101+
String pulsarServiceUrlTls = "pulsar+ssl://" + broker + ":6651";
102+
Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
103+
Map<String, String> protocols = new HashMap<>(){{
104+
put("kafka", "9092");
105+
}};
106+
return new BrokerLookupData(
107+
webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
108+
pulsarServiceUrlTls, advertisedListeners, protocols, true, true,
109+
loadManagerClassName, startTimeStamp, "3.0.0");
110+
}
111+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BrokerLoadManagerClassFilterTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,20 @@ public void test() throws BrokerFilterException {
4646

4747
LocalBrokerData localBrokerData1 = new LocalBrokerData();
4848
localBrokerData1.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
49+
50+
LocalBrokerData localBrokerData2 = new LocalBrokerData();
51+
localBrokerData2.setLoadManagerClassName(null);
4952
loadData.getBrokerData().put("broker1", new BrokerData(localBrokerData));
5053
loadData.getBrokerData().put("broker2", new BrokerData(localBrokerData1));
54+
loadData.getBrokerData().put("broker3", new BrokerData(localBrokerData2));
5155

5256
ServiceConfiguration conf = new ServiceConfiguration();
5357
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
5458

5559
Set<String> brokers = new HashSet<>(){{
5660
add("broker1");
5761
add("broker2");
62+
add("broker3");
5863
}};
5964
filter.filter(brokers, null, loadData, conf);
6065

@@ -64,6 +69,7 @@ public void test() throws BrokerFilterException {
6469
brokers = new HashSet<>(){{
6570
add("broker1");
6671
add("broker2");
72+
add("broker3");
6773
}};
6874
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
6975
filter.filter(brokers, null, loadData, conf);

0 commit comments

Comments
 (0)