Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 53946d4

Browse files
committed
Support multiple listeners without configuring broker's advertisedListeners (#864)
Fixes #818 ### Motivation #742 adds the multiple listeners support for KoP 2.8.x. However, there're some flaws. First, the KoP advertised listeners must be added to `advertisedListeners` config. This config should be treated as the advertised listeners for broker, not protocol handlers. What `KafkaProtocolHandler#getProtocolDataToAdvertise` returns is not used though it has been written to ZK. It also makes `kafkaAdvertisedListeners` meaningless and it was marked as deprecated. ### Modification Benefit by [PIP 95](apache/pulsar#12040), Pulsar client doesn't need to configure listener name for topic lookup. So this PR simplifies the implementation of `LookupClient` by using a shared `PulsarClient` without configuring listener name. Then this PR uses the `MetadataStoreCacheLoader` introduced from #820 to get the `kafkaAdvertisedListeners` from ZK. Since it has already been cached, this PR removes the `KOP_ADDRESS_CACHE`. To verify the feature, this PR adds a test `testMetadataRequestForMultiListeners`. The KoP config is like ```properties kafkaListeners=PLAINTEXT://0.0.0.0:<port1>,GW://0.0.0.0:<port2> kafkaProtocolMap=PLAINTEXT:PLAINTEXT,GW:PLAINTEXT kafkaAdvertisedListeners=PLAINTEXT://192.168.0.1:<port3>,GW://192.168.0.1:<port4> ``` And verify that each `KafkaRequestHandler` can handle the METADATA requests well.
1 parent b61d44e commit 53946d4

File tree

11 files changed

+188
-219
lines changed

11 files changed

+188
-219
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/EndPoint.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ public InetSocketAddress getInetAddress() {
9090
return new InetSocketAddress(hostname, port);
9191
}
9292

