Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

create kafka namespace if missing #593

Merged
merged 6 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,23 @@ public void start(BrokerService service) {
ZooKeeperUtils.tryCreatePath(brokerService.pulsar().getZkClient(),
kafkaConfig.getGroupIdZooKeeperPath(), new byte[0]);

PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = brokerService.getPulsar().getAdminClient();
} catch (PulsarServerException e) {
log.error("init PulsarAdmin failed with ", e);
throw new IllegalStateException(e);
}
final ClusterData clusterData = ClusterData.builder()
.serviceUrl(brokerService.getPulsar().getWebServiceAddress())
.serviceUrlTls(brokerService.getPulsar().getWebServiceAddressTls())
.brokerServiceUrl(brokerService.getPulsar().getBrokerServiceUrl())
.brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
.build();

// init and start group coordinator
try {
initGroupCoordinator(brokerService);
initGroupCoordinator(pulsarAdmin, clusterData);
startGroupCoordinator();
// and listener for Offset topics load/unload
brokerService.pulsar()
Expand All @@ -284,9 +298,18 @@ public void start(BrokerService service) {
log.error("initGroupCoordinator failed with", e);
throw new IllegalStateException(e);
}

// init kafka namespaces
try {
initKafkaNamespace(pulsarAdmin, clusterData);
} catch (Exception e) {
// no need to throw exception since we can create kafka namespace later
log.warn("init kafka failed, need to create it manually later", e);
}

if (kafkaConfig.isEnableTransactionCoordinator()) {
try {
initTransactionCoordinator();
initTransactionCoordinator(pulsarAdmin, clusterData);
startTransactionCoordinator();
} catch (Exception e) {
log.error("Initialized transaction coordinator failed.", e);
Expand Down Expand Up @@ -356,7 +379,11 @@ public void close() {
statsProvider.stop();
}

public void initGroupCoordinator(BrokerService service) throws Exception {
public void initKafkaNamespace(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
MetadataUtils.createKafkaNamespaceIfMissing(pulsarAdmin, clusterData, kafkaConfig);
}

public void initGroupCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
GroupConfig groupConfig = new GroupConfig(
kafkaConfig.getGroupMinSessionTimeoutMs(),
kafkaConfig.getGroupMaxSessionTimeoutMs(),
Expand All @@ -374,18 +401,10 @@ public void initGroupCoordinator(BrokerService service) throws Exception {
.offsetsRetentionMs(TimeUnit.MINUTES.toMillis(kafkaConfig.getOffsetsRetentionMinutes()))
.build();

PulsarAdmin pulsarAdmin = service.pulsar().getAdminClient();
final ClusterData clusterData = ClusterData.builder()
.serviceUrl(brokerService.getPulsar().getWebServiceAddress())
.serviceUrlTls(brokerService.getPulsar().getWebServiceAddressTls())
.brokerServiceUrl(brokerService.getPulsar().getBrokerServiceUrl())
.brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
.build();
MetadataUtils.createOffsetMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);


this.groupCoordinator = GroupCoordinator.of(
(PulsarClientImpl) (service.pulsar().getClient()),
(PulsarClientImpl) (brokerService.pulsar().getClient()),
groupConfig,
offsetConfig,
SystemTimer.builder()
Expand All @@ -405,20 +424,12 @@ public void startGroupCoordinator() throws Exception {
}
}

public void initTransactionCoordinator() throws Exception {
public void initTransactionCoordinator(PulsarAdmin pulsarAdmin, ClusterData clusterData) throws Exception {
TransactionConfig transactionConfig = TransactionConfig.builder()
.transactionLogNumPartitions(kafkaConfig.getTxnLogTopicNumPartitions())
.transactionMetadataTopicName(MetadataUtils.constructTxnLogTopicBaseName(kafkaConfig))
.build();

PulsarAdmin pulsarAdmin = brokerService.getPulsar().getAdminClient();
final ClusterData clusterData = ClusterData.builder()
.serviceUrl(brokerService.getPulsar().getWebServiceAddress())
.serviceUrlTls(brokerService.getPulsar().getWebServiceAddressTls())
.brokerServiceUrl(brokerService.getPulsar().getBrokerServiceUrl())
.brokerServiceUrlTls(brokerService.getPulsar().getBrokerServiceUrlTls())
.build();

MetadataUtils.createTxnMetadataIfMissing(pulsarAdmin, clusterData, kafkaConfig);

this.transactionCoordinator = TransactionCoordinator.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
String kafkaMetadataTenant = conf.getKafkaMetadataTenant();
String kafkaMetadataNamespace = kafkaMetadataTenant + "/" + conf.getKafkaMetadataNamespace();

boolean clusterExists, tenantExists, namespaceExists, offsetsTopicExists;
clusterExists = tenantExists = namespaceExists = offsetsTopicExists = false;
boolean clusterExists = false;
boolean tenantExists = false;
boolean namespaceExists = false;
boolean offsetsTopicExists = false;

try {
Clusters clusters = pulsarAdmin.clusters();
Expand Down Expand Up @@ -190,13 +192,115 @@ private static void createKafkaMetadataIfMissing(PulsarAdmin pulsarAdmin,
}
}

/**
* This method creates the Kafka tenant and namespace if they are not currently present.
* <ul>
* <li>If the cluster does not exist this method will throw a PulsarServerException.NotFoundException</li>
* <li>If the tenant does not exist it will be created</li>
* <li>If the tenant exists but the allowedClusters list does not include the cluster this method will
* add the cluster to the allowedClusters list</li>
* <li>If the namespace does not exist it will be created</li>
* <li>If the namespace exists but the replicationClusters list does not include the cluster this method
* will add the cluster to the replicationClusters list</li>
* <li>If the offset topic does not exist it will be created</li>
* <li>If the offset topic exists but some partitions are missing, the missing partitions will be created</li>
* </ul>
*/
public static void createKafkaNamespaceIfMissing(PulsarAdmin pulsarAdmin,
ClusterData clusterData,
KafkaServiceConfiguration conf)
throws PulsarAdminException {
String cluster = conf.getClusterName();
String kafkaTenant = conf.getKafkaTenant();
String kafkaNamespace = kafkaTenant + "/" + conf.getKafkaNamespace();

boolean clusterExists = false;
boolean tenantExists = false;
boolean namespaceExists = false;

try {
Clusters clusters = pulsarAdmin.clusters();
if (!clusters.getClusters().contains(cluster)) {
try {
pulsarAdmin.clusters().createCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Attempted to create cluster {} however it was created concurrently.", cluster);
} else {
// Re-throw all other exceptions
throw e;
}
}
} else {
ClusterData configuredClusterData = clusters.getCluster(cluster);
log.info("Cluster {} found: {}", cluster, configuredClusterData);
}
clusterExists = true;

// Check if the kafka tenant exists and create it if not
Tenants tenants = pulsarAdmin.tenants();
if (!tenants.getTenants().contains(kafkaTenant)) {
log.info("Tenant: {} does not exist, creating it ...", kafkaTenant);
tenants.createTenant(kafkaTenant,
TenantInfo.builder()
.adminRoles(conf.getSuperUserRoles())
.allowedClusters(Collections.singleton(cluster))
.build());
} else {
TenantInfo kafkaMetadataTenantInfo = tenants.getTenantInfo(kafkaTenant);
Set<String> allowedClusters = kafkaMetadataTenantInfo.getAllowedClusters();
if (!allowedClusters.contains(cluster)) {
log.info("Tenant: {} exists but cluster: {} is not in the allowedClusters list, updating it ...",
kafkaTenant, cluster);
allowedClusters.add(cluster);
tenants.updateTenant(kafkaTenant, kafkaMetadataTenantInfo);
}
}
tenantExists = true;

// Check if the kafka namespace exists and create it if not
Namespaces namespaces = pulsarAdmin.namespaces();
if (!namespaces.getNamespaces(kafkaTenant).contains(kafkaNamespace)) {
log.info("Namespaces: {} does not exist in tenant: {}, creating it ...",
kafkaNamespace, kafkaTenant);
Set<String> replicationClusters = Sets.newHashSet(cluster);
namespaces.createNamespace(kafkaNamespace, replicationClusters);
namespaces.setNamespaceReplicationClusters(kafkaNamespace, replicationClusters);
} else {
List<String> replicationClusters = namespaces.getNamespaceReplicationClusters(kafkaNamespace);
if (!replicationClusters.contains(cluster)) {
log.info("Namespace: {} exists but cluster: {} is not in the replicationClusters list,"
+ "updating it ...", kafkaNamespace, cluster);
Set<String> newReplicationClusters = Sets.newHashSet(replicationClusters);
newReplicationClusters.add(cluster);
namespaces.setNamespaceReplicationClusters(kafkaNamespace, newReplicationClusters);
}
}
namespaceExists = true;

} catch (PulsarAdminException e) {
if (e instanceof ConflictException) {
log.info("Resources concurrent creating and cause e: ", e);
return;
}

log.error("Failed to successfully initialize Kafka Metadata {}",
kafkaNamespace, e);
throw e;
} finally {
log.info("Current state of kafka metadata, cluster: {} exists: {}, tenant: {} exists: {},"
+ " namespace: {} exists: {}",
cluster, clusterExists, kafkaTenant, tenantExists, kafkaNamespace, namespaceExists);
}
}

private static void createTopicIfNotExist(final PulsarAdmin admin,
final String topic,
final int numPartitions) throws PulsarAdminException {
try {
admin.topics().createPartitionedTopic(topic, numPartitions);
} catch (PulsarAdminException.ConflictException e) {
log.info("Resources concurrent creating: {}", e.getMessage());
log.info("Resources concurrent creating for topic : {}, caused by : {}", topic, e.getMessage());
}
try {
// Ensure all partitions are created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -97,13 +96,6 @@ protected void setup() throws Exception {
conf.setProperties(properties);

super.internalSetup();

admin.tenants().createTenant(TENANT,
TenantInfo.builder()
.adminRoles(Collections.singleton(ADMIN_USER))
.allowedClusters(Collections.singleton(configClusterName))
.build());
admin.namespaces().createNamespace(TENANT + "/" + NAMESPACE);
admin.namespaces()
.setNamespaceReplicationClusters(TENANT + "/" + NAMESPACE, Sets.newHashSet(super.configClusterName));
admin.topics().createPartitionedTopic(TOPIC, 1);
Expand Down