Skip to content

Commit

Permalink
Topic enforcer to generate Strimzi KafkaTopic resources.
Browse files Browse the repository at this point in the history
  • Loading branch information
shk3 committed Dec 10, 2024
1 parent b36a3de commit 739b9ce
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
16 changes: 16 additions & 0 deletions kafka_topic_enforcer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Kafka topic enforcer's goal is to automate Kafka topic management & hence remove
* Self service, removed dependency on a human
* Simple configuration

If you choose to use Strimzi Kafka operator, this command can also generate Strimzi KafkaTopic CRDs.

## Dependencies

* JVM
Expand Down Expand Up @@ -51,6 +53,9 @@ Usage: <main class> [options] [command] [command options]
enforce Enforce given configuration
Usage: enforce [options] path to a configuration file
Options:
--cluster
a cluster name, if specified, consolidated (multi-cluster)
configuration file is expected
--continuous, -c
run enforcement continuously
Default: false
Expand All @@ -63,6 +68,17 @@ Usage: <main class> [options] [command] [command options]
--unsafemode
run in unsafe mode, topic deletion is _only_ allowed in this mode
Default: false
strimzi Generate the Strimzi KafkaTopic resources YAML on stdout from
the topic config. Certain information such as topic tags and
config comments would be missed, and the resource
metadata names are also converted to conform RFC 1123.
Usage: strimzi [options] /path/to/a/configuration/file
Options:
--cluster
a cluster name, if specified, consolidated (multi-cluster)
configuration file is expected
* --kafka_cluster, -k
the name of the Strimzi Kafka cluster to be generated for
```
## Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ java_library(
"//3rdparty/jvm/com/fasterxml/jackson/core:jackson_databind",
"//3rdparty/jvm/com/fasterxml/jackson/dataformat:jackson_dataformat_yaml",
"//3rdparty/jvm/com/fasterxml/jackson/module:jackson_module_parameter_names",
"//3rdparty/jvm/com/google/guava",
"//3rdparty/jvm/io/fabric8:kubernetes_client_api",
"//3rdparty/jvm/io/fabric8:kubernetes_model_common",
"//3rdparty/jvm/io/fabric8:kubernetes_model_core",
"//3rdparty/jvm/io/prometheus:simpleclient",
"//3rdparty/jvm/io/prometheus:simpleclient_httpserver",
"//3rdparty/jvm/io/strimzi:api",
"//3rdparty/jvm/org/apache/kafka:kafka_clients",
"//3rdparty/jvm/org/slf4j:slf4j_api",
"//3rdparty/jvm_shaded/com/google/guava_shaded",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static void main(String[] args) {
final Main main = new Main();
final Map<String, BaseCommand<ConfiguredTopic>> commands = new HashMap<>();
commands.put("validate", new BaseCommand<>());
commands.put("strimzi", new StrimziCommand());
commands.put("dump", new DumpCommand());
commands.put("enforce", new TopicEnforceCommand());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2024 Tesla Motors, Inc. All rights reserved.
*/

package com.tesla.data.topic.enforcer;

import com.tesla.data.enforcer.BaseCommand;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import com.google.common.base.CaseFormat;
import com.google.common.hash.Hashing;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Parameters(commandDescription = "Generate the Strimzi KafkaTopic resources YAML " +
"(https://strimzi.io/docs/operators/latest/configuring.html#type-KafkaTopic-reference) " +
"on stdout from the topic config. Certain information such as topic tags and config comments would be missed, " +
"and the resource metadata names are also converted to conform RFC 1123.")
public class StrimziCommand extends BaseCommand<ConfiguredTopic> {

@Parameter(
names = {"--kafka_cluster", "-k"},
description = "the name of the Strimzi Kafka cluster to be generated for",
required = true)
protected String kafkaCluster;

@Override
public int run() {
List<ConfiguredTopic> configuredTopics = configuredEntities(ConfiguredTopic.class, "topics", "topicsFile");

ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, true)
.configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true)
.configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true)
.configure(YAMLGenerator.Feature.INDENT_ARRAYS, false)
.configure(YAMLGenerator.Feature.SPLIT_LINES, false)
);

try {
for (ConfiguredTopic topic : configuredTopics) {
System.out.println(yamlMapper.writeValueAsString(getKafkaTopic(topic)));
}
} catch (JsonProcessingException e) {
LOG.error("Failed to dump config", e);
return FAILURE;
}

return SUCCESS;
}

/**
* Takes the best effort to convert the topic names to a resource name compatible format. We are not aiming at perfect
* correctness for mixed cases. Instead, we take some heuristics to get some good enough results.
*/
private String toResourceName(String s) {
// If the topic name contains an underscore, we assume it follows underscore case.
if (s.contains("_")) {
return CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_HYPHEN, s.toLowerCase()).replaceAll("[^-.a-z0-9]","");
}
// We assume the original name is upper camel case and take the best effort, even if it's not.
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, s).replaceAll("[^-.a-z0-9]","");
}

private KafkaTopic getKafkaTopic(ConfiguredTopic enforcerTopic) {
// Kubernetes resources follow RFC 1123 naming convention, so we have to remove the unsupported characters. To avoid
// duplications, we are appending a short hash string of the original topic name.
String topicHash = Hashing.sha256().hashString(enforcerTopic.getName(), StandardCharsets.UTF_8)
.toString().substring(0, 6);
// There could be multiple topics with the same name but belonging to different Kafka clusters. They are different
// topics. To avoid ambiguity, we prepend the Kafka cluster names in the KafkaTopic metadata name.
String resourceName = kafkaCluster + "." + toResourceName(enforcerTopic.getName()) + "-" + topicHash;

return new KafkaTopicBuilder()
.withNewMetadata()
.withName(resourceName)
.withLabels(Map.of("strimzi.io/cluster", kafkaCluster))
.endMetadata()
.withNewSpec()
.withTopicName(enforcerTopic.getName())
.withPartitions(enforcerTopic.getPartitions())
.withReplicas((int) enforcerTopic.getReplicationFactor())
.withConfig(new HashMap<>(enforcerTopic.getConfig()))
.endSpec().build();
}
}


0 comments on commit 739b9ce

Please sign in to comment.