Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit d55c1b1

Browse files
wangjialing218wangjialing
authored andcommitted
skip to get kafka advertised addres from zk when lookup with listenerName (#670)
This is the first step for #669, only support one listener which configed in `kafkaListenerName`, and only support PLAINTEXT Co-authored-by: wangjialing <wangjialing@cmss.chinamobile.com>
1 parent 5601c66 commit d55c1b1

File tree

3 files changed

+40
-6
lines changed

3 files changed

+40
-6
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

+18
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
157157
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
158158
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
159+
import org.apache.kafka.common.security.auth.SecurityProtocol;
159160
import org.apache.kafka.common.utils.SystemTime;
160161
import org.apache.kafka.common.utils.Time;
161162
import org.apache.kafka.common.utils.Utils;
@@ -177,6 +178,7 @@
177178
import org.apache.pulsar.metadata.api.MetadataCache;
178179
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
179180
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
181+
import org.eclipse.jetty.util.StringUtil;
180182

181183
/**
182184
* This class contains all the request handling methods.
@@ -2108,6 +2110,22 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
21082110
if (future != null) {
21092111
return future;
21102112
}
2113+
2114+
// if kafkaListenerName is set, the lookup result is the advertised address
2115+
if (!StringUtil.isBlank(kafkaConfig.getKafkaListenerName())) {
2116+
// TODO: should add SecurityProtocol according to which endpoint is handling the request.
2117+
// firstly we only support PLAINTEXT when lookup with kafkaListenerName
2118+
String kafkaAdvertisedAddress = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(),
2119+
pulsarAddress.getHostName(), pulsarAddress.getPort());
2120+
KafkaTopicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture);
2121+
returnFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress));
2122+
if (log.isDebugEnabled()) {
2123+
log.debug("{} get kafka Advertised Address through kafkaListenerName: {}",
2124+
topic, pulsarAddress);
2125+
}
2126+
return returnFuture;
2127+
}
2128+
21112129
// advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
21122130
// here we get the broker url, need to find related webServiceUrl.
21132131
pulsarService.getPulsarResources()

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.stream.Collectors;
2525
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.kafka.common.security.auth.SecurityProtocol;
2627
import org.apache.pulsar.broker.PulsarService;
2728
import org.apache.pulsar.broker.loadbalance.LoadManager;
2829
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -32,6 +33,7 @@
3233
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
3334
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
3435
import org.apache.pulsar.zookeeper.ZooKeeperCache;
36+
import org.eclipse.jetty.util.StringUtil;
3537

3638

3739
/**
@@ -165,6 +167,22 @@ private CompletableFuture<Optional<String>> getProtocolDataToAdvertise(
165167
if (KOP_ADDRESS_CACHE.containsKey(topic.toString())) {
166168
return KOP_ADDRESS_CACHE.get(topic.toString());
167169
}
170+
171+
// if kafkaListenerName is set, the lookup result is the advertised address
172+
if (!StringUtil.isBlank(lookupClient.getPulsarClient().getConfiguration().getListenerName())) {
173+
// TODO: should add SecurityProtocol according to which endpoint is handling the request.
174+
// firstly we only support PLAINTEXT when lookup with kafkaListenerName
175+
String kafkaAdvertisedAddress = String.format("%s://%s:%s", SecurityProtocol.PLAINTEXT.name(),
176+
pulsarAddress.getHostName(), pulsarAddress.getPort());
177+
KOP_ADDRESS_CACHE.put(topic.toString(), kopAddressFuture);
178+
if (log.isDebugEnabled()) {
179+
log.debug("{} get kafka Advertised Address through kafkaListenerName: {}",
180+
topic, pulsarAddress);
181+
}
182+
kopAddressFuture.complete(Optional.ofNullable(kafkaAdvertisedAddress));
183+
return kopAddressFuture;
184+
}
185+
168186
// advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
169187
// here we get the broker url, need to find related webServiceUrl.
170188
ZooKeeperCache zkCache = pulsarService.getLocalZkCache();

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaListenerNameTest.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,13 @@ protected void cleanup() throws Exception {
3939
public void testListenerName() throws Exception {
4040
super.resetConfig();
4141
conf.setAdvertisedAddress(null);
42-
conf.setInternalListenerName("external");
43-
// There's a limit that PulsarService doesn't use advertised listener's address as it's brokerServiceUrl's
44-
// address. So here the "external" listener's port should be the same with brokerPort and address should be
45-
// localhost.
4642
final String localAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null);
43+
conf.setInternalListenerName("pulsar");
4744
final String advertisedListeners =
48-
"internal:pulsar://192.168.0.2:6650,external:pulsar://" + localAddress + ":" + brokerPort;
45+
"pulsar:pulsar://" + localAddress + ":" + brokerPort
46+
+ ",kafka:pulsar://" + "localhost:" + kafkaBrokerPort;
4947
conf.setAdvertisedListeners(advertisedListeners);
50-
conf.setKafkaListenerName("external");
48+
conf.setKafkaListenerName("kafka");
5149
log.info("Set advertisedListeners to {}", advertisedListeners);
5250
super.internalSetup();
5351

0 commit comments

Comments
 (0)