Skip to content

Commit

Permalink
[MO] - replace EmbeddedKafkaCluster with Strimzi test container (3/3)…
Browse files Browse the repository at this point in the history
… - topic module

Signed-off-by: morsak <xorsak02@stud.fit.vutbr.cz>
  • Loading branch information
morsak committed Feb 9, 2022
1 parent 49989aa commit 3153cd0
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 95 deletions.
43 changes: 1 addition & 42 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
<module>api</module>
<module>mockkube</module>
<module>config-model</module>
<module>config-model-generator</module>
<!-- TODO: config-model-generator temporarily disabled during the EmbeddedKafkaCluster -> Strimzi test container migration; re-enable before merge: <module>config-model-generator</module> -->
<module>operator-common</module>
<module>topic-operator</module>
<module>cluster-operator</module>
Expand Down Expand Up @@ -587,47 +587,6 @@
<artifactId>strimzi-test-container</artifactId>
<version>${strimzi-test-container.version}</version>
</dependency>
<!-- Used by EmbeddedKafkaCluster -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- ^^^^^ Used by EmbeddedKafkaCluster -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import java.util.Properties;
import java.util.Set;

import io.strimzi.test.container.StrimziKafkaContainer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

public class KafkaStreamsTopicStoreTest extends TopicStoreTestBase {
private static final Map<String, String> MANDATORY_CONFIG;
private static EmbeddedKafkaCluster cluster;
private static StrimziKafkaContainer kafkaContainer;

static {
MANDATORY_CONFIG = new HashMap<>();
Expand Down Expand Up @@ -60,11 +60,12 @@ static KafkaStreamsTopicStoreService service(Map<String, String> configMap) thro

@BeforeAll
public static void before() throws Exception {
cluster = new EmbeddedKafkaCluster(1);
cluster.start();
kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(1);
kafkaContainer.start();

MANDATORY_CONFIG.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, cluster.bootstrapServers());
MANDATORY_CONFIG.put(Config.ZOOKEEPER_CONNECT.key, cluster.zKConnectString());
MANDATORY_CONFIG.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaContainer.getBootstrapServers());
MANDATORY_CONFIG.put(Config.ZOOKEEPER_CONNECT.key, "zookeeper:2181");

service = service(Collections.emptyMap());
}
Expand All @@ -75,7 +76,7 @@ public static void after() {
service.stop();
}

cluster.stop();
kafkaContainer.stop();
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.strimzi.api.kafka.model.status.Condition;
import io.strimzi.api.kafka.model.status.KafkaTopicStatus;
import io.strimzi.test.TestUtils;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.strimzi.test.k8s.KubeClusterResource;
import io.strimzi.test.k8s.cluster.KubeCluster;
import io.strimzi.test.k8s.exceptions.NoClusterException;
Expand All @@ -34,7 +35,6 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -136,12 +136,12 @@ public static void teardownKubeCluster() {
}
}

public void setup(EmbeddedKafkaCluster kafkaCluster) throws Exception {
public void setup(StrimziKafkaCluster kafkaCluster) throws Exception {
LOGGER.info("Setting up test");
cluster.cluster();

Properties p = new Properties();
p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());
p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
adminClient = AdminClient.create(p);

kubeClient = kubeClient().getClient();
Expand Down Expand Up @@ -204,7 +204,7 @@ public void teardown(boolean deletionEnabled) throws InterruptedException, Timeo
latch.await(30, TimeUnit.SECONDS);
}

