Skip to content

Commit

Permalink
Synchronize topic replicas + partitions (#52)
Browse files Browse the repository at this point in the history
* Display previous value for topic config update

* Implement replication and partition synchronization

* Cleanup + respect the includeUnchanged flag
  • Loading branch information
jrevillard authored Sep 21, 2021
1 parent 19fa471 commit 4309147
Show file tree
Hide file tree
Showing 66 changed files with 1,024 additions and 298 deletions.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
.classpath
.factorypath
.project
.settings/
bin/
.idea/
*.iml
.gradle/
Expand All @@ -6,4 +11,5 @@ out/
docker/data/
state.yaml
plan.json
test.py
test.py
/generated/
57 changes: 55 additions & 2 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
- ./data/zoo1/datalog:/datalog

kafka1:
image: confluentinc/cp-kafka:5.3.1
image: confluentinc/cp-kafka:5.5.3
hostname: kafka1
ports:
- "9092:9092"
Expand All @@ -38,4 +38,57 @@ services:
- ./data/kafka1/data:/var/lib/kafka/data
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
depends_on:
- zoo1
- zoo1

# Second broker of the 3-node cluster; mirrors kafka1's SASL_PLAINTEXT setup.
# NOTE(review): indentation was lost in this capture — restore docker-compose
# nesting (service under `services:`, keys indented) before use.
kafka2:
image: confluentinc/cp-kafka:5.5.3
hostname: kafka2
ports:
# Host-facing listener; internal broker traffic stays on 19092.
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
# Unique per broker (kafka1=1, kafka2=2, kafka3=3).
KAFKA_BROKER_ID: 2
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
ZOOKEEPER_SASL_ENABLED: "false"
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
KAFKA_SUPER_USERS: "User:test;User:kafka"
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
depends_on:
- zoo1

# Third broker of the 3-node cluster; identical to kafka2 except for
# broker id, hostname, external port, and data volume.
# NOTE(review): indentation was lost in this capture — restore docker-compose
# nesting (service under `services:`, keys indented) before use.
kafka3:
image: confluentinc/cp-kafka:5.5.3
hostname: kafka3
ports:
# Host-facing listener; internal broker traffic stays on 19092.
- "9094:9094"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:SASL_PLAINTEXT,LISTENER_DOCKER_EXTERNAL:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
# Unique per broker (kafka1=1, kafka2=2, kafka3=3).
KAFKA_BROKER_ID: 3
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
ZOOKEEPER_SASL_ENABLED: "false"
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.auth.SimpleAclAuthorizer"
KAFKA_SUPER_USERS: "User:test;User:kafka"
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
volumes:
- ./data/kafka3/data:/var/lib/kafka/data
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
depends_on:
- zoo1

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public interface TopicConfigPlan {
String getKey();

Optional<String> getValue();

Optional<String> getPreviousValue();

PlanAction getAction();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.devshawn.kafka.gitops.domain.plan;

import java.util.Optional;
import org.inferred.freebuilder.FreeBuilder;
import com.devshawn.kafka.gitops.enums.PlanAction;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;

/**
 * Plan describing the structural topic settings (partition count and
 * replication factor) together with the action required to reconcile
 * each of them against the cluster's current state.
 */
@FreeBuilder
@JsonDeserialize(builder = TopicDetailsPlan.Builder.class)
public interface TopicDetailsPlan {

    /** Desired partition count, when part of the plan. */
    Optional<Integer> getPartitions();

    /** Partition count currently on the cluster, when known. */
    Optional<Integer> getPreviousPartitions();

    /** Action to apply to the partition count. */
    PlanAction getPartitionsAction();

    /** Desired replication factor, when part of the plan. */
    Optional<Integer> getReplication();

    /** Replication factor currently on the cluster, when known. */
    Optional<Integer> getPreviousReplication();

    /** Action to apply to the replication factor. */
    PlanAction getReplicationAction();

    /**
     * Filters a plan down to only the settings that actually change.
     *
     * @param topicDetailsPlan the plan to filter; may be empty
     * @return an empty Optional when neither setting changes, otherwise a
     *         copy carrying only the changed values (actions are always kept)
     */
    public static Optional<TopicDetailsPlan> toChangesOnlyPlan(Optional<TopicDetailsPlan> topicDetailsPlan) {
        if (!topicDetailsPlan.isPresent()) {
            return topicDetailsPlan;
        }
        TopicDetailsPlan plan = topicDetailsPlan.get();
        PlanAction replicationAction = plan.getReplicationAction();
        PlanAction partitionsAction = plan.getPartitionsAction();

        TopicDetailsPlan.Builder builder = new TopicDetailsPlan.Builder()
                .setReplicationAction(replicationAction)
                .setPartitionsAction(partitionsAction);

        boolean changed = false;
        if (replicationAction != null && replicationAction != PlanAction.NO_CHANGE) {
            // Carry replication values only when the factor is being altered.
            builder.setReplication(plan.getReplication());
            builder.setPreviousReplication(plan.getPreviousReplication());
            changed = true;
        }
        if (partitionsAction != null && partitionsAction != PlanAction.NO_CHANGE) {
            // Carry partition values only when the count is being altered.
            builder.setPartitions(plan.getPartitions());
            builder.setPreviousPartitions(plan.getPreviousPartitions());
            changed = true;
        }
        return changed ? Optional.of(builder.build()) : Optional.<TopicDetailsPlan>empty();
    }

    class Builder extends TopicDetailsPlan_Builder {
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.devshawn.kafka.gitops.domain.plan;

import com.devshawn.kafka.gitops.domain.state.TopicDetails;
import com.devshawn.kafka.gitops.enums.PlanAction;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.inferred.freebuilder.FreeBuilder;
Expand All @@ -16,12 +15,15 @@ public interface TopicPlan {

PlanAction getAction();

Optional<TopicDetails> getTopicDetails();
Optional<TopicDetailsPlan> getTopicDetailsPlan();

List<TopicConfigPlan> getTopicConfigPlans();

default TopicPlan toChangesOnlyPlan() {
TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction()).setTopicDetails(getTopicDetails());
TopicPlan.Builder builder = new TopicPlan.Builder().setName(getName()).setAction(getAction());

builder.setTopicDetailsPlan(TopicDetailsPlan.toChangesOnlyPlan(getTopicDetailsPlan()));

getTopicConfigPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addTopicConfigPlans);
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.devshawn.kafka.gitops.domain.state;

import com.devshawn.kafka.gitops.exception.ValidationException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
Expand All @@ -11,10 +10,6 @@
import org.apache.kafka.common.resource.ResourceType;
import org.inferred.freebuilder.FreeBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@FreeBuilder
@JsonDeserialize(builder = AclDetails.Builder.class)
public abstract class AclDetails {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import com.devshawn.kafka.gitops.config.ManagerConfig;
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
import com.devshawn.kafka.gitops.enums.PlanAction;
import com.devshawn.kafka.gitops.service.KafkaService;
import com.devshawn.kafka.gitops.util.LogUtil;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
Expand All @@ -24,13 +26,25 @@ public ApplyManager(ManagerConfig managerConfig, KafkaService kafkaService) {
}

public void applyTopics(DesiredPlan desiredPlan) {
Collection<Node> clusterNodes = kafkaService.describeClusterNodes();
desiredPlan.getTopicPlans().forEach(topicPlan -> {
if (topicPlan.getAction() == PlanAction.ADD) {
LogUtil.printTopicPreApply(topicPlan);
kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetails().get());
kafkaService.createTopic(topicPlan.getName(), topicPlan.getTopicDetailsPlan().get(), topicPlan.getTopicConfigPlans());
LogUtil.printPostApply();
} else if (topicPlan.getAction() == PlanAction.UPDATE) {
LogUtil.printTopicPreApply(topicPlan);

if(topicPlan.getTopicDetailsPlan().isPresent()) {
// Update Replication factor and partition number
TopicDetailsPlan topicDetailsPlan = topicPlan.getTopicDetailsPlan().get();
if(topicDetailsPlan.getPartitionsAction() == PlanAction.UPDATE) {
kafkaService.addTopicPartition(topicPlan.getName(), topicDetailsPlan.getPartitions().get());
}
if(topicDetailsPlan.getReplicationAction() == PlanAction.UPDATE) {
kafkaService.updateTopicReplication(clusterNodes, topicPlan.getName(), topicDetailsPlan.getReplication().get());
}
}
topicPlan.getTopicConfigPlans().forEach(topicConfigPlan -> applyTopicConfiguration(topicPlan, topicConfigPlan));
LogUtil.printPostApply();
} else if (topicPlan.getAction() == PlanAction.REMOVE && !managerConfig.isDeleteDisabled()) {
Expand Down
81 changes: 64 additions & 17 deletions src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
import com.devshawn.kafka.gitops.domain.plan.PlanOverview;
import com.devshawn.kafka.gitops.domain.plan.TopicConfigPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicDetailsPlan;
import com.devshawn.kafka.gitops.domain.plan.TopicPlan;
import com.devshawn.kafka.gitops.domain.state.AclDetails;
import com.devshawn.kafka.gitops.domain.state.DesiredState;
import com.devshawn.kafka.gitops.domain.state.TopicDetails;
import com.devshawn.kafka.gitops.enums.PlanAction;
import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
import com.devshawn.kafka.gitops.exception.ReadPlanInputException;
import com.devshawn.kafka.gitops.exception.ValidationException;
import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
import com.devshawn.kafka.gitops.service.KafkaService;
import com.devshawn.kafka.gitops.util.PlanUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -47,37 +49,74 @@ public PlanManager(ManagerConfig managerConfig, KafkaService kafkaService, Objec
}

public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPlan) {
List<TopicListing> topics = kafkaService.getTopics();
List<String> topicNames = topics.stream().map(TopicListing::name).collect(Collectors.toList());
Map<String, TopicDescription> topics = kafkaService.getTopics();
List<String> topicNames = topics.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
Map<String, List<ConfigEntry>> topicConfigs = fetchTopicConfigurations(topicNames);

desiredState.getTopics().forEach((key, value) -> {
TopicPlan.Builder topicPlan = new TopicPlan.Builder()
.setName(key)
.setTopicDetails(value);
TopicDetailsPlan.Builder topicDetailsPlan = new TopicDetailsPlan.Builder();
topicDetailsPlan.setPartitionsAction(PlanAction.NO_CHANGE)
.setReplicationAction(PlanAction.NO_CHANGE);

TopicPlan.Builder topicPlan = new TopicPlan.Builder()
.setName(key);
boolean topicDetailsAddOrUpdate = false;
if (!topicNames.contains(key)) {
log.info("[PLAN] Topic {} does not exist; it will be created.", key);
topicPlan.setAction(PlanAction.ADD);
topicDetailsPlan.setPartitionsAction(PlanAction.ADD)
.setPartitions(value.getPartitions())
.setReplicationAction(PlanAction.ADD)
.setReplication(value.getReplication().get());
planTopicConfigurations(key, value, topicConfigs.get(key), topicPlan);
topicDetailsAddOrUpdate = true;
} else {
log.info("[PLAN] Topic {} exists, it will not be created.", key);
TopicDescription topicDescription = topics.get(key);

topicPlan.setAction(PlanAction.NO_CHANGE);
topicDetailsPlan.setPartitions(topicDescription.partitions().size())
.setReplication(topicDescription.partitions().get(0).replicas().size());

if (value.getPartitions().intValue() != topicDescription.partitions().size()) {
if( value.getPartitions().intValue() < topicDescription.partitions().size()) {
throw new ValidationException("Removing the partition number is not supported by Apache Kafka "
+ "(topic: " + key + " ("+topicDescription.partitions().size()+" -> "+value.getPartitions().intValue()+"))");
}
topicDetailsPlan.setPartitions(value.getPartitions())
.setPreviousPartitions(topicDescription.partitions().size());
topicDetailsPlan.setPartitionsAction(PlanAction.UPDATE);
topicDetailsAddOrUpdate = true;
}
if (value.getReplication().isPresent() &&
( value.getReplication().get().intValue() != topicDescription.partitions().get(0).replicas().size()) ) {
topicDetailsPlan.setReplication(value.getReplication().get())
.setPreviousReplication(topicDescription.partitions().get(0).replicas().size());
topicDetailsPlan.setReplicationAction(PlanAction.UPDATE);
topicDetailsAddOrUpdate = true;
}
if (topicDetailsAddOrUpdate) {
topicPlan.setAction(PlanAction.UPDATE);
}

planTopicConfigurations(key, value, topicConfigs.get(key), topicPlan);
}

topicPlan.setTopicDetailsPlan(topicDetailsPlan.build());

desiredPlan.addTopicPlans(topicPlan.build());
});

topics.forEach(currentTopic -> {
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
topics.forEach((currentTopicName, currentTopicDescription) -> {
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopicName.startsWith(it));
if (shouldIgnore) {
log.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
log.info("[PLAN] Ignoring topic {} due to prefix", currentTopicName);
return;
}

if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopicName, null) == null) {
TopicPlan topicPlan = new TopicPlan.Builder()
.setName(currentTopic.name())
.setName(currentTopicName)
.setAction(PlanAction.REMOVE)
.build();

Expand All @@ -88,7 +127,7 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla

private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {
Map<String, TopicConfigPlan> configPlans = new HashMap<>();
List<ConfigEntry> customConfigs = configs.stream()
List<ConfigEntry> customConfigs = configs == null ? new ArrayList<>() : configs.stream()
.filter(it -> it.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
.collect(Collectors.toList());

Expand All @@ -104,8 +143,11 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
configPlans.put(currentConfig.name(), topicConfigPlan.build());
} else if (newConfig == null) {
topicConfigPlan.setAction(PlanAction.REMOVE);
topicConfigPlan.setPreviousValue(currentConfig.value());
configPlans.put(currentConfig.name(), topicConfigPlan.build());
topicPlan.setAction(PlanAction.UPDATE);
if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
topicPlan.setAction(PlanAction.UPDATE);
}
}
});

Expand All @@ -119,11 +161,16 @@ private void planTopicConfigurations(String topicName, TopicDetails topicDetails
if (currentConfig == null) {
topicConfigPlan.setAction(PlanAction.ADD);
configPlans.put(key, topicConfigPlan.build());
topicPlan.setAction(PlanAction.UPDATE);
if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
topicPlan.setAction(PlanAction.UPDATE);
}
} else if (!currentConfig.value().equals(value)) {
topicConfigPlan.setAction(PlanAction.UPDATE);
topicConfigPlan.setPreviousValue(currentConfig.value())
.setAction(PlanAction.UPDATE);
configPlans.put(key, topicConfigPlan.build());
topicPlan.setAction(PlanAction.UPDATE);
if (topicPlan.getAction() == null || topicPlan.getAction().equals(PlanAction.NO_CHANGE)) {
topicPlan.setAction(PlanAction.UPDATE);
}
}
});

Expand Down Expand Up @@ -203,7 +250,7 @@ public void writePlanToFile(DesiredPlan desiredPlan) {
writer.write(objectMapper.writeValueAsString(outputPlan));
writer.close();
} catch (IOException ex) {
throw new WritePlanOutputException(ex.getMessage());
throw new WritePlanOutputException(ex.getMessage() + " ('" + managerConfig.getPlanFile().get() + "')");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public ServiceAccount createServiceAccount(String name, boolean isUser) {
}

public static String execCmd(String[] cmd) throws java.io.IOException {
java.util.Scanner s = new java.util.Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A");
return s.hasNext() ? s.next() : "";
try (java.util.Scanner s = new java.util.Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A");) {
return s.hasNext() ? s.next() : "";
}
}

static {
Expand Down
Loading

0 comments on commit 4309147

Please sign in to comment.