Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MO] - replace EmbeddedKafkaCluster with Strimzi test container #6337

Merged
merged 5 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 2 additions & 27 deletions cluster-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -191,36 +191,11 @@
<artifactId>connect-api</artifactId>
<scope>test</scope>
</dependency>
<!-- Used by EmbeddedKafkaCluster -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<scope>test</scope>
</dependency>
<!-- ^^^^^ Used by the Strimzi test container (replacement for EmbeddedKafkaCluster) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.strimzi.operator.common.model.OrderedProperties;
import io.strimzi.test.TestUtils;
import io.strimzi.test.annotations.IsolatedTest;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
Expand All @@ -28,7 +29,6 @@
import io.vertx.junit5.VertxTestContext;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -50,7 +50,7 @@

@ExtendWith(VertxExtension.class)
public class KafkaConnectApiTest {
private static EmbeddedKafkaCluster cluster;
private static StrimziKafkaCluster cluster;
private static Vertx vertx;
private Connect connect;
private static final int PORT = 18083;
Expand All @@ -70,7 +70,7 @@ public void beforeEach() throws IOException, InterruptedException {
workerProps.put("offset.storage.topic", getClass().getSimpleName() + "-offsets");
workerProps.put("config.storage.topic", getClass().getSimpleName() + "-config");
workerProps.put("status.storage.topic", getClass().getSimpleName() + "-status");
workerProps.put("bootstrap.servers", cluster.bootstrapServers());
workerProps.put("bootstrap.servers", cluster.getBootstrapServers());
//DistributedConfig config = new DistributedConfig(workerProps);
//RestServer rest = new RestServer(config);
//rest.initializeServer();
Expand All @@ -97,8 +97,9 @@ public void afterEach() {
@BeforeAll
public static void before() throws IOException {
vertx = Vertx.vertx();

cluster = new EmbeddedKafkaCluster(3);
final Map<String, String> kafkaClusterConfiguration = new HashMap<>();
kafkaClusterConfiguration.put("zookeeper.connect", "zookeeper:2181");
cluster = new StrimziKafkaCluster(3, 1, kafkaClusterConfiguration);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Map.of to make this a 1 liner?

cluster.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.strimzi.operator.common.MicrometerMetricsProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.operator.resource.CrdOperator;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.strimzi.test.mockkube.MockKube;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand All @@ -29,7 +30,6 @@
import io.vertx.junit5.VertxTestContext;
import io.vertx.micrometer.MicrometerMetricsOptions;
import io.vertx.micrometer.VertxPrometheusOptions;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -40,6 +40,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -55,7 +56,7 @@

@ExtendWith(VertxExtension.class)
public class KafkaConnectorIT {
private static EmbeddedKafkaCluster cluster;
private static StrimziKafkaCluster cluster;
private static Vertx vertx;
private ConnectCluster connectCluster;

Expand All @@ -67,7 +68,9 @@ public static void before() throws IOException {
.setEnabled(true)
));

cluster = new EmbeddedKafkaCluster(3);
final Map<String, String> kafkaClusterConfiguration = new HashMap<>();
kafkaClusterConfiguration.put("zookeeper.connect", "zookeeper:2181");
cluster = new StrimziKafkaCluster(3, 1, kafkaClusterConfiguration);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, Map.of

cluster.start();
}

Expand All @@ -83,7 +86,7 @@ public void beforeEach() throws IOException, InterruptedException {

// Start a 3 node connect cluster
connectCluster = new ConnectCluster()
.usingBrokers(cluster.bootstrapServers())
.usingBrokers(cluster.getBootstrapServers())
.addConnectNodes(3);
connectCluster.startup();
}
Expand Down
43 changes: 4 additions & 39 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
<opentracing.version>0.33.0</opentracing.version>
<opentracing-kafka.version>0.1.13</opentracing-kafka.version>
<strimzi-oauth.version>0.10.0</strimzi-oauth.version>
<strimzi-test-container.version>0.100.0</strimzi-test-container.version>
<commons-codec.version>1.13</commons-codec.version>
<registry.version>1.3.2.Final</registry.version>
<javax.json-api.version>1.1.4</javax.json-api.version>
Expand Down Expand Up @@ -581,47 +582,11 @@
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Used by EmbeddedKafkaCluster -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>*</artifactId>
<groupId>*</groupId>
</exclusion>
</exclusions>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>${strimzi-test-container.version}</version>
see-quick marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<!-- ^^^^^ Used by the Strimzi test container (replacement for EmbeddedKafkaCluster) -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
20 changes: 2 additions & 18 deletions topic-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,10 @@
<groupId>io.strimzi</groupId>
<artifactId>test</artifactId>
</dependency>
<!-- Used by EmbeddedKafkaCluster -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
<scope>test</scope>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
see-quick marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<!-- ^^^^^ Used by the Strimzi test container (replacement for EmbeddedKafkaCluster) -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import java.util.Properties;
import java.util.Set;

import io.strimzi.test.container.StrimziKafkaContainer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;

public class KafkaStreamsTopicStoreTest extends TopicStoreTestBase {
private static final Map<String, String> MANDATORY_CONFIG;
private static EmbeddedKafkaCluster cluster;
private static StrimziKafkaContainer kafkaContainer;

static {
MANDATORY_CONFIG = new HashMap<>();
Expand Down Expand Up @@ -60,11 +60,12 @@ static KafkaStreamsTopicStoreService service(Map<String, String> configMap) thro

@BeforeAll
public static void before() throws Exception {
cluster = new EmbeddedKafkaCluster(1);
cluster.start();
kafkaContainer = new StrimziKafkaContainer()
.withBrokerId(1);
kafkaContainer.start();

MANDATORY_CONFIG.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, cluster.bootstrapServers());
MANDATORY_CONFIG.put(Config.ZOOKEEPER_CONNECT.key, cluster.zKConnectString());
MANDATORY_CONFIG.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaContainer.getBootstrapServers());
MANDATORY_CONFIG.put(Config.ZOOKEEPER_CONNECT.key, "zookeeper:2181");

service = service(Collections.emptyMap());
}
Expand All @@ -75,7 +76,7 @@ public static void after() {
service.stop();
}

cluster.stop();
kafkaContainer.stop();
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.strimzi.api.kafka.model.status.Condition;
import io.strimzi.api.kafka.model.status.KafkaTopicStatus;
import io.strimzi.test.TestUtils;
import io.strimzi.test.container.StrimziKafkaCluster;
import io.strimzi.test.k8s.KubeClusterResource;
import io.strimzi.test.k8s.cluster.KubeCluster;
import io.strimzi.test.k8s.exceptions.NoClusterException;
Expand All @@ -34,7 +35,6 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -136,12 +136,12 @@ public static void teardownKubeCluster() {
}
}

public void setup(EmbeddedKafkaCluster kafkaCluster) throws Exception {
public void setup(StrimziKafkaCluster kafkaCluster) throws Exception {
LOGGER.info("Setting up test");
cluster.cluster();

Properties p = new Properties();
p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());
p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
adminClient = AdminClient.create(p);

kubeClient = kubeClient().getClient();
Expand Down Expand Up @@ -204,7 +204,7 @@ public void teardown(boolean deletionEnabled) throws InterruptedException, Timeo
latch.await(30, TimeUnit.SECONDS);
}

protected void startTopicOperator(EmbeddedKafkaCluster kafkaCluster) throws InterruptedException, ExecutionException, TimeoutException {
protected void startTopicOperator(StrimziKafkaCluster kafkaCluster) throws InterruptedException, ExecutionException, TimeoutException {

LOGGER.info("Starting Topic Operator");
session = new Session(kubeClient, new Config(topicOperatorConfig(kafkaCluster)));
Expand All @@ -222,10 +222,10 @@ protected void startTopicOperator(EmbeddedKafkaCluster kafkaCluster) throws Inte
LOGGER.info("Started Topic Operator");
}

protected Map<String, String> topicOperatorConfig(EmbeddedKafkaCluster kafkaCluster) {
protected Map<String, String> topicOperatorConfig(StrimziKafkaCluster kafkaCluster) {
Map<String, String> m = new HashMap<>();
m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.bootstrapServers());
m.put(Config.ZOOKEEPER_CONNECT.key, kafkaCluster.zKConnectString());
m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.getBootstrapServers());
m.put(Config.ZOOKEEPER_CONNECT.key, kafkaCluster.getZookeeper().getHost() + ":" + kafkaCluster.getZookeeper().getFirstMappedPort());
m.put(Config.ZOOKEEPER_CONNECTION_TIMEOUT_MS.key, "30000");
m.put(Config.NAMESPACE.key, NAMESPACE);
m.put(Config.CLIENT_ID.key, CLIENTID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand All @@ -24,11 +23,11 @@
import io.fabric8.kubernetes.client.WatcherException;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
import io.strimzi.test.container.StrimziKafkaCluster;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -46,11 +45,11 @@

public class TopicOperatorIT extends TopicOperatorBaseIT {
private static final Logger LOGGER = LogManager.getLogger(TopicOperatorIT.class);
protected static EmbeddedKafkaCluster kafkaCluster;
protected static StrimziKafkaCluster kafkaCluster;

@BeforeAll
public static void beforeAll() throws IOException {
kafkaCluster = new EmbeddedKafkaCluster(numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster = new StrimziKafkaCluster(numKafkaBrokers(), numKafkaBrokers(), kafkaClusterConfig());
kafkaCluster.start();

setupKubeCluster();
Expand All @@ -76,9 +75,10 @@ protected static int numKafkaBrokers() {
return 1;
}

protected static Properties kafkaClusterConfig() {
Properties p = new Properties();
p.setProperty(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false");
protected static Map<String, String> kafkaClusterConfig() {
Map<String, String> p = new HashMap<>();
p.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false");
p.put("zookeeper.connect", "zookeeper:2181");
Comment on lines +79 to +81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map.of

return p;
}

Expand Down
Loading