Skip to content

Commit 3a337b8

Browse files
authoredOct 14, 2021
[PIP 95][Issue 12040][broker] Multiple bind addresses for Pulsar protocol (#12056)
Master Issue: #12040 ### Motivation Add a new configuration setting `bindAddresses` to open additional server ports on the broker. Note that these are in addition to `brokerServicePort` / `brokerServicePortTls` for compatibility reasons. Each new-style bind address is associated with an advertised listener, to be used as the default listener for topic lookup requests. See #12040 for details. A given listener may be associated with numerous bindings. The scheme indicates the protocol handler and whether to use TLS on the server channel. This PR is focused on the Pulsar protocol handler, but it is anticipated that other protocols may be supported in future. For example: ``` bindAddresses=external:pulsar://0.0.0.0:6652,external:pulsar+ssl://0.0.0.0:6653 bindAddress=0.0.0.0 brokerServicePort=6650 brokerServicePortTls=6651 advertisedListeners=cluster:pulsar://broker-1.local:6650,cluster:pulsar+ssl://broker-1.local:6651,external:pulsar://broker-1.example.dev:6652,external:pulsar+ssl://broker-1.example.dev:6653 internalListenerName=cluster ``` The above would produce three server sockets, with `6650` having no associated listener name (thus retaining existing lookup behavior of returning the internal listener), and with `6652` and `6653` having an association with listener name `external`. Given a lookup request on `6652` or `6653`, the `external` listener address would be returned. ### Modifications - added configuration property `bindAddresses` - implementing parsing and validation logic - factored some utility code for formatting broker/web addresses
1 parent 5c28686 commit 3a337b8

File tree

14 files changed

+416
-58
lines changed

14 files changed

+416
-58
lines changed
 

‎conf/broker.conf

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ webServicePortTls=
4040
# Hostname or IP address the service binds on, default is 0.0.0.0.
4141
bindAddress=0.0.0.0
4242

43+
# Extra bind addresses for the service: <listener_name>:<scheme>://<host>:<port>,[...]
44+
bindAddresses=
45+
4346
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
4447
advertisedAddress=
4548

‎conf/standalone.conf

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ webServicePort=8080
3333
# Hostname or IP address the service binds on, default is 0.0.0.0.
3434
bindAddress=0.0.0.0
3535

36+
# Extra bind addresses for the service: <listener_name>:<scheme>://<host>:<port>,[...]
37+
bindAddresses=
38+
3639
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
3740
advertisedAddress=
3841

‎pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
121121
private String configurationStoreServers;
122122
@FieldContext(
123123
category = CATEGORY_SERVER,
124-
doc = "The port for serving binary protobuf requests"
124+
doc = "The port for serving binary protobuf requests."
125+
+ " If set, defines a server binding for bindAddress:brokerServicePort."
126+
+ " The Default value is 6650."
125127
)
126128

127129
private Optional<Integer> brokerServicePort = Optional.of(6650);
128130
@FieldContext(
129131
category = CATEGORY_SERVER,
130-
doc = "The port for serving tls secured binary protobuf requests"
132+
doc = "The port for serving TLS-secured binary protobuf requests."
133+
+ " If set, defines a server binding for bindAddress:brokerServicePortTls."
131134
)
132135
private Optional<Integer> brokerServicePortTls = Optional.empty();
133136
@FieldContext(
@@ -168,6 +171,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
168171
+ "The Default value is absent, the broker uses the first listener as the internal listener.")
169172
private String internalListenerName;
170173

174+
@FieldContext(category=CATEGORY_SERVER,
175+
doc = "Used to specify additional bind addresses for the broker."
176+
+ " The value must format as <listener_name>:<scheme>://<host>:<port>,"
177+
+ " multiple bind addresses should be separated with commas."
178+
+ " Associates each bind address with an advertised listener and protocol handler."
179+
+ " Note that the brokerServicePort, brokerServicePortTls, webServicePort, and"
180+
+ " webServicePortTls properties define additional bindings.")
181+
private String bindAddresses;
182+
171183
@FieldContext(category=CATEGORY_SERVER,
172184
doc = "Enable or disable the proxy protocol.")
173185
private boolean haProxyProtocolEnabled;

‎pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java

+15
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,19 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat
8080
return getDefaultOrConfiguredAddress(advertisedAddress);
8181
}
8282

