Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Refactor KopBrokerLookupManager (#816)
Browse files Browse the repository at this point in the history
## Motivation
Currently, KoP has two lookup manager, is difficult to maintain in future. So we need move the lookup to some manager.

## Modifications
* Move `KafkaTopicManager` duplicate method to `KopBrokerLookupManager`
  • Loading branch information
Demogorgon314 authored and BewareMyPower committed Oct 20, 2021
1 parent f696293 commit 2841dcb
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 426 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class EndPoint {
private final int port;
@Getter
private final boolean multiListener;
@Getter
private final boolean tlsEnabled;

public EndPoint(final String listener, final Map<String, SecurityProtocol> protocolMap) {
this.originalListener = listener;
Expand All @@ -65,6 +67,7 @@ public EndPoint(final String listener, final Map<String, SecurityProtocol> proto
throw new IllegalStateException(this.listenerName + " is not set in kafkaProtocolMap");
}
}
this.tlsEnabled = (securityProtocol == SecurityProtocol.SSL) || (securityProtocol == SecurityProtocol.SASL_SSL);

final String originalHostname = matcher.group(2);
if (originalHostname.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
Expand All @@ -43,6 +41,8 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final KafkaServiceConfiguration kafkaConfig;
@Getter
private final TenantContextManager tenantContextManager;
@Getter
private final KopBrokerLookupManager kopBrokerLookupManager;

private final AdminManager adminManager;
@Getter
Expand All @@ -53,25 +53,24 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final SslContextFactory.Server sslContextFactory;
@Getter
private final StatsLogger statsLogger;
private final MetadataCache<LocalBrokerData> localBrokerDataCache;

public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
TenantContextManager tenantContextManager,
KopBrokerLookupManager kopBrokerLookupManager,
AdminManager adminManager,
boolean enableTLS,
EndPoint advertisedEndPoint,
StatsLogger statsLogger,
MetadataCache<LocalBrokerData> localBrokerDataCache) {
StatsLogger statsLogger) {
super();
this.pulsarService = pulsarService;
this.kafkaConfig = kafkaConfig;
this.tenantContextManager = tenantContextManager;
this.kopBrokerLookupManager = kopBrokerLookupManager;
this.adminManager = adminManager;
this.enableTls = enableTLS;
this.advertisedEndPoint = advertisedEndPoint;
this.statsLogger = statsLogger;
this.localBrokerDataCache = localBrokerDataCache;
if (enableTls) {
sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
} else {
Expand All @@ -95,7 +94,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig,
tenantContextManager, adminManager, localBrokerDataCache,
tenantContextManager, kopBrokerLookupManager, adminManager,
enableTls, advertisedEndPoint, statsLogger));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

/**
* Kafka Protocol Handler load and run by Pulsar Service.
Expand All @@ -84,9 +81,9 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private StatsLogger rootStatsLogger;
private StatsLogger scopeStatsLogger;
private PrometheusMetricsProvider statsProvider;
@Getter
private KopBrokerLookupManager kopBrokerLookupManager;
private AdminManager adminManager = null;
private MetadataCache<LocalBrokerData> localBrokerDataCache;
private SystemTopicClient txnTopicClient;

@Getter
Expand Down Expand Up @@ -438,14 +435,6 @@ public void start(BrokerService service) {
KopVersion.getBuildTime());

brokerService = service;
kopBrokerLookupManager = new KopBrokerLookupManager(
brokerService.getPulsar(), false,
kafkaConfig.getKafkaAdvertisedListeners());
MetadataStoreExtended metadataStore = brokerService.pulsar().getLocalMetadataStore();
// Currently each time getMetadataCache() is called, a new MetadataCache<T> instance will be created, even for
// the same type. So we must reuse the same MetadataCache<LocalBrokerData> to avoid creating a lot of instances.
localBrokerDataCache = metadataStore.getMetadataCache(LocalBrokerData.class);

PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = brokerService.getPulsar().getAdminClient();
Expand All @@ -459,6 +448,9 @@ public void start(BrokerService service) {
offsetTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
txnTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);

kopBrokerLookupManager = new KopBrokerLookupManager(
brokerService.getPulsar(), kafkaConfig.getKafkaAdvertisedListeners());

brokerService.pulsar()
.getNamespaceService()
.addNamespaceBundleOwnershipListener(
Expand Down Expand Up @@ -563,25 +555,39 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

try {
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();
ImmutableMap.builder();

EndPoint.parseListeners(kafkaConfig.getListeners(), kafkaConfig.getKafkaProtocolMap()).
forEach((listener, endPoint) -> {
switch (endPoint.getSecurityProtocol()) {
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, this, adminManager, false,
endPoint, scopeStatsLogger, localBrokerDataCache));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(), new KafkaChannelInitializer(brokerService.getPulsar(),
kafkaConfig, this, adminManager, true,
endPoint, scopeStatsLogger, localBrokerDataCache));
break;
default:
}
switch (endPoint.getSecurityProtocol()) {
case PLAINTEXT:
case SASL_PLAINTEXT:
builder.put(endPoint.getInetAddress(),
new KafkaChannelInitializer(
brokerService.getPulsar(),
kafkaConfig,
this,
kopBrokerLookupManager,
adminManager,
false,
endPoint,
scopeStatsLogger));
break;
case SSL:
case SASL_SSL:
builder.put(endPoint.getInetAddress(),
new KafkaChannelInitializer(
brokerService.getPulsar(),
kafkaConfig,
this,
kopBrokerLookupManager,
adminManager,
true,
endPoint,
scopeStatsLogger));
break;
default:
}
});
return builder.build();
} catch (Exception e){
Expand All @@ -599,12 +605,12 @@ public void close() {
groupCoordinatorsByTenant.values().forEach(GroupCoordinator::shutdown);
kopEventManager.close();
transactionCoordinatorByTenant.values().forEach(TransactionCoordinator::shutdown);
KafkaTopicManager.LOOKUP_CACHE.clear();
KopBrokerLookupManager.clear();
KafkaTopicManager.cancelCursorExpireTask();
KafkaTopicConsumerManagerCache.getInstance().close();
KafkaTopicManager.getReferences().clear();
KafkaTopicManager.getTopics().clear();
kopBrokerLookupManager.close();
statsProvider.stop();
}

Expand Down
Loading

0 comments on commit 2841dcb

Please sign in to comment.