protected void startTopicOperator(EmbeddedKafkaCluster kafkaCluster) throws InterruptedException, ExecutionException, TimeoutException {
protected void startTopicOperator(StrimziKafkaCluster kafkaCluster) throws InterruptedException, ExecutionException, TimeoutException {

LOGGER.info("Starting Topic Operator");
session = new Session(kubeClient, new Config(topicOperatorConfig(kafkaCluster)));
Expand All @@ -222,10 +222,11 @@ protected void startTopicOperator(EmbeddedKafkaCluster kafkaCluster) throws Inte
LOGGER.info("Started Topic Operator");
}

protected Map<String, String> topicOperatorConfig(EmbeddedKafkaCluster kafkaCluster) {
protected Map<String, String> topicOperatorConfig(StrimziKafkaCluster kafkaCluster) {
Map<String, String> m = new HashMap<>();
m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.bootstrapServers());
m.put(Config.ZOOKEEPER_CONNECT.key, kafkaCluster.zKConnectString());
m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.getBootstrapServers());
        // TODO: verify the Topic Operator can actually reach ZooKeeper via the container's mapped host/port
        // (getHost() + getFirstMappedPort()) from outside the container network, rather than the in-network "zookeeper:2181" address
m.put(Config.ZOOKEEPER_CONNECT.key, kafkaCluster.getZookeeper().getHost() + ":" + kafkaCluster.getZookeeper().getFirstMappedPort());
m.put(Config.ZOOKEEPER_CONNECTION_TIMEOUT_MS.key, "30000");
m.put(Config.NAMESPACE.key, NAMESPACE);
m.put(Config.CLIENT_ID.key, CLIENTID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand All @@ -24,11 +23,11 @@
import io.fabric8.kubernetes.client.WatcherException;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
import io.strimzi.test.container.StrimziKafkaCluster;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -46,11 +45,11 @@

public class TopicOperatorIT extends TopicOperatorBaseIT {
private static final Logger LOGGER = LogManager.getLogger(TopicOperatorIT.class);
protected static EmbeddedKafkaCluster kafkaCluster;
protected static StrimziKafkaCluster kafkaCluster;

@BeforeAll
public static void beforeAll() throws IOException {
kafkaCluster = new EmbeddedKafkaCluster(numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster = new StrimziKafkaCluster(numKafkaBrokers(), numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster.start();

setupKubeCluster();
Expand All @@ -76,9 +75,10 @@ protected static int numKafkaBrokers() {
return 1;
}

protected static Properties kafkaClusterConfig() {
Properties p = new Properties();
p.setProperty(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false");
protected static Map<String, String> kafkaClusterConfig() {
Map<String, String> p = new HashMap<>();
p.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false");
p.put("zookeeper.connect", "zookeeper:2181");
return p;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.strimzi.test.mockkube.MockKube;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand All @@ -24,7 +25,6 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -34,6 +34,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -53,7 +54,7 @@ public class TopicOperatorMockTest {

private KubernetesClient kubeClient;
private Session session;
private EmbeddedKafkaCluster cluster;
private StrimziKafkaCluster kafkaCluster;
private static Vertx vertx;
private String deploymentId;
private AdminClient adminClient;
Expand Down Expand Up @@ -83,19 +84,21 @@ public void setup(VertxTestContext context) throws Exception {
//Create cluster in @BeforeEach instead of @BeforeAll as once the checkpoints causing premature success were fixed,
//tests were failing due to topic "my-topic" already existing, and trying to delete the topics at the end of the test was timing out occasionally.
//So works best when the cluster is recreated for each test to avoid shared state
cluster = new EmbeddedKafkaCluster(1);
cluster.start();
Map<String, String> config = new HashMap<>();
config.put("zookeeper.connect", "zookeeper:2181");
kafkaCluster = new StrimziKafkaCluster(1, 1, config);
kafkaCluster.start();

MockKube mockKube = new MockKube();
mockKube.withCustomResourceDefinition(Crds.kafkaTopic(),
KafkaTopic.class, KafkaTopicList.class, KafkaTopic::getStatus, KafkaTopic::setStatus);

kubeClient = mockKube.build();
adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()));
adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()));

Config topicConfig = new Config(Map.of(
Config.KAFKA_BOOTSTRAP_SERVERS.key, cluster.bootstrapServers(),
Config.ZOOKEEPER_CONNECT.key, cluster.zKConnectString(),
Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.getBootstrapServers(),
Config.ZOOKEEPER_CONNECT.key, kafkaCluster.getZookeeper().getHost() + ":" + kafkaCluster.getZookeeper().getFirstMappedPort(),
Config.ZOOKEEPER_CONNECTION_TIMEOUT_MS.key, "30000",
Config.NAMESPACE.key, "myproject",
Config.CLIENT_ID.key, "myproject-client-id",
Expand Down Expand Up @@ -153,7 +156,7 @@ public void tearDown(VertxTestContext context) {
adminClient.close();
}

cluster.stop();
kafkaCluster.stop();

context.completeNow();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
import io.strimzi.test.container.StrimziKafkaCluster;
import kafka.admin.ReassignPartitionsCommand;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -23,8 +23,8 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand All @@ -34,11 +34,11 @@

public class TopicOperatorReplicationIT extends TopicOperatorBaseIT {
private static final Logger LOGGER = LogManager.getLogger(TopicOperatorReplicationIT.class);
protected static EmbeddedKafkaCluster kafkaCluster;
protected static StrimziKafkaCluster kafkaCluster;

@BeforeAll
public static void beforeAll() throws IOException {
kafkaCluster = new EmbeddedKafkaCluster(numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster = new StrimziKafkaCluster(numKafkaBrokers(), numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster.start();

setupKubeCluster();
Expand All @@ -64,12 +64,14 @@ protected static int numKafkaBrokers() {
return 2;
}

protected static Properties kafkaClusterConfig() {
return new Properties();
protected static Map<String, String> kafkaClusterConfig() {
Map<String, String> p = new HashMap<>();
p.put("zookeeper.connect", "zookeeper:2181");
return p;
}

@Override
protected Map<String, String> topicOperatorConfig(EmbeddedKafkaCluster kafkaCluster) {
protected Map<String, String> topicOperatorConfig(StrimziKafkaCluster kafkaCluster) {
Map<String, String> m = super.topicOperatorConfig(kafkaCluster);
m.put(Config.FULL_RECONCILIATION_INTERVAL_MS.key, "20000");
return m;
Expand Down Expand Up @@ -112,7 +114,7 @@ public void testKafkaTopicModifiedChangedReplication() throws Exception {

// Now change it in Kafka
String reassignmentOutput = doReassignmentCommand(
"--bootstrap-server", kafkaCluster.bootstrapServers(),
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--reassignment-json-file", file.getAbsolutePath(),
"--execute");

Expand All @@ -121,7 +123,7 @@ public void testKafkaTopicModifiedChangedReplication() throws Exception {
LOGGER.info("Waiting for reassignment completion");
waitFor(() -> {
String output = doReassignmentCommand(
"--bootstrap-server", kafkaCluster.bootstrapServers(),
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--reassignment-json-file", file.getAbsolutePath(),
"--verify");
LOGGER.info(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
package io.strimzi.operator.topic;

import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.vertx.core.Future;
import kafka.server.KafkaConfig$;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class TopicOperatorTopicDeletionDisabledIT extends TopicOperatorBaseIT {
protected static EmbeddedKafkaCluster kafkaCluster;
protected static StrimziKafkaCluster kafkaCluster;

@BeforeAll
public static void beforeAll() throws IOException {
kafkaCluster = new EmbeddedKafkaCluster(numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster = new StrimziKafkaCluster(numKafkaBrokers(), numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster.start();

setupKubeCluster();
Expand All @@ -50,10 +51,11 @@ protected static int numKafkaBrokers() {
return 1;
}

protected static Properties kafkaClusterConfig() {
Properties config = new Properties();
config.setProperty(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), "false");
return config;
protected static Map<String, String> kafkaClusterConfig() {
Map<String, String> p = new HashMap<>();
p.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), "false");
p.put("zookeeper.connect", "zookeeper:2181");
return p;
}

@Test
Expand Down
Loading

0 comments on commit 3153cd0

Please sign in to comment.