83+
public static String brokerUrl(String host, int port) {
84+
return String.format("pulsar://%s:%d", host, port);
85+
}
86+
87+
public static String brokerUrlTls(String host, int port) {
88+
return String.format("pulsar+ssl://%s:%d", host, port);
89+
}
90+
91+
public static String webServiceUrl(String host, int port) {
92+
return String.format("http://%s:%d", host, port);
93+
}
94+
95+
public static String webServiceUrlTls(String host, int port) {
96+
return String.format("https://%s:%d", host, port);
97+
}
8398
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.validator;
20+
21+
import java.net.URI;
22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
import java.util.ArrayList;
28+
import org.apache.commons.lang3.StringUtils;
29+
import org.apache.pulsar.broker.ServiceConfiguration;
30+
import org.apache.pulsar.broker.ServiceConfigurationUtils;
31+
import org.apache.pulsar.common.configuration.BindAddress;
32+
33+
/**
34+
* Validates bind address configurations.
35+
*/
36+
public class BindAddressValidator {
37+
38+
private static final Pattern BIND_ADDRESSES_PATTERN = Pattern.compile("(?<name>\\w+):(?<url>.+)$");
39+
40+
/**
41+
* Validate the configuration of `bindAddresses`.
42+
* @param config the pulsar broker configure.
43+
* @param schemes a filter on the schemes of the bind addresses, or null to not apply a filter.
44+
* @return a list of bind addresses.
45+
*/
46+
public static List<BindAddress> validateBindAddresses(ServiceConfiguration config, Collection<String> schemes) {
47+
// migrate the existing configuration properties
48+
List<BindAddress> addresses = migrateBindAddresses(config);
49+
50+
// parse the list of additional bind addresses
51+
Arrays
52+
.stream(StringUtils.split(StringUtils.defaultString(config.getBindAddresses()), ","))
53+
.map(s -> {
54+
Matcher m = BIND_ADDRESSES_PATTERN.matcher(s);
55+
if (!m.matches()) {
56+
throw new IllegalArgumentException("bindAddresses: malformed: " + s);
57+
}
58+
return m;
59+
})
60+
.map(m -> new BindAddress(m.group("name"), URI.create(m.group("url"))))
61+
.forEach(addresses::add);
62+
63+
// apply the filter
64+
if (schemes != null) {
65+
addresses.removeIf(a -> !schemes.contains(a.getAddress().getScheme()));
66+
}
67+
68+
return addresses;
69+
}
70+
71+
/**
72+
* Generates bind addresses based on legacy configuration properties.
73+
*/
74+
private static List<BindAddress> migrateBindAddresses(ServiceConfiguration config) {
75+
List<BindAddress> addresses = new ArrayList<>(2);
76+
if (config.getBrokerServicePort().isPresent()) {
77+
addresses.add(new BindAddress(null, URI.create(
78+
ServiceConfigurationUtils.brokerUrl(config.getBindAddress(), config.getBrokerServicePort().get()))));
79+
}
80+
if (config.getBrokerServicePortTls().isPresent()) {
81+
addresses.add(new BindAddress(null, URI.create(
82+
ServiceConfigurationUtils.brokerUrlTls(config.getBindAddress(), config.getBrokerServicePortTls().get()))));
83+
}
84+
if (config.getWebServicePort().isPresent()) {
85+
addresses.add(new BindAddress(null, URI.create(
86+
ServiceConfigurationUtils.webServiceUrl(config.getBindAddress(), config.getWebServicePort().get()))));
87+
}
88+
if (config.getWebServicePortTls().isPresent()) {
89+
addresses.add(new BindAddress(null, URI.create(
90+
ServiceConfigurationUtils.webServiceUrlTls(config.getBindAddress(), config.getWebServicePortTls().get()))));
91+
}
92+
return addresses;
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.configuration;
20+
21+
import java.net.URI;
22+
import lombok.AllArgsConstructor;
23+
import lombok.Builder;
24+
import lombok.EqualsAndHashCode;
25+
import lombok.Getter;
26+
import lombok.NoArgsConstructor;
27+
import lombok.NonNull;
28+
import lombok.Setter;
29+
import lombok.ToString;
30+
31+
/**
32+
* A bind address for the broker as a non-TLS and TLS pair.
33+
*/
34+
@Builder
35+
@NoArgsConstructor
36+
@AllArgsConstructor
37+
@EqualsAndHashCode
38+
@ToString
39+
public class BindAddress {
40+
41+
/**
42+
* The listener name associated with the bind address, or null if no listener is associated.
43+
*/
44+
@Getter
45+
@Setter
46+
private String listenerName;
47+
48+
/**
49+
* The bind address.
50+
*/
51+
@Getter
52+
@Setter
53+
@NonNull
54+
private URI address;
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.validator;
20+
21+
import java.net.URI;
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import org.apache.pulsar.broker.ServiceConfiguration;
27+
import org.apache.pulsar.common.configuration.BindAddress;
28+
import org.testng.annotations.Test;
29+
30+
import static org.testng.AssertJUnit.assertEquals;
31+
32+
/**
33+
* testcase for BindAddressValidator.
34+
*/
35+
public class BindAddressValidatorTest {
36+
37+
/**
38+
* Provides a configuration with no bind addresses specified.
39+
*/
40+
private ServiceConfiguration newEmptyConfiguration() {
41+
ServiceConfiguration config = new ServiceConfiguration();
42+
config.setBrokerServicePort(Optional.empty()); // default: 6650
43+
config.setBrokerServicePortTls(Optional.empty());
44+
config.setWebServicePort(Optional.empty()); // default: 8080
45+
config.setWebServicePortTls(Optional.empty());
46+
return config;
47+
}
48+
49+
@Test(expectedExceptions = IllegalArgumentException.class)
50+
public void testMalformed() {
51+
ServiceConfiguration config = newEmptyConfiguration();
52+
config.setBindAddresses("internal:");
53+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
54+
assertEquals(0, addresses.size());
55+
}
56+
57+
@Test
58+
public void testOneListenerMultipleAddresses() {
59+
ServiceConfiguration config = newEmptyConfiguration();
60+
config.setBindAddresses("internal:pulsar://0.0.0.0:6650,internal:pulsar+ssl://0.0.0.0:6651");
61+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
62+
assertEquals(Arrays.asList(
63+
new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
64+
new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
65+
}
66+
67+
@Test
68+
public void testMultiListener() {
69+
ServiceConfiguration config = newEmptyConfiguration();
70+
config.setBindAddresses("internal:pulsar://0.0.0.0:6650,external:pulsar+ssl://0.0.0.0:6651");
71+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
72+
assertEquals(Arrays.asList(
73+
new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
74+
new BindAddress("external", URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
75+
}
76+
77+
@Test
78+
public void testMigrationWithAllOptions() {
79+
ServiceConfiguration config = newEmptyConfiguration();
80+
config.setBrokerServicePort(Optional.of(6650));
81+
config.setBrokerServicePortTls(Optional.of(6651));
82+
config.setWebServicePort(Optional.of(8080));
83+
config.setWebServicePortTls(Optional.of(443));
84+
config.setBindAddress("0.0.0.0");
85+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
86+
assertEquals(Arrays.asList(
87+
new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
88+
new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
89+
new BindAddress(null, URI.create("http://0.0.0.0:8080")),
90+
new BindAddress(null, URI.create("https://0.0.0.0:443"))), addresses);
91+
}
92+
93+
@Test
94+
public void testMigrationWithDefaults() {
95+
ServiceConfiguration config = new ServiceConfiguration();
96+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
97+
assertEquals(Arrays.asList(
98+
new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
99+
new BindAddress(null, URI.create("http://0.0.0.0:8080"))), addresses);
100+
}
101+
102+
@Test
103+
public void testMigrationWithExtra() {
104+
ServiceConfiguration config = newEmptyConfiguration();
105+
config.setBrokerServicePort(Optional.of(6650));
106+
config.setBindAddresses("extra:pulsar://0.0.0.0:6652");
107+
List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
108+
assertEquals(Arrays.asList(
109+
new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
110+
new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))), addresses);
111+
}
112+
113+
@Test
114+
public void testSchemeFilter() {
115+
ServiceConfiguration config = newEmptyConfiguration();
116+
config.setBrokerServicePort(Optional.of(6650));
117+
config.setBrokerServicePortTls(Optional.of(6651));
118+
config.setBindAddresses("extra:pulsar://0.0.0.0:6652,extra:http://0.0.0.0:8080");
119+
120+
List<BindAddress> addresses;
121+
addresses = BindAddressValidator.validateBindAddresses(config, null);
122+
assertEquals(Arrays.asList(
123+
new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
124+
new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
125+
new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652")),
126+
new BindAddress("extra", URI.create("http://0.0.0.0:8080"))), addresses);
127+
128+
addresses = BindAddressValidator.validateBindAddresses(config, Arrays.asList("pulsar", "pulsar+ssl"));
129+
assertEquals(Arrays.asList(
130+
new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
131+
new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
132+
new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))), addresses);
133+
134+
addresses = BindAddressValidator.validateBindAddresses(config, Collections.singletonList("http"));
135+
assertEquals(Collections.singletonList(
136+
new BindAddress("extra", URI.create("http://0.0.0.0:8080"))), addresses);
137+
}
138+
}

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1407,7 +1407,7 @@ protected String brokerUrl(ServiceConfiguration config) {
14071407
}
14081408

14091409
public static String brokerUrl(String host, int port) {
1410-
return String.format("pulsar://%s:%d", host, port);
1410+
return ServiceConfigurationUtils.brokerUrl(host, port);
14111411
}
14121412

14131413
public String brokerUrlTls(ServiceConfiguration config) {
@@ -1420,7 +1420,7 @@ public String brokerUrlTls(ServiceConfiguration config) {
14201420
}
14211421

14221422
public static String brokerUrlTls(String host, int port) {
1423-
return String.format("pulsar+ssl://%s:%d", host, port);
1423+
return ServiceConfigurationUtils.brokerUrlTls(host, port);
14241424
}
14251425

14261426
public String webAddress(ServiceConfiguration config) {

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+46-37
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.net.SocketAddress;
4646
import java.time.Duration;
4747
import java.util.ArrayList;
48+
import java.util.Arrays;
4849
import java.util.Collections;
4950
import java.util.HashMap;
5051
import java.util.HashSet;
@@ -116,6 +117,7 @@
116117
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
117118
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
118119
import org.apache.pulsar.broker.systopic.SystemTopicClient;
120+
import org.apache.pulsar.broker.validator.BindAddressValidator;
119121
import org.apache.pulsar.client.admin.PulsarAdmin;
120122
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
121123
import org.apache.pulsar.client.api.ClientBuilder;
@@ -124,6 +126,7 @@
124126
import org.apache.pulsar.client.impl.PulsarClientImpl;
125127
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
126128
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
129+
import org.apache.pulsar.common.configuration.BindAddress;
127130
import org.apache.pulsar.common.configuration.FieldContext;
128131
import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
129132
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
@@ -254,6 +257,8 @@ public class BrokerService implements Closeable {
254257
private final BundlesQuotas bundlesQuotas;
255258

256259
private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
260+
261+
private final List<Channel> listenChannels = new ArrayList<>(2);
257262
private Channel listenChannel;
258263
private Channel listenChannelTls;
259264

@@ -404,39 +409,46 @@ public void start() throws Exception {
404409
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
405410
PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());
406411

407-
ServerBootstrap bootstrap = defaultServerBootstrap.clone();
408-
409412
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
410-
411-
bootstrap.childHandler(
412-
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false));
413-
414-
Optional<Integer> port = serviceConfig.getBrokerServicePort();
415-
if (port.isPresent()) {
416-
// Bind and start to accept incoming connections.
417-
InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port.get());
413+
List<BindAddress> bindAddresses = BindAddressValidator.validateBindAddresses(serviceConfig,
414+
Arrays.asList("pulsar", "pulsar+ssl"));
415+
String internalListenerName = serviceConfig.getInternalListenerName();
416+
417+
// create a channel for each bind address
418+
if (bindAddresses.size() == 0) {
419+
throw new IllegalArgumentException("At least one broker bind address must be configured");
420+
}
421+
for (BindAddress a : bindAddresses) {
422+
InetSocketAddress addr = new InetSocketAddress(a.getAddress().getHost(), a.getAddress().getPort());
423+
boolean isTls = "pulsar+ssl".equals(a.getAddress().getScheme());
424+
PulsarChannelInitializer.PulsarChannelOptions opts = PulsarChannelInitializer.PulsarChannelOptions.builder()
425+
.enableTLS(isTls)
426+
.listenerName(a.getListenerName()).build();
427+
428+
ServerBootstrap b = defaultServerBootstrap.clone();
429+
b.childHandler(
430+
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, opts));
418431
try {
419-
listenChannel = bootstrap.bind(addr).sync().channel();
420-
log.info("Started Pulsar Broker service on {}", listenChannel.localAddress());
421-
} catch (Exception e) {
422-
throw new IOException("Failed to bind Pulsar broker on " + addr, e);
423-
}
424-
}
432+
Channel ch = b.bind(addr).sync().channel();
433+
listenChannels.add(ch);
434+
435+
// identify the primary channel. Note that the legacy bindings appear first and have no listener.
436+
if (StringUtils.isBlank(a.getListenerName())
437+
|| StringUtils.equalsIgnoreCase(a.getListenerName(), internalListenerName)) {
438+
if (this.listenChannel == null && !isTls) {
439+
this.listenChannel = ch;
440+
}
441+
if (this.listenChannelTls == null && isTls) {
442+
this.listenChannelTls = ch;
443+
}
444+
}
425445

426-
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
427-
if (tlsPort.isPresent()) {
428-
ServerBootstrap tlsBootstrap = bootstrap.clone();
429-
tlsBootstrap.childHandler(
430-
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
431-
try {
432-
listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
433-
pulsar.getBindAddress(), tlsPort.get())).sync()
434-
.channel();
435-
log.info("Started Pulsar Broker TLS service on {} - TLS provider: {}", listenChannelTls.localAddress(),
436-
SslContext.defaultServerProvider());
446+
log.info("Started Pulsar Broker service on {}, TLS: {}, listener: {}",
447+
ch.localAddress(),
448+
isTls ? SslContext.defaultServerProvider().toString() : "(none)",
449+
StringUtils.defaultString(a.getListenerName(), "(none)"));
437450
} catch (Exception e) {
438-
throw new IOException(String.format("Failed to start Pulsar Broker TLS service on %s:%d",
439-
pulsar.getBindAddress(), tlsPort.get()), e);
451+
throw new IOException("Failed to bind Pulsar broker on " + addr, e);
440452
}
441453
}
442454

@@ -700,14 +712,11 @@ public CompletableFuture<Void> closeAsync() {
700712
log.info("Continuing to second phase in shutdown.");
701713

702714
List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
703-
704-
if (listenChannel != null && listenChannel.isOpen()) {
705-
asyncCloseFutures.add(closeChannel(listenChannel));
706-
}
707-
708-
if (listenChannelTls != null && listenChannelTls.isOpen()) {
709-
asyncCloseFutures.add(closeChannel(listenChannelTls));
710-
}
715+
listenChannels.forEach(ch -> {
716+
if (ch.isOpen()) {
717+
asyncCloseFutures.add(closeChannel(ch));
718+
}
719+
});
711720

712721
if (interceptor != null) {
713722
interceptor.close();

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java

+29-10
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.netty.handler.ssl.SslHandler;
3131
import java.net.SocketAddress;
3232
import java.util.concurrent.TimeUnit;
33+
import lombok.Builder;
34+
import lombok.Data;
3335
import lombok.extern.slf4j.Slf4j;
3436
import org.apache.pulsar.broker.PulsarService;
3537
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -46,6 +48,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
4648
public static final String TLS_HANDLER = "tls";
4749

4850
private final PulsarService pulsar;
51+
private final String listenerName;
4952
private final boolean enableTls;
5053
private final boolean tlsEnabledWithKeyStore;
5154
private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
@@ -63,13 +66,14 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel>
6366
/**
6467
* @param pulsar
6568
* An instance of {@link PulsarService}
66-
* @param enableTLS
67-
* Enable tls or not
69+
* @param opts
70+
* Channel options
6871
*/
69-
public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception {
72+
public PulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts) throws Exception {
7073
super();
7174
this.pulsar = pulsar;
72-
this.enableTls = enableTLS;
75+
this.listenerName = opts.getListenerName();
76+
this.enableTls = opts.isEnableTLS();
7377
ServiceConfiguration serviceConfig = pulsar.getConfiguration();
7478
this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore();
7579
if (this.enableTls) {
@@ -130,7 +134,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
130134
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
131135
// auto-read.
132136
ch.pipeline().addLast("flowController", new FlowControlHandler());
133-
ServerCnx cnx = newServerCnx(pulsar);
137+
ServerCnx cnx = newServerCnx(pulsar, listenerName);
134138
ch.pipeline().addLast("handler", cnx);
135139

136140
connections.put(ch.remoteAddress(), cnx);
@@ -147,14 +151,29 @@ private void refreshAuthenticationCredentials() {
147151
}
148152

149153
@VisibleForTesting
150-
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
151-
return new ServerCnx(pulsar);
154+
protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception {
155+
return new ServerCnx(pulsar, listenerName);
152156
}
153157

154158
public interface Factory {
155-
PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception;
159+
PulsarChannelInitializer newPulsarChannelInitializer(
160+
PulsarService pulsar, PulsarChannelOptions opts) throws Exception;
156161
}
157162

158-
public static final Factory DEFAULT_FACTORY =
159-
(pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
163+
public static final Factory DEFAULT_FACTORY = PulsarChannelInitializer::new;
164+
165+
@Data
166+
@Builder
167+
public static class PulsarChannelOptions {
168+
169+
/**
170+
* Indicates whether to enable TLS on the channel.
171+
*/
172+
private boolean enableTLS;
173+
174+
/**
175+
* The name of the listener to associate with the channel (optional).
176+
*/
177+
private String listenerName;
178+
}
160179
}

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@
154154
public class ServerCnx extends PulsarHandler implements TransportCnx {
155155
private final BrokerService service;
156156
private final SchemaRegistryService schemaService;
157+
private final String listenerName;
157158
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
158159
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
159160
private State state;
@@ -225,9 +226,14 @@ enum State {
225226
}
226227

227228
public ServerCnx(PulsarService pulsar) {
229+
this(pulsar, null);
230+
}
231+
232+
public ServerCnx(PulsarService pulsar, String listenerName) {
228233
super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
229234
this.service = pulsar.getBrokerService();
230235
this.schemaService = pulsar.getSchemaRegistryService();
236+
this.listenerName = listenerName;
231237
this.state = State.Start;
232238
ServiceConfiguration conf = pulsar.getConfiguration();
233239

@@ -400,8 +406,10 @@ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
400406
protected void handleLookup(CommandLookupTopic lookup) {
401407
final long requestId = lookup.getRequestId();
402408
final boolean authoritative = lookup.isAuthoritative();
409+
410+
// use the connection-specific listener name by default.
403411
final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()
404-
: null;
412+
: this.listenerName;
405413
if (log.isDebugEnabled()) {
406414
log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
407415
}

‎pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ protected PulsarService newPulsarService(ServiceConfiguration conf) throws Excep
7373
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
7474
BrokerService broker = new BrokerService(this, ioEventLoopGroup);
7575
broker.setPulsarChannelInitializerFactory(
76-
(_pulsar, tls) -> {
77-
return new PulsarChannelInitializer(_pulsar, tls) {
76+
(_pulsar, opts) -> {
77+
return new PulsarChannelInitializer(_pulsar, opts) {
7878
@Override
79-
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
79+
protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception {
8080
return new ServerCnx(pulsar) {
8181

8282
@Override

‎pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupRetryTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,10 @@ protected PulsarService newPulsarService(ServiceConfiguration conf) throws Excep
7979
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
8080
BrokerService broker = new BrokerService(this, ioEventLoopGroup);
8181
broker.setPulsarChannelInitializerFactory(
82-
(_pulsar, tls) -> {
83-
return new PulsarChannelInitializer(_pulsar, tls) {
82+
(_pulsar, opts) -> {
83+
return new PulsarChannelInitializer(_pulsar, opts) {
8484
@Override
85-
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
85+
protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception {
8686
connectionsCreated.incrementAndGet();
8787
return new ErrorByTopicServerCnx(pulsar, failureMap);
8888
}

‎site2/docs/reference-configuration.md

+2
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
161161
|exposeConsumerLevelMetricsInPrometheus|Whether to enable consumer level metrics.|false|
162162
|jvmGCMetricsLoggerClassName|Classname of Pluggable JVM GC metrics logger that can log GC specific metrics.|N/A|
163163
|bindAddress| Hostname or IP address the service binds on, default is 0.0.0.0. |0.0.0.0|
164+
|bindAddresses| Additional Hostname or IP addresses the service binds on: `listener_name:scheme://host:port,...`. ||
164165
|advertisedAddress| Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used. ||
165166
|clusterName| Name of the cluster to which this broker belongs to ||
166167
|brokerDeduplicationEnabled| Sets the default behavior for message deduplication in the broker. If enabled, the broker will reject messages that were already stored in the topic. This setting can be overridden on a per-namespace basis. |false|
@@ -431,6 +432,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
431432
|brokerServicePort| The port on which the standalone broker listens for connections |6650|
432433
|webServicePort| The port used by the standalone broker for HTTP requests |8080|
433434
|bindAddress| The hostname or IP address on which the standalone service binds |0.0.0.0|
435+
|bindAddresses| Additional Hostname or IP addresses the service binds on: `listener_name:scheme://host:port,...`. ||
434436
|advertisedAddress| The hostname or IP address that the standalone service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used. ||
435437
| numAcceptorThreads | Number of threads to use for Netty Acceptor | 1 |
436438
| numIOThreads | Number of threads to use for Netty IO | 2 * Runtime.getRuntime().availableProcessors() |

0 commit comments

Comments
 (0)
Please sign in to comment.