Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP 95][Issue 12040][broker] Decouple advertised listeners from bind addresses #12079

Merged
merged 5 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Set;

/**
* the validator for pulsar multiple listeners.
* Validates multiple listener address configurations.
*/
public final class MultipleListenerValidator {

Expand Down Expand Up @@ -110,21 +110,6 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` configure is invalid");
}
}
if (!config.getBrokerServicePortTls().isPresent()) {
if (pulsarSslAddress != null) {
throw new IllegalArgumentException("If pulsar do not start ssl port, there is no need to configure " +
" `pulsar+ssl` in `" + entry.getKey() + "` listener.");
}
} else {
if (pulsarSslAddress == null) {
throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+ " do not specify `pulsar+ssl` address.");
}
}
if (pulsarAddress == null) {
throw new IllegalArgumentException("the `" + entry.getKey() + "` listener in the `advertisedListeners` "
+ " do not specify `pulsar` address.");
}
result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress).brokerServiceUrlTls(pulsarSslAddress).build());
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ public void testDifferentListenerWithSameHostPort() {
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testListenerWithoutTLSPort() {
ServiceConfiguration config = new ServiceConfiguration();
config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
config.setInternalListenerName("internal");
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
}

@Test
public void testListenerWithTLSPort() {
ServiceConfiguration config = new ServiceConfiguration();
Expand All @@ -110,15 +102,6 @@ public void testListenerWithTLSPort() {
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testListenerWithoutNonTLSAddress() {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerServicePortTls(Optional.of(6651));
config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6651");
config.setInternalListenerName("internal");
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
}

@Test(expectedExceptions = IllegalArgumentException.class)
public void testWithoutListenerNameInAdvertisedListeners() {
ServiceConfiguration config = new ServiceConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,7 @@ public String getSafeWebServiceAddress() {
return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
}

@Deprecated
public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void start() throws PulsarServerException {

LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,14 +855,14 @@ public void start() throws PulsarServerException {
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();

lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void initialize(final PulsarService pulsar) {
}
this.policies = new SimpleResourceAllocationPolicies(pulsar);
lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
Expand Down Expand Up @@ -1045,7 +1045,7 @@ private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
try {
LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(), pulsar.getSafeBrokerServiceUrl(),
pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls());
loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,19 +319,15 @@ public void registerBootstrapNamespaces() throws PulsarServerException {
* @throws Exception
*/
public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) throws PulsarServerException {

String myUrl = pulsar.getSafeBrokerServiceUrl();

try {
String otherUrl = null;
NamespaceBundle nsFullBundle = null;

// all pre-registered namespace is assumed to have bundles disabled
nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
otherUrl = ownershipCache.tryAcquiringOwnership(nsFullBundle).get().getNativeUrl();

if (myUrl.equals(otherUrl)) {
NamespaceEphemeralData otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
|| StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
if (nsFullBundle != null) {
// preload heartbeat namespace
pulsar.loadNamespaceTopics(nsFullBundle);
Expand All @@ -340,7 +336,9 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
}

String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s",
nsname, myUrl, otherUrl);
nsname,
StringUtils.defaultString(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()),
StringUtils.defaultString(otherData.getNativeUrl(), otherData.getNativeUrlTls()));

// ignore if not be owned for now
if (!ensureOwned) {
Expand Down Expand Up @@ -415,9 +413,10 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
new PulsarServerException("the broker do not have "
+ options.getAdvertisedListenerName() + " listener"));
} else {
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
future.complete(Optional.of(new LookupResult(nsData.get(),
listener.getBrokerServiceUrl().toString(),
url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString())));
}
return;
Expand Down Expand Up @@ -535,9 +534,11 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
+ options.getAdvertisedListenerName() + " listener"));
return;
} else {
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
lookupFuture.complete(Optional.of(
new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
new LookupResult(ownerInfo,
url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString())));
return;
}
Expand Down Expand Up @@ -596,9 +597,10 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
new PulsarServerException(
"the broker do not have " + advertisedListenerName + " listener"));
} else {
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
lookupData.getWebServiceUrlTls(), listener.getBrokerServiceUrl().toString(),
lookupData.getWebServiceUrlTls(), url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString(), authoritativeRedirect));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
NamespaceService namespaceService) {
this.namespaceService = namespaceService;
this.pulsar = pulsar;
this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
Expand Down Expand Up @@ -316,11 +316,9 @@ public void invalidateLocalOwnerCache() {
}

public synchronized boolean refreshSelfOwnerInfo() {
if (selfOwnerInfo.getNativeUrl() == null) {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getSafeBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
}
return selfOwnerInfo.getNativeUrl() != null;
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testSplitAndOwnBundles() throws Exception {
byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
NamespaceEphemeralData.class);
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
} catch (Exception e) {
fail("failed to setup ownership", e);
}
Expand Down Expand Up @@ -437,7 +437,7 @@ public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get().getValue();
NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
NamespaceEphemeralData.class);
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getSafeBrokerServiceUrl());
Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
} catch (Exception e) {
fail("failed to setup ownership", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setup() throws Exception {
doReturn(Optional.of(port)).when(config).getBrokerServicePort();
doReturn(Optional.empty()).when(config).getWebServicePort();
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(selfBrokerUrl).when(pulsar).getSafeBrokerServiceUrl();
doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public void shutdown() throws Exception {
@Test
public void testAdvertisedAddress() throws Exception {
Assert.assertEquals( pulsar.getAdvertisedAddress(), advertisedAddress );
Assert.assertEquals( pulsar.getSafeBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, pulsar.getBrokerListenPort().get()) );
Assert.assertEquals( pulsar.getBrokerServiceUrl(), String.format("pulsar://%s:%d", advertisedAddress, pulsar.getBrokerListenPort().get()) );
Assert.assertEquals( pulsar.getSafeWebServiceAddress(), String.format("http://%s:%d", advertisedAddress, pulsar.getListenPortHTTP().get()) );
String brokerZkPath = String.format("/loadbalance/brokers/%s:%d", pulsar.getAdvertisedAddress(), pulsar.getListenPortHTTP().get());
String bkBrokerData = new String(bkEnsemble.getZkClient().getData(brokerZkPath, false, new Stat()), StandardCharsets.UTF_8);
JsonObject jsonBkBrokerData = new Gson().fromJson(bkBrokerData, JsonObject.class);
Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getSafeBrokerServiceUrl() );
Assert.assertEquals( jsonBkBrokerData.get("pulsarServiceUrl").getAsString(), pulsar.getBrokerServiceUrl() );
Assert.assertEquals( jsonBkBrokerData.get("webServiceUrl").getAsString(), pulsar.getSafeWebServiceAddress() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,19 @@ protected void setup() throws Exception {
admin1.clusters().createCluster("r1", ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getSafeBrokerServiceUrl())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
.build());
admin1.clusters().createCluster("r2", ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getSafeBrokerServiceUrl())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.build());
admin1.clusters().createCluster("r3", ClusterData.builder()
.serviceUrl(url3.toString())
.serviceUrlTls(urlTls3.toString())
.brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl())
.brokerServiceUrl(pulsar3.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
.build());

Expand All @@ -184,9 +184,9 @@ protected void setup() throws Exception {
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getSafeBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getSafeBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getSafeBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());

// Also create V1 namespace for compatibility check
admin1.clusters().createCluster("global", ClusterData.builder()
Expand Down
Loading