93+
// listeners must be enable to be split into at least 1 token
94+
private static String[] getListenerArray(final String listeners) {
95+
if (StringUtils.isEmpty(listeners)) {
96+
throw new IllegalStateException("listeners is empty");
97+
}
98+
final String[] listenerArray = listeners.split(END_POINT_SEPARATOR);
99+
if (listenerArray.length == 0) {
100+
throw new IllegalStateException(listeners + " is split into 0 tokens by " + END_POINT_SEPARATOR);
101+
}
102+
return listenerArray;
103+
}
104+
93105
@VisibleForTesting
94106
public static Map<String, EndPoint> parseListeners(final String listeners) {
95107
return parseListeners(listeners, "");
@@ -98,7 +110,7 @@ public static Map<String, EndPoint> parseListeners(final String listeners) {
98110
private static Map<String, EndPoint> parseListeners(final String listeners,
99111
final Map<String, SecurityProtocol> protocolMap) {
100112
final Map<String, EndPoint> endPointMap = new HashMap<>();
101-
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
113+
for (String listener : getListenerArray(listeners)) {
102114
final EndPoint endPoint = new EndPoint(listener, protocolMap);
103115
if (endPointMap.containsKey(endPoint.listenerName)) {
104116
throw new IllegalStateException(
@@ -115,6 +127,22 @@ public static Map<String, EndPoint> parseListeners(final String listeners, final
115127
return parseListeners(listeners, parseProtocolMap(protocolMapString));
116128
}
117129

130+
public static String findListener(final String listeners, final String name) {
131+
if (name == null) {
132+
return null;
133+
}
134+
for (String listener : getListenerArray(listeners)) {
135+
if (listener.contains(":") && listener.substring(0, listener.indexOf(":")).equals(name)) {
136+
return listener;
137+
}
138+
}
139+
throw new IllegalStateException("listener \"" + name + "\" doesn't exist in " + listeners);
140+
}
141+
142+
public static String findFirstListener(String listeners) {
143+
return getListenerArray(listeners)[0];
144+
}
145+
118146
public static EndPoint getPlainTextEndPoint(final String listeners) {
119147
for (String listener : listeners.split(END_POINT_SEPARATOR)) {
120148
if (listener.startsWith(SecurityProtocol.PLAINTEXT.name())

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER;
1717

18+
import com.google.common.annotations.VisibleForTesting;
1819
import io.netty.channel.ChannelInitializer;
1920
import io.netty.channel.socket.SocketChannel;
2021
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -92,10 +93,13 @@ protected void initChannel(SocketChannel ch) throws Exception {
9293
ch.pipeline().addLast(new LengthFieldPrepender(4));
9394
ch.pipeline().addLast("frameDecoder",
9495
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
95-
ch.pipeline().addLast("handler",
96-
new KafkaRequestHandler(pulsarService, kafkaConfig,
97-
tenantContextManager, kopBrokerLookupManager, adminManager,
98-
enableTls, advertisedEndPoint, statsLogger));
96+
ch.pipeline().addLast("handler", newCnx());
9997
}
10098

99+
@VisibleForTesting
100+
public KafkaRequestHandler newCnx() throws Exception {
101+
return new KafkaRequestHandler(pulsarService, kafkaConfig,
102+
tenantContextManager, kopBrokerLookupManager, adminManager,
103+
enableTls, advertisedEndPoint, statsLogger);
104+
}
101105
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8585
private KopBrokerLookupManager kopBrokerLookupManager;
8686
private AdminManager adminManager = null;
8787
private SystemTopicClient txnTopicClient;
88+
@VisibleForTesting
89+
@Getter
90+
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
8891

8992
@Getter
9093
@VisibleForTesting
@@ -577,7 +580,8 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
577580
forEach((listener, endPoint) ->
578581
builder.put(endPoint.getInetAddress(), newKafkaChannelInitializer(endPoint))
579582
);
580-
return builder.build();
583+
channelInitializerMap = builder.build();
584+
return channelInitializerMap;
581585
} catch (Exception e){
582586
log.error("KafkaProtocolHandler newChannelInitializers failed with ", e);
583587
return null;

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
203203

204204
private final Boolean tlsEnabled;
205205
private final EndPoint advertisedEndPoint;
206-
private final String advertisedListeners;
207206
private final int defaultNumPartitions;
208207
public final int maxReadEntriesNum;
209208
private final int failedAuthenticationDelayMs;
@@ -308,7 +307,6 @@ public KafkaRequestHandler(PulsarService pulsarService,
308307
this.adminManager = adminManager;
309308
this.tlsEnabled = tlsEnabled;
310309
this.advertisedEndPoint = advertisedEndPoint;
311-
this.advertisedListeners = kafkaConfig.getKafkaAdvertisedListeners();
312310
this.topicManager = new KafkaTopicManager(this);
313311
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
314312
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
@@ -2481,7 +2479,7 @@ public CompletableFuture<PartitionMetadata> findBroker(TopicName topic) {
24812479
if (log.isDebugEnabled()) {
24822480
log.debug("[{}] Handle Lookup for {}", ctx.channel(), topic);
24832481
}
2484-
return kopBrokerLookupManager.findBroker(topic, advertisedEndPoint)
2482+
return kopBrokerLookupManager.findBroker(topic.toString(), advertisedEndPoint)
24852483
.thenApply(listenerInetSocketAddressOpt -> listenerInetSocketAddressOpt
24862484
.map(inetSocketAddress -> newPartitionMetadata(topic, newNode(inetSocketAddress)))
24872485
.orElse(null)

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
184184
category = CATEGORY_KOP,
185185
doc = "Comma-separated list of URIs we will listen on and the listener names.\n"
186186
+ "e.g. PLAINTEXT://localhost:9092,SSL://localhost:9093.\n"
187+
+ "Each URI's scheme represents a listener name if `kafkaProtocolMap` is configured.\n"
188+
+ "Otherwise, the scheme must be a valid protocol in [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL].\n"
187189
+ "If hostname is not set, bind to the default interface."
188190
)
189191
private String kafkaListeners;
@@ -195,10 +197,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
195197
)
196198
private String kafkaProtocolMap;
197199

198-
@Deprecated
199200
@FieldContext(
200201
category = CATEGORY_KOP,
201-
doc = "Use kafkaProtocolMap, kafkaListeners and advertisedAddress instead."
202+
doc = "Listeners to publish to ZooKeeper for clients to use.\n"
203+
+ "The format is the same as `kafkaListeners`.\n"
202204
)
203205
private String kafkaAdvertisedListeners;
204206

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

+48-118
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import java.util.concurrent.CompletableFuture;
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.regex.Matcher;
2425
import javax.annotation.Nullable;
25-
import lombok.NonNull;
2626
import lombok.extern.slf4j.Slf4j;
2727
import org.apache.commons.lang3.StringUtils;
2828
import org.apache.pulsar.broker.PulsarService;
@@ -38,149 +38,79 @@
3838
@Slf4j
3939
public class KopBrokerLookupManager {
4040

41-
private final String advertisedListeners;
4241
private final LookupClient lookupClient;
4342
private final MetadataStoreCacheLoader metadataStoreCacheLoader;
4443

4544
private final AtomicBoolean closed = new AtomicBoolean(false);
4645

47-
public static final ConcurrentHashMap<String, ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>>
46+
public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
4847
LOOKUP_CACHE = new ConcurrentHashMap<>();
4948

50-
public static final ConcurrentHashMap<String, CompletableFuture<Optional<String>>>
51-
KOP_ADDRESS_CACHE = new ConcurrentHashMap<>();
52-
5349
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService) throws Exception {
54-
this.advertisedListeners = conf.getKafkaAdvertisedListeners();
5550
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
5651
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarService.getPulsarResources(),
5752
conf.getBrokerLookupTimeoutMs());
5853
}
5954

60-
public CompletableFuture<Optional<InetSocketAddress>> findBroker(@NonNull TopicName topic,
55+
public CompletableFuture<Optional<InetSocketAddress>> findBroker(String topic,
6156
@Nullable EndPoint advertisedEndPoint) {
62-
if (log.isDebugEnabled()) {
63-
log.debug("Handle Lookup for topic {}", topic);
64-
}
65-
CompletableFuture<Optional<InetSocketAddress>> returnFuture = new CompletableFuture<>();
66-
67-
getTopicBroker(topic.toString(),
68-
advertisedEndPoint != null && advertisedEndPoint.isValidInProtocolMap()
69-
? advertisedEndPoint.getListenerName() : null)
70-
.thenApply(address -> getProtocolDataToAdvertise(address, topic, advertisedEndPoint))
71-
.thenAccept(kopAddressFuture -> kopAddressFuture.thenAccept(listenersOptional -> {
72-
if (!listenersOptional.isPresent()) {
73-
log.error("Not get advertise data for Kafka topic:{}.", topic);
74-
removeTopicManagerCache(topic.toString());
75-
returnFuture.complete(Optional.empty());
76-
return;
57+
return getTopicBroker(topic)
58+
.thenApply(internalListenerAddress -> {
59+
if (internalListenerAddress == null) {
60+
log.error("[{}] failed get pulsar address, returned null.", topic);
61+
removeTopicManagerCache(topic);
62+
return Optional.empty();
63+
} else if (log.isDebugEnabled()) {
64+
log.debug("[{}] Found broker's internal listener address: {}",
65+
topic, internalListenerAddress);
7766
}
7867

79-
// It's the `kafkaAdvertisedListeners` config that's written to ZK
80-
final String listeners = listenersOptional.get();
81-
final EndPoint endPoint =
82-
(advertisedEndPoint != null && advertisedEndPoint.isTlsEnabled()
83-
? EndPoint.getSslEndPoint(listeners) : EndPoint.getPlainTextEndPoint(listeners));
84-
85-
if (log.isDebugEnabled()) {
86-
log.debug("Found broker localListeners: {} for topicName: {}, "
87-
+ "localListeners: {}, found Listeners: {}",
88-
listeners, topic, advertisedListeners, listeners);
68+
try {
69+
final String listener = getAdvertisedListener(
70+
internalListenerAddress, topic, advertisedEndPoint);
71+
if (log.isDebugEnabled()) {
72+
log.debug("Found listener {} for topic {}", listener, topic);
73+
}
74+
final Matcher matcher = EndPoint.matcherListener(listener,
75+
listener + " cannot be split into 3 parts");
76+
return Optional.of(new InetSocketAddress(matcher.group(2), Integer.parseInt(matcher.group(3))));
77+
} catch (IllegalStateException | NumberFormatException e) {
78+
log.error("Failed to find the advertised listener: {}", e.getMessage());
79+
removeTopicManagerCache(topic);
80+
return Optional.empty();
8981
}
90-
91-
// here we found topic broker: broker2, but this is in broker1,
92-
// how to clean the lookup cache?
93-
if (!advertisedListeners.contains(endPoint.getOriginalListener())) {
94-
removeTopicManagerCache(topic.toString());
95-
}
96-
returnFuture.complete(Optional.of(endPoint.getInetAddress()));
97-
})).exceptionally(throwable -> {
98-
log.error("Not get advertise data for Kafka topic:{}. throwable: [{}]",
99-
topic, throwable.getMessage());
100-
removeTopicManagerCache(topic.toString());
101-
returnFuture.complete(Optional.empty());
102-
return null;
10382
});
104-
return returnFuture;
10583
}
10684

107-
// call pulsarclient.lookup.getbroker to get and own a topic.
108-
// when error happens, the returned future will complete with null.
109-
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName, String listenerName) {
85+
public CompletableFuture<InetSocketAddress> getTopicBroker(String topicName) {
11086
if (closed.get()) {
11187
if (log.isDebugEnabled()) {
11288
log.debug("Return null for getTopicBroker({}) since channel closing", topicName);
11389
}
11490
return CompletableFuture.completedFuture(null);
11591
}
11692

117-
ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> topicLookupCache =
118-
LOOKUP_CACHE.computeIfAbsent(topicName, t-> {
119-
if (log.isDebugEnabled()) {
120-
log.debug("Topic {} not in Lookup_cache, call lookupBroker", topicName);
121-
}
122-
ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>> cache = new ConcurrentHashMap<>();
123-
cache.put(listenerName == null ? "" : listenerName, lookupBroker(topicName, listenerName));
124-
return cache;
125-
});
126-
127-
return topicLookupCache.computeIfAbsent(listenerName == null ? "" : listenerName, t-> {
128-
if (log.isDebugEnabled()) {
129-
log.debug("Topic {} not in Lookup_cache, call lookupBroker", topicName);
130-
}
131-
return lookupBroker(topicName, listenerName);
132-
});
93+
if (log.isDebugEnabled()) {
94+
log.debug("Handle Lookup for topic {}", topicName);
95+
}
96+
return LOOKUP_CACHE.computeIfAbsent(topicName, this::lookupBroker);
13397
}
13498

135-
private CompletableFuture<InetSocketAddress> lookupBroker(final String topic, String listenerName) {
99+
private CompletableFuture<InetSocketAddress> lookupBroker(final String topic) {
136100
if (closed.get()) {
137101
if (log.isDebugEnabled()) {
138102
log.debug("Return null for getTopic({}) since channel closing", topic);
139103
}
140104
return CompletableFuture.completedFuture(null);
141105
}
142-
return lookupClient.getBrokerAddress(TopicName.get(topic), listenerName);
106+
return lookupClient.getBrokerAddress(TopicName.get(topic));
143107
}
144108

145-
private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
146-
InetSocketAddress pulsarAddress, TopicName topic, @Nullable EndPoint advertisedEndPoint) {
147-
CompletableFuture<Optional<String>> returnFuture = new CompletableFuture<>();
148-
149-
if (pulsarAddress == null) {
150-
log.error("[{}] failed get pulsar address, returned null.", topic.toString());
151-
152-
// getTopicBroker returns null. topic should be removed from LookupCache.
153-
removeTopicManagerCache(topic.toString());
154-
155-
returnFuture.complete(Optional.empty());
156-
return returnFuture;
157-
}
109+
private String getAdvertisedListener(InetSocketAddress internalListenerAddress,
110+
String topic,
111+
@Nullable EndPoint advertisedEndPoint) {
158112

159-
if (log.isDebugEnabled()) {
160-
log.debug("Found broker for topic {} puslarAddress: {}",
161-
topic, pulsarAddress);
162-
}
163-
164-
// get kop address from cache to prevent query zk each time.
165-
final CompletableFuture<Optional<String>> future = KOP_ADDRESS_CACHE.get(topic.toString());
166-
if (future != null) {
167-
return future;
168-
}
169-
170-
if (advertisedEndPoint != null && advertisedEndPoint.isValidInProtocolMap()) {
171-
// if kafkaProtocolMap is set, the lookup result is the advertised address
172-
String kafkaAdvertisedAddress = String.format("%s://%s:%s", advertisedEndPoint.getSecurityProtocol().name,
173-
pulsarAddress.getHostName(), pulsarAddress.getPort());
174-
KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
175-
returnFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress));
176-
if (log.isDebugEnabled()) {
177-
log.debug("{} get kafka Advertised Address through kafkaListenerName: {}",
178-
topic, pulsarAddress);
179-
}
180-
return returnFuture;
181-
}
182-
183-
List<LoadManagerReport> availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();
113+
final List<LoadManagerReport> availableBrokers = metadataStoreCacheLoader.getAvailableBrokers();
184114
if (log.isDebugEnabled()) {
185115
availableBrokers.forEach(loadManagerReport ->
186116
log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "
@@ -193,18 +123,20 @@ private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
193123
loadManagerReport.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME)));
194124
}
195125

196-
String hostAndPort = pulsarAddress.getHostName() + ":" + pulsarAddress.getPort();
197-
Optional<LoadManagerReport> serviceLookupData = availableBrokers.stream()
126+
final String hostAndPort = internalListenerAddress.getHostName() + ":" + internalListenerAddress.getPort();
127+
final Optional<LoadManagerReport> serviceLookupData = availableBrokers.stream()
198128
.filter(loadManagerReport -> lookupDataContainsAddress(loadManagerReport, hostAndPort)).findAny();
199-
if (serviceLookupData.isPresent()) {
200-
KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
201-
returnFuture.complete(serviceLookupData.get().getProtocol(KafkaProtocolHandler.PROTOCOL_NAME));
202-
} else {
203-
log.error("No node for broker {} under loadBalance", pulsarAddress);
204-
removeTopicManagerCache(topic.toString());
205-
returnFuture.complete(Optional.empty());
129+
if (!serviceLookupData.isPresent()) {
130+
log.error("No node for broker {} under loadBalance", internalListenerAddress);
131+
return null;
206132
}
207-
return returnFuture;
133+
134+
return serviceLookupData.get().getProtocol(KafkaProtocolHandler.PROTOCOL_NAME).map(kafkaAdvertisedListeners ->
135+
Optional.ofNullable(advertisedEndPoint)
136+
.map(endPoint -> EndPoint.findListener(kafkaAdvertisedListeners, endPoint.getListenerName()))
137+
.orElse(EndPoint.findFirstListener(kafkaAdvertisedListeners))
138+
).orElseThrow(() -> new IllegalStateException(
139+
"No kafkaAdvertisedListeners found in broker " + internalListenerAddress));
208140
}
209141

210142
// whether a ServiceLookupData contains wanted address.
@@ -215,12 +147,10 @@ private static boolean lookupDataContainsAddress(ServiceLookupData data, String
215147

216148
public static void removeTopicManagerCache(String topicName) {
217149
LOOKUP_CACHE.remove(topicName);
218-
KOP_ADDRESS_CACHE.remove(topicName);
219150
}
220151

221152
public static void clear() {
222153
LOOKUP_CACHE.clear();
223-
KOP_ADDRESS_CACHE.clear();
224154
}
225155

226156
public void close() {

0 commit comments

Comments
 (0)