Skip to content

Commit 38539b3

Browse files
authored
[PIP 95][Issue 12040][broker] Decouple advertised listeners from bind addresses (#12079)
1 parent 1f8945a commit 38539b3

File tree

16 files changed

+60
-88
lines changed

16 files changed

+60
-88
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java

+1-16
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.Set;
3434

3535
/**
36-
* the validator for pulsar multiple listeners.
36+
* Validates multiple listener address configurations.
3737
*/
3838
public final class MultipleListenerValidator {
3939

@@ -110,21 +110,6 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
110110
throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid");
111111
}
112112
}
113-
if (!config.getBrokerServicePortTls().isPresent()) {
114-
if (pulsarSslAddress != null) {
115-
throw new IllegalArgumentException("If pulsar do not start ssl port, there is no need to configure " +
116-
" `pulsar+ssl` in `" + entry.getKey() + "` listener.");
117-
}
118-
} else {
119-
if (pulsarSslAddress == null) {
120-
throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
121-
+ " do not specify `pulsar+ssl` address.");
122-
}
123-
}
124-
if (pulsarAddress == null) {
125-
throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
126-
+ " do not specify `pulsar` address.");
127-
}
128113
result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
129114
}
130115
return result;

pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java

-17
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,6 @@ public void testDifferentListenerWithSameHostPort() {
9393
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
9494
}
9595

96-
@Test(expectedExceptions = IllegalArgumentException.class)
97-
public void testListenerWithoutTLSPort() {
98-
ServiceConfiguration config = new ServiceConfiguration();
99-
config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
100-
config.setInternalListenerName("internal");
101-
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
102-
}
103-
10496
@Test
10597
public void testListenerWithTLSPort() {
10698
ServiceConfiguration config = new ServiceConfiguration();
@@ -110,15 +102,6 @@ public void testListenerWithTLSPort() {
110102
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
111103
}
112104

113-
@Test(expectedExceptions = IllegalArgumentException.class)
114-
public void testListenerWithoutNonTLSAddress() {
115-
ServiceConfiguration config = new ServiceConfiguration();
116-
config.setBrokerServicePortTls(Optional.of(6651));
117-
config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6651");
118-
config.setInternalListenerName("internal");
119-
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
120-
}
121-
122105
@Test(expectedExceptions = IllegalArgumentException.class)
123106
public void testWithoutListenerNameInAdvertisedListeners() {
124107
ServiceConfiguration config = new ServiceConfiguration();

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+1
Original file line numberDiff line numberDiff line change
@@ -1453,6 +1453,7 @@ public String getSafeWebServiceAddress() {
14531453
return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
14541454
}
14551455

1456+
@Deprecated
14561457
public String getSafeBrokerServiceUrl() {
14571458
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
14581459
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void start() throws PulsarServerException {
5757

5858
LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
5959
pulsar.getWebServiceAddressTls(),
60-
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
60+
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
6161
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
6262
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
6363

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -855,14 +855,14 @@ public void start() throws PulsarServerException {
855855
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();
856856

857857
lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
858-
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
858+
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
859859
lastData.setProtocols(protocolData);
860860
// configure broker-topic mode
861861
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
862862
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
863863

864864
localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
865-
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
865+
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
866866
localData.setProtocols(protocolData);
867867
localData.setBrokerVersionString(pulsar.getBrokerVersion());
868868
// configure broker-topic mode

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public void initialize(final PulsarService pulsar) {
227227
}
228228
this.policies = new SimpleResourceAllocationPolicies(pulsar);
229229
lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
230-
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
230+
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
231231
lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
232232
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
233233
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
@@ -1045,7 +1045,7 @@ private LoadReport generateLoadReportForcefully() throws Exception {
10451045
synchronized (bundleGainsCache) {
10461046
try {
10471047
LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
1048-
pulsar.getWebServiceAddressTls(), pulsar.getSafeBrokerServiceUrl(),
1048+
pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(),
10491049
pulsar.getBrokerServiceUrlTls());
10501050
loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
10511051
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -319,19 +319,15 @@ public void registerBootstrapNamespaces() throws PulsarServerException {
319319
* @throws Exception
320320
*/
321321
public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) throws PulsarServerException {
322-
323-
String myUrl = pulsar.getSafeBrokerServiceUrl();
324-
325322
try {
326-
String otherUrl = null;
327323
NamespaceBundle nsFullBundle = null;
328324

329325
// all pre-registered namespace is assumed to have bundles disabled
330326
nsFullBundle = bundleFactory.getFullBundle(nsname);
331327
// v2 namespace will always use full bundle object
332-
otherUrl = ownershipCache.tryAcquiringOwnership(nsFullBundle).get().getNativeUrl();
333-
334-
if (myUrl.equals(otherUrl)) {
328+
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
329+
if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
330+
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
335331
if (nsFullBundle != null) {
336332
// preload heartbeat namespace
337333
pulsar.loadNamespaceTopics(nsFullBundle);
@@ -340,7 +336,9 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
340336
}
341337

342338
String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s",
343-
nsname, myUrl, otherUrl);
339+
nsname,
340+
StringUtils.defaultString(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()),
341+
StringUtils.defaultString(otherData.getNativeUrl(), otherData.getNativeUrlTls()));
344342

345343
// ignore if not be owned for now
346344
if (!ensureOwned) {
@@ -415,9 +413,10 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
415413
new PulsarServerException("the broker do not have "
416414
+ options.getAdvertisedListenerName() + " listener"));
417415
} else {
416+
URI url = listener.getBrokerServiceUrl();
418417
URI urlTls = listener.getBrokerServiceUrlTls();
419418
future.complete(Optional.of(new LookupResult(nsData.get(),
420-
listener.getBrokerServiceUrl().toString(),
419+
url == null ? null : url.toString(),
421420
urlTls == null ? null : urlTls.toString())));
422421
}
423422
return;
@@ -535,9 +534,11 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
535534
+ options.getAdvertisedListenerName() + " listener"));
536535
return;
537536
} else {
537+
URI url = listener.getBrokerServiceUrl();
538538
URI urlTls = listener.getBrokerServiceUrlTls();
539539
lookupFuture.complete(Optional.of(
540-
new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
540+
new LookupResult(ownerInfo,
541+
url == null ? null : url.toString(),
541542
urlTls == null ? null : urlTls.toString())));
542543
return;
543544
}
@@ -596,9 +597,10 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
596597
new PulsarServerException(
597598
"the broker do not have " + advertisedListenerName + " listener"));
598599
} else {
600+
URI url = listener.getBrokerServiceUrl();
599601
URI urlTls = listener.getBrokerServiceUrlTls();
600602
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
601-
lookupData.getWebServiceUrlTls(), listener.getBrokerServiceUrl().toString(),
603+
lookupData.getWebServiceUrlTls(), url == null ? null : url.toString(),
602604
urlTls == null ? null : urlTls.toString(), authoritativeRedirect));
603605
}
604606
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
123123
NamespaceService namespaceService) {
124124
this.namespaceService = namespaceService;
125125
this.pulsar = pulsar;
126-
this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
126+
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
127127
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
128128
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
129129
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
@@ -316,11 +316,9 @@ public void invalidateLocalOwnerCache() {
316316
}
317317

318318
public synchronized boolean refreshSelfOwnerInfo() {
319-
if (selfOwnerInfo.getNativeUrl() == null) {
320-
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(),
321-
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
322-
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
323-
}
324-
return selfOwnerInfo.getNativeUrl() != null;
319+
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
320+
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
321+
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
322+
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
325323
}
326324
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public void testSplitAndOwnBundles() throws Exception {
154154
byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
155155
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
156156
NamespaceEphemeralData.class);
157-
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
157+
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
158158
} catch (Exception e) {
159159
fail("failed to setup ownership", e);
160160
}
@@ -441,7 +441,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
441441
byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
442442
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
443443
NamespaceEphemeralData.class);
444-
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
444+
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
445445
} catch (Exception e) {
446446
fail("failed to setup ownership", e);
447447
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void setup() throws Exception {
109109
doReturn(Optional.of(port)).when(config).getBrokerServicePort();
110110
doReturn(Optional.empty()).when(config).getWebServicePort();
111111
doReturn(brokerService).when(pulsar).getBrokerService();
112-
doReturn(selfBrokerUrl).when(pulsar).getSafeBrokerServiceUrl();
112+
doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
113113
}
114114

115115
@AfterMethod(alwaysRun = true)

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ public void shutdown() throws Exception {
7272
@Test
7373
public void testAdvertisedAddress() throws Exception {
7474
Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress );
75-
Assert.assertEquals( pulsar.getSafeBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, pulsar.getBrokerListenPort().get()) );
75+
Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, pulsar.getBrokerListenPort().get()) );
7676
Assert.assertEquals( pulsar.getSafeWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, pulsar.getListenPortHTTP().get()) );
7777
String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), pulsar.getListenPortHTTP().get());
7878
String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8);
7979
JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData, JsonObject.class);
80-
Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getSafeBrokerServiceUrl() );
80+
Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getBrokerServiceUrl() );
8181
Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getSafeWebServiceAddress() );
8282
}
8383

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -160,19 +160,19 @@ protected void setup() throws Exception {
160160
admin1.clusters().createCluster("r1", ClusterData.builder()
161161
.serviceUrl(url1.toString())
162162
.serviceUrlTls(urlTls1.toString())
163-
.brokerServiceUrl(pulsar1.getSafeBrokerServiceUrl())
163+
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
164164
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
165165
.build());
166166
admin1.clusters().createCluster("r2", ClusterData.builder()
167167
.serviceUrl(url2.toString())
168168
.serviceUrlTls(urlTls2.toString())
169-
.brokerServiceUrl(pulsar2.getSafeBrokerServiceUrl())
169+
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
170170
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
171171
.build());
172172
admin1.clusters().createCluster("r3", ClusterData.builder()
173173
.serviceUrl(url3.toString())
174174
.serviceUrlTls(urlTls3.toString())
175-
.brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl())
175+
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
176176
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
177177
.build());
178178

@@ -184,9 +184,9 @@ protected void setup() throws Exception {
184184
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
185185
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
186186
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
187-
assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
188-
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
189-
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
187+
assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
188+
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
189+
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
190190

191191
// Also create V1 namespace for compatibility check
192192
admin1.clusters().createCluster("global", ClusterData.builder()

0 commit comments

Comments
 (0)