Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 5d3c94e

Browse files
wangjialing218wangjialing
authored andcommitted
support multiple listeners with multiple endpoints (#742)
Fixes #669 Fixes #574 To support multiple listeners with multiple protocols, we need to set follow configration, and deprecated `kafkaAdvertisedListeners` and `kafkaListenerName`. > kafkaListeners=internal://0.0.0.0:9092,internal_ssl://0.0.0.0:9093,external://0.0.0.0:19002,external_ssl:0.0.0.0:19003 > kafkaProtocalMap=internal:PLAINTEXT,internal_ssl:SSL,external:PLAINTEXT,external_ssl:SSL > advertisedListeners={pulsar's listeners},internal:pulsar//192.168.1.10:9092,internal_ssl:pulsar//192.168.1.10:9093,external:pulsar//172.16.1.10:19002,external_ssl:pulsar://172.16.1.10:19003 1. Define the listenerNames and port of each listenerName in `kafkaListeners` 2. Define the listenerNames and protocol of each listenerName in `kafkaProtocalMap` 3. Add listenerNames and advertised address of each listenerName into `advertisedListeners` Kafka client should connect to the port of the listenerName according to KIP-103. Co-authored-by: wangjialing <wangjialing@cmss.chinamobile.com>
1 parent 046b9f9 commit 5d3c94e

File tree

10 files changed

+250
-74
lines changed

10 files changed

+250
-74
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java

+56-10
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,43 @@
3131
public class EndPoint {
3232

3333
private static final String END_POINT_SEPARATOR = ",";
34+
private static final String PROTOCOL_MAP_SEPARATOR = ",";
35+
private static final String PROTOCOL_SEPARATOR = ":";
3436
private static final String REGEX = "^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)";
3537
private static final Pattern PATTERN = Pattern.compile(REGEX);
3638

3739
@Getter
3840
private final String originalListener;
3941
@Getter
42+
private final String listenerName;
43+
@Getter
4044
private final SecurityProtocol securityProtocol;
4145
@Getter
4246
private final String hostname;
4347
@Getter
4448
private final int port;
49+
@Getter
50+
private final boolean multiListener;
4551

46-
public EndPoint(final String listener) {
52+
public EndPoint(final String listener, final Map<String, SecurityProtocol> protocolMap) {
4753
this.originalListener = listener;
4854
final String errorMessage = "listener '" + listener + "' is invalid";
4955
final Matcher matcher = PATTERN.matcher(listener);
5056
checkState(matcher.find(), errorMessage);
5157
checkState(matcher.groupCount() == 3, errorMessage);
5258

53-
this.securityProtocol = SecurityProtocol.forName(matcher.group(1));
59+
this.listenerName = matcher.group(1);
60+
if (protocolMap == null || protocolMap.isEmpty()) {
61+
multiListener = false;
62+
this.securityProtocol = SecurityProtocol.forName(matcher.group(1));
63+
} else {
64+
multiListener = true;
65+
this.securityProtocol = protocolMap.get(this.listenerName);
66+
if (this.securityProtocol == null) {
67+
throw new IllegalStateException(this.listenerName + " is not set in kafkaProtocolMap");
68+
}
69+
}
70+
5471
final String originalHostname = matcher.group(2);
5572
if (originalHostname.isEmpty()) {
5673
try {
@@ -65,19 +82,26 @@ public EndPoint(final String listener) {
6582
checkState(port >= 0 && port <= 65535, errorMessage + ": port " + port + " is invalid");
6683
}
6784

85+
86+
6887
public InetSocketAddress getInetAddress() {
6988
return new InetSocketAddress(hostname, port);
7089
}
7190

72-
public static Map<SecurityProtocol, EndPoint> parseListeners(final String listeners) {
73-
final Map<SecurityProtocol, EndPoint> endPointMap = new HashMap<>();
91+
public static Map<String, EndPoint> parseListeners(final String listeners) {
92+
return parseListeners(listeners, null);
93+
}
94+
95+
public static Map<String, EndPoint> parseListeners(final String listeners, final String kafkaProtocolMap) {
96+
final Map<String, EndPoint> endPointMap = new HashMap<>();
97+
final Map<String, SecurityProtocol> protocolMap = parseProtocolMap(kafkaProtocolMap);
7498
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
75-
final EndPoint endPoint = new EndPoint(listener);
76-
if (endPointMap.containsKey(endPoint.securityProtocol)) {
99+
final EndPoint endPoint = new EndPoint(listener, protocolMap);
100+
if (endPointMap.containsKey(endPoint.listenerName)) {
77101
throw new IllegalStateException(
78-
listeners + " has multiple listeners whose protocol is " + endPoint.securityProtocol);
102+
listeners + " has multiple listeners whose listenerName is " + endPoint.listenerName);
79103
} else {
80-
endPointMap.put(endPoint.securityProtocol, endPoint);
104+
endPointMap.put(endPoint.listenerName, endPoint);
81105
}
82106
}
83107
return endPointMap;
@@ -87,7 +111,7 @@ public static EndPoint getPlainTextEndPoint(final String listeners) {
87111
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
88112
if (listener.startsWith(SecurityProtocol.PLAINTEXT.name())
89113
|| listener.startsWith(SecurityProtocol.SASL_PLAINTEXT.name())) {
90-
return new EndPoint(listener);
114+
return new EndPoint(listener, null);
91115
}
92116
}
93117
throw new IllegalStateException(listeners + " has no plain text endpoint");
@@ -97,9 +121,31 @@ public static EndPoint getSslEndPoint(final String listeners) {
97121
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
98122
if (listener.startsWith(SecurityProtocol.SSL.name())
99123
|| listener.startsWith(SecurityProtocol.SASL_SSL.name())) {
100-
return new EndPoint(listener);
124+
return new EndPoint(listener, null);
101125
}
102126
}
103127
throw new IllegalStateException(listeners + " has no ssl endpoint");
104128
}
129+
130+
public static Map<String, SecurityProtocol> parseProtocolMap(final String kafkaProtocolMap) {
131+
132+
final Map<String, SecurityProtocol> protocolMap = new HashMap<>();
133+
if (kafkaProtocolMap == null) {
134+
return protocolMap;
135+
}
136+
137+
for (String protocolSet : kafkaProtocolMap.split(PROTOCOL_MAP_SEPARATOR)) {
138+
String[] protocol = protocolSet.split(PROTOCOL_SEPARATOR);
139+
if (protocol.length != 2) {
140+
throw new IllegalStateException(
141+
"wrong format for kafkaProtocolMap " + kafkaProtocolMap);
142+
}
143+
if (protocolMap.containsKey(protocol[0])) {
144+
throw new IllegalStateException(
145+
kafkaProtocolMap + " has multiple listeners whose listenerName is " + protocol[0]);
146+
}
147+
protocolMap.put(protocol[0], SecurityProtocol.forName(protocol[1]));
148+
}
149+
return protocolMap;
150+
}
105151
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+10-15
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.commons.configuration.PropertiesConfiguration;
5252
import org.apache.kafka.common.internals.Topic;
5353
import org.apache.kafka.common.record.CompressionType;
54-
import org.apache.kafka.common.security.auth.SecurityProtocol;
5554
import org.apache.kafka.common.utils.Time;
5655
import org.apache.pulsar.broker.PulsarServerException;
5756
import org.apache.pulsar.broker.PulsarService;
@@ -312,7 +311,8 @@ public String getProtocolDataToAdvertise() {
312311
public void start(BrokerService service) {
313312
brokerService = service;
314313
kopBrokerLookupManager = new KopBrokerLookupManager(
315-
brokerService.getPulsar(), false, kafkaConfig.getKafkaAdvertisedListeners());
314+
brokerService.getPulsar(), false,
315+
kafkaConfig.getKafkaAdvertisedListeners());
316316

317317
log.info("Starting KafkaProtocolHandler, kop version is: '{}'", KopVersion.getVersion());
318318
log.info("Git Revision {}", KopVersion.getGitSha());
@@ -440,29 +440,24 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
440440

441441
try {
442442
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
443-
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();
444-
445-
final Map<SecurityProtocol, EndPoint> advertisedEndpointMap =
446-
EndPoint.parseListeners(kafkaConfig.getKafkaAdvertisedListeners());
447-
EndPoint.parseListeners(kafkaConfig.getListeners()).forEach((protocol, endPoint) -> {
448-
EndPoint advertisedEndPoint = advertisedEndpointMap.get(protocol);
449-
if (advertisedEndPoint == null) {
450-
// Use the bind endpoint as the advertised endpoint.
451-
advertisedEndPoint = endPoint;
452-
}
453-
switch (protocol) {
443+
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();
444+
445+
EndPoint.parseListeners(kafkaConfig.getListeners(), kafkaConfig.getKafkaProtocolMap()).
446+
forEach((listener, endPoint) -> {
447+
switch (endPoint.getSecurityProtocol()) {
454448
case PLAINTEXT:
455449
case SASL_PLAINTEXT:
456450
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
457451
kafkaConfig, this, adminManager, false,
458-
advertisedEndPoint, scopeStatsLogger, localBrokerDataCache));
452+
endPoint, scopeStatsLogger, localBrokerDataCache));
459453
break;
460454
case SSL:
461455
case SASL_SSL:
462456
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
463457
kafkaConfig, this, adminManager, true,
464-
advertisedEndPoint, scopeStatsLogger, localBrokerDataCache));
458+
endPoint, scopeStatsLogger, localBrokerDataCache));
465459
break;
460+
default:
466461
}
467462
});
468463
return builder.build();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@
157157
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
158158
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
159159
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
160-
import org.apache.kafka.common.security.auth.SecurityProtocol;
161160
import org.apache.kafka.common.utils.SystemTime;
162161
import org.apache.kafka.common.utils.Time;
163162
import org.apache.kafka.common.utils.Utils;
@@ -179,7 +178,6 @@
179178
import org.apache.pulsar.metadata.api.MetadataCache;
180179
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
181180
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
182-
import org.eclipse.jetty.util.StringUtil;
183181

184182
/**
185183
* This class contains all the request handling methods.
@@ -2188,11 +2186,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
21882186
return future;
21892187
}
21902188

2191-
// if kafkaListenerName is set, the lookup result is the advertised address
2192-
if (!StringUtil.isBlank(kafkaConfig.getKafkaListenerName())) {
2193-
// TODO: should add SecurityProtocol according to which endpoint is handling the request.
2194-
// firstly we only support PLAINTEXT when lookup with kafkaListenerName
2195-
String kafkaAdvertisedAddress = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(),
2189+
if (advertisedEndPoint.isMultiListener()) {
2190+
// if kafkaProtocolMap is set, the lookup result is the advertised address
2191+
String kafkaAdvertisedAddress = String.format("%s://%s:%s", advertisedEndPoint.getSecurityProtocol().name,
21962192
pulsarAddress.getHostName(), pulsarAddress.getPort());
21972193
KafkaTopicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
21982194
returnFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress));
@@ -2285,7 +2281,8 @@ public CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
22852281
}
22862282
CompletableFuture<PartitionMetadata> returnFuture = new CompletableFuture<>();
22872283

2288-
topicManager.getTopicBroker(topic.toString())
2284+
topicManager.getTopicBroker(topic.toString(),
2285+
advertisedEndPoint.isMultiListener() ? advertisedEndPoint.getListenerName() : null)
22892286
.thenApply(address -> getProtocolDataToAdvertise(address, topic))
22902287
.thenAccept(kopAddressFuture -> kopAddressFuture.thenAccept(listenersOptional -> {
22912288
if (!listenersOptional.isPresent()) {
@@ -2298,8 +2295,8 @@ public CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
22982295
// It's the `kafkaAdvertisedListeners` config that's written to ZK
22992296
final String listeners = listenersOptional.get();
23002297
final EndPoint endPoint =
2301-
(tlsEnabled ? EndPoint.getSslEndPoint(listeners)
2302-
: EndPoint.getPlainTextEndPoint(listeners));
2298+
(tlsEnabled ? EndPoint.getSslEndPoint(listeners) :
2299+
EndPoint.getPlainTextEndPoint(listeners));
23032300
final Node node = newNode(endPoint.getInetAddress());
23042301

23052302
if (log.isDebugEnabled()) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,23 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
179179
private String kafkaListeners;
180180

181181
@FieldContext(
182-
category = CATEGORY_KOP,
183-
doc = "Listeners to publish to ZooKeeper for clients to use.\n"
184-
+ "The format is the same as `kafkaListeners`.\n"
182+
category = CATEGORY_KOP,
183+
doc = "Comma-separated map of listener name and protocol.\n"
184+
+ "e.g. PRIVATE:PLAINTEXT,PRIVATE_SSL:SSL,PUBLIC:PLAINTEXT,PUBLIC_SSL:SSL.\n"
185+
)
186+
private String kafkaProtocolMap;
187+
188+
@Deprecated
189+
@FieldContext(
190+
category = CATEGORY_KOP,
191+
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
185192
)
186193
private String kafkaAdvertisedListeners;
187194

195+
@Deprecated
188196
@FieldContext(
189197
category = CATEGORY_KOP,
190-
doc = "Specify the internal listener name for the broker.\n"
191-
+ "The listener name must be contained in the advertisedListeners.\n"
192-
+ "This config is used as the listener name in topic lookup."
198+
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
193199
)
194200
private String kafkaListenerName;
195201

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java

+19-7
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class KafkaTopicManager {
6767

6868
private final AtomicBoolean closed = new AtomicBoolean(false);
6969

70-
public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
70+
public static final ConcurrentHashMap<String, ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>>
7171
LOOKUP_CACHE = new ConcurrentHashMap<>();
7272

7373
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
@@ -178,32 +178,44 @@ private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
178178
// call pulsarclient.lookup.getbroker to get and
179179
// own a topic.
180180
// // when error happens, the returned future will complete with null.
181-
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
181+
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName, String listenerName) {
182182
if (closed.get()) {
183183
if (log.isDebugEnabled()) {
184184
log.debug("[{}] Return null for getTopicBroker({}) since channel closing",
185185
requestHandler.ctx.channel(), topicName);
186186
}
187187
return CompletableFuture.completedFuture(null);
188188
}
189-
return LOOKUP_CACHE.computeIfAbsent(topicName, t -> {
189+
190+
ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> topicLookupCache =
191+
LOOKUP_CACHE.computeIfAbsent(topicName, t-> {
192+
if (log.isDebugEnabled()) {
193+
log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker",
194+
requestHandler.ctx.channel(), topicName);
195+
}
196+
ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> cache = new ConcurrentHashMap<>();
197+
cache.put(listenerName == null ? "" : listenerName, lookupBroker(topicName, listenerName));
198+
return cache;
199+
});
200+
201+
return topicLookupCache.computeIfAbsent(listenerName == null ? "" : listenerName, t-> {
190202
if (log.isDebugEnabled()) {
191203
log.debug("[{}] topic {} not in Lookup_cache, call lookupBroker",
192-
requestHandler.ctx.channel(), topicName);
204+
requestHandler.ctx.channel(), topicName);
193205
}
194-
return lookupBroker(topicName);
206+
return lookupBroker(topicName, listenerName);
195207
});
196208
}
197209

198-
private CompletableFuture<InetSocketAddress> lookupBroker(final String topic) {
210+
private CompletableFuture<InetSocketAddress> lookupBroker(final String topic, String listenerName) {
199211
if (closed.get()) {
200212
if (log.isDebugEnabled()) {
201213
log.debug("[{}] Return null for getTopic({}) since channel closing",
202214
requestHandler.ctx.channel(), topic);
203215
}
204216
return CompletableFuture.completedFuture(null);
205217
}
206-
return lookupClient.getBrokerAddress(TopicName.get(topic));
218+
return lookupClient.getBrokerAddress(TopicName.get(topic), listenerName);
207219
}
208220

209221
// A wrapper of `BrokerService#getTopic` that is to find the topic's associated `PersistentTopic` instance

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public class KopBrokerLookupManager {
5252
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
5353
KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();
5454

55-
public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled, String advertisedListeners) {
55+
public KopBrokerLookupManager(PulsarService pulsarService, Boolean tlsEnabled,
56+
String advertisedListeners) {
5657
this.pulsarService = pulsarService;
5758
this.tlsEnabled = tlsEnabled;
5859
this.advertisedListeners = advertisedListeners;
@@ -82,8 +83,8 @@ public CompletableFuture<InetSocketAddress> findBroker(String topic) {
8283

8384
// It's the `kafkaAdvertisedListeners` config that's written to ZK
8485
final EndPoint endPoint =
85-
tlsEnabled ? EndPoint.getSslEndPoint(listeners.get())
86-
: EndPoint.getPlainTextEndPoint(listeners.get());
86+
(tlsEnabled ? EndPoint.getSslEndPoint(listeners.get()) :
87+
EndPoint.getPlainTextEndPoint(listeners.get()));
8788

8889
// here we found topic broker: broker2, but this is in broker1,
8990
// how to clean the lookup cache?

0 commit comments

Comments
 (0)