[MO] - optimize unnecessary 3-node cluster in ITs #6400

Closed · wants to merge 5 commits

@@ -43,11 +43,11 @@ public void startup() throws InterruptedException {
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter.schemas.enable", "false");
workerProps.put("offset.storage.topic", getClass().getSimpleName() + "-offsets");
-workerProps.put("offset.storage.replication.factor", "3");
+workerProps.put("offset.storage.replication.factor", "1");
workerProps.put("config.storage.topic", getClass().getSimpleName() + "-config");
-workerProps.put("config.storage.replication.factor", "3");
+workerProps.put("config.storage.replication.factor", "1");
workerProps.put("status.storage.topic", getClass().getSimpleName() + "-status");
-workerProps.put("status.storage.replication.factor", "3");
+workerProps.put("status.storage.replication.factor", "1");
workerProps.put("bootstrap.servers", brokerList);
//DistributedConfig config = new DistributedConfig(workerProps);
//RestServer rest = new RestServer(config);

KafkaConnectApiTest.java
@@ -18,7 +18,7 @@
import io.strimzi.operator.common.model.OrderedProperties;
import io.strimzi.test.TestUtils;
import io.strimzi.test.annotations.IsolatedTest;
-import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
@@ -50,7 +50,7 @@

@ExtendWith(VertxExtension.class)
public class KafkaConnectApiTest {
-private static StrimziKafkaCluster cluster;
+private static StrimziKafkaContainer kafkaContainer;
private static Vertx vertx;
private Connect connect;
private static final int PORT = 18083;
@@ -70,7 +70,10 @@ public void beforeEach() throws IOException, InterruptedException {
workerProps.put("offset.storage.topic", getClass().getSimpleName() + "-offsets");
workerProps.put("config.storage.topic", getClass().getSimpleName() + "-config");
workerProps.put("status.storage.topic", getClass().getSimpleName() + "-status");
-workerProps.put("bootstrap.servers", cluster.getBootstrapServers());
+workerProps.put("config.storage.replication.factor", "1");
+workerProps.put("offset.storage.replication.factor", "1");
+workerProps.put("status.storage.replication.factor", "1");
+workerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
//DistributedConfig config = new DistributedConfig(workerProps);
//RestServer rest = new RestServer(config);
//rest.initializeServer();
@@ -99,13 +102,13 @@ public static void before() throws IOException {
vertx = Vertx.vertx();
final Map<String, String> kafkaClusterConfiguration = new HashMap<>();
kafkaClusterConfiguration.put("zookeeper.connect", "zookeeper:2181");
-cluster = new StrimziKafkaCluster(3, 1, kafkaClusterConfiguration);
-cluster.start();
+kafkaContainer = new StrimziKafkaContainer();
+kafkaContainer.start();
}

@AfterAll
public static void after() {
-cluster.stop();
+kafkaContainer.stop();
vertx.close();
}


KafkaConnectorIT.java
@@ -20,7 +20,7 @@
import io.strimzi.operator.common.MicrometerMetricsProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.operator.resource.CrdOperator;
-import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.mockkube.MockKube;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
@@ -40,7 +40,6 @@

import java.io.IOException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -56,7 +55,7 @@

@ExtendWith(VertxExtension.class)
public class KafkaConnectorIT {
-private static StrimziKafkaCluster cluster;
+private static StrimziKafkaContainer kafkaContainer;
private static Vertx vertx;
private ConnectCluster connectCluster;

@@ -68,15 +67,13 @@ public static void before() throws IOException {
.setEnabled(true)
));

-final Map<String, String> kafkaClusterConfiguration = new HashMap<>();
-kafkaClusterConfiguration.put("zookeeper.connect", "zookeeper:2181");
-cluster = new StrimziKafkaCluster(3, 1, kafkaClusterConfiguration);
-cluster.start();
+kafkaContainer = new StrimziKafkaContainer();
+kafkaContainer.start();
}

@AfterAll
public static void after() {
-cluster.stop();
+kafkaContainer.stop();
vertx.close();
}

@@ -86,7 +83,7 @@ public void beforeEach() throws IOException, InterruptedException {

// Start a 3 node connect cluster
connectCluster = new ConnectCluster()
-.usingBrokers(cluster.getBootstrapServers())
+.usingBrokers(kafkaContainer.getBootstrapServers())
.addConnectNodes(3);
connectCluster.startup();
}

pom.xml (7 additions, 0 deletions)
@@ -118,6 +118,7 @@
<opentracing-kafka.version>0.1.13</opentracing-kafka.version>
<strimzi-oauth.version>0.10.0</strimzi-oauth.version>
<strimzi-test-container.version>0.101.0</strimzi-test-container.version>
+<test-container.version>1.15.2</test-container.version>
<commons-codec.version>1.13</commons-codec.version>
<registry.version>1.3.2.Final</registry.version>
<javax.json-api.version>1.1.4</javax.json-api.version>
@@ -586,6 +587,12 @@
<artifactId>strimzi-test-container</artifactId>
<version>${strimzi-test-container.version}</version>
</dependency>
+<!-- Needed because of Startable Interface -->
+<dependency>
+<groupId>org.testcontainers</groupId>
+<artifactId>testcontainers</artifactId>
+<version>${test-container.version}</version>
+</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>

topic-operator/pom.xml (4 additions, 0 deletions)
@@ -120,6 +120,10 @@
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
</dependency>
+<dependency>
+<groupId>org.testcontainers</groupId>
+<artifactId>testcontainers</artifactId>
+</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>

TopicOperatorBaseIT.java
@@ -17,6 +17,7 @@
import io.strimzi.api.kafka.model.status.KafkaTopicStatus;
import io.strimzi.test.TestUtils;
import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.k8s.KubeClusterResource;
import io.strimzi.test.k8s.cluster.KubeCluster;
import io.strimzi.test.k8s.exceptions.NoClusterException;
@@ -39,6 +40,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.lifecycle.Startable;

import java.io.IOException;
import java.util.HashMap;
@@ -74,7 +76,7 @@ public abstract class TopicOperatorBaseIT {

private static final Logger LOGGER = LogManager.getLogger(TopicOperatorBaseIT.class);

-private static KubeClusterResource cluster;
+protected static KubeClusterResource cluster;

protected static String oldNamespace;

@@ -140,12 +142,22 @@ public static void teardownKubeCluster() {
}
}

-public void setup(StrimziKafkaCluster kafkaCluster) throws Exception {
+public <T extends Startable> void setup(final T strimziContainer) throws Exception {
LOGGER.info("Setting up test");
cluster.cluster();

Properties p = new Properties();
-p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
+
+if (strimziContainer instanceof StrimziKafkaCluster) {
+final StrimziKafkaCluster kCluster = ((StrimziKafkaCluster) strimziContainer);
+p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kCluster.getBootstrapServers());
+} else if (strimziContainer instanceof StrimziKafkaContainer) {
+final StrimziKafkaContainer kContainer = ((StrimziKafkaContainer) strimziContainer);
+p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kContainer.getBootstrapServers());
+} else {
+throw new RuntimeException("Unsupported type of Strimzi test container");
+}

adminClient = AdminClient.create(p);

kubeClient = kubeClient().getClient();
@@ -208,10 +220,10 @@ public void teardown(boolean deletionEnabled) throws InterruptedException, Timeo
latch.await(30, TimeUnit.SECONDS);
}

-protected void startTopicOperator(StrimziKafkaCluster kafkaCluster) throws InterruptedException, ExecutionException, TimeoutException {
+protected <T extends Startable> void startTopicOperator(final T strimziContainer) throws InterruptedException, ExecutionException, TimeoutException {

LOGGER.info("Starting Topic Operator");
-session = new Session(kubeClient, new Config(topicOperatorConfig(kafkaCluster)));
+session = new Session(kubeClient, new Config(topicOperatorConfig(strimziContainer)));

CompletableFuture<Void> async = new CompletableFuture<>();
vertx.deployVerticle(session, ar -> {
@@ -226,10 +238,21 @@ protected void startTopicOperator(StrimziKafkaCluster kafkaCluster) throws Inter
LOGGER.info("Started Topic Operator");
}

-protected Map<String, String> topicOperatorConfig(StrimziKafkaCluster kafkaCluster) {
-Map<String, String> m = new HashMap<>();
-m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.getBootstrapServers());
-m.put(Config.ZOOKEEPER_CONNECT.key, kafkaCluster.getZookeeper().getHost() + ":" + kafkaCluster.getZookeeper().getFirstMappedPort());
+protected <T extends Startable> Map<String, String> topicOperatorConfig(final T strimziContainer) {
Member:
Do we really need the type parameter? Can't the value parameter have type Startable?

In any case, it feels a bit rubbish to require a dependency on testcontainers. We don't actually need any of the methods of Startable; we're just using it because it's a convenient supertype of both StrimziKafkaCluster and StrimziKafkaContainer. Do you think it's worth having a common superinterface for StrimziKafkaCluster and StrimziKafkaContainer in strimzi-test-container, to allow precisely this kind of abstraction in users of strimzi-test-container? It would also be more typesafe, since trying to call this method with any old Startable would then be a compile-time error.

Member:
And looking at how you're using strimziContainer, it seems this interface could just expose getBootstrapServers() and getZookeeperConnectString() (and maybe some methods for the KRaft case?), thus avoiding the instanceof entirely.

Member (Author):
Yes, I think creating an interface for such containers is a good idea, but it would mean another release to get this PR merged. Overall, I think it can wait until the next release of the test container :)
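
A rough sketch of the kind of shared abstraction being discussed, purely for illustration: the interface name and the getZookeeperConnectString() method below are hypothetical and are not part of strimzi-test-container or of this PR.

// Hypothetical common interface that strimzi-test-container could expose; all names are illustrative.
public interface KafkaTestResource {
    // Bootstrap servers for clients connecting to the broker(s).
    String getBootstrapServers();
    // ZooKeeper connection string (host:port) of the backing ensemble.
    String getZookeeperConnectString();
    void start();
    void stop();
}

If StrimziKafkaCluster and StrimziKafkaContainer both implemented such an interface, setup(), startTopicOperator() and topicOperatorConfig() could accept it directly, dropping both the instanceof dispatch and the extra org.testcontainers dependency.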

+final Map<String, String> m = new HashMap<>();
+
+if (strimziContainer instanceof StrimziKafkaCluster) {
+final StrimziKafkaCluster kCluster = ((StrimziKafkaCluster) strimziContainer);
+m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kCluster.getBootstrapServers());
+m.put(Config.ZOOKEEPER_CONNECT.key, kCluster.getZookeeper().getConnectString());
+} else if (strimziContainer instanceof StrimziKafkaContainer) {
+final StrimziKafkaContainer kContainer = ((StrimziKafkaContainer) strimziContainer);
+m.put(Config.KAFKA_BOOTSTRAP_SERVERS.key, kContainer.getBootstrapServers());
+m.put(Config.ZOOKEEPER_CONNECT.key, kContainer.getInternalZooKeeperConnect());
+} else {
+throw new RuntimeException("Unsupported type of Strimzi test container");
+}

m.put(Config.ZOOKEEPER_CONNECTION_TIMEOUT_MS.key, "30000");
m.put(Config.NAMESPACE.key, NAMESPACE);
m.put(Config.CLIENT_ID.key, CLIENTID);

TopicOperatorIT.java
@@ -22,7 +22,7 @@
import io.fabric8.kubernetes.client.WatcherException;
import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
-import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
import kafka.server.KafkaConfig$;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
@@ -43,26 +43,27 @@

public class TopicOperatorIT extends TopicOperatorBaseIT {
private static final Logger LOGGER = LogManager.getLogger(TopicOperatorIT.class);
-protected static StrimziKafkaCluster kafkaCluster;
+protected static StrimziKafkaContainer kafkaContainer;

@BeforeAll
public void beforeAll() throws Exception {
-kafkaCluster = new StrimziKafkaCluster(numKafkaBrokers(), numKafkaBrokers(), kafkaClusterConfig());
-kafkaCluster.start();
+kafkaContainer = new StrimziKafkaContainer()
+.withKafkaConfigurationMap(Map.of(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false"));
+kafkaContainer.start();

setupKubeCluster();
-setup(kafkaCluster);
+setup(kafkaContainer);

LOGGER.info("Using namespace {}", NAMESPACE);
-startTopicOperator(kafkaCluster);
+startTopicOperator(kafkaContainer);
}

@AfterAll
public void afterAll() throws InterruptedException, ExecutionException, TimeoutException {
teardown(true);
teardownKubeCluster();
adminClient.close();
-kafkaCluster.stop();
+kafkaContainer.stop();
}

@AfterEach
@@ -75,13 +76,6 @@ protected static int numKafkaBrokers() {
return 1;
}

-protected static Map<String, String> kafkaClusterConfig() {
-Map<String, String> p = new HashMap<>();
-p.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), "false");
-p.put("zookeeper.connect", "zookeeper:2181");
-return p;
-}


@Test
public void testTopicAdded() throws Exception {
@@ -537,7 +531,7 @@ public void testReconciliationOnStartup() throws ExecutionException, Interrupted
}

// 4. Start TO
-startTopicOperator(kafkaCluster);
+startTopicOperator(kafkaContainer);

// 5. Verify topics A, X and Y exist on both sides
waitForTopicInKafka(topicNameA);

TopicOperatorMockTest.java
@@ -12,7 +12,7 @@
import io.strimzi.api.kafka.model.KafkaTopicBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
-import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.mockkube.MockKube;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
@@ -34,7 +34,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -54,7 +53,7 @@ public class TopicOperatorMockTest {

private KubernetesClient kubeClient;
private Session session;
-private StrimziKafkaCluster kafkaCluster;
+private StrimziKafkaContainer kafkaContainer;
private static Vertx vertx;
private String deploymentId;
private AdminClient adminClient;
@@ -84,21 +83,19 @@ public void setup(VertxTestContext context) throws Exception {
//Create cluster in @BeforeEach instead of @BeforeAll as once the checkpoints causing premature success were fixed,
//tests were failing due to topic "my-topic" already existing, and trying to delete the topics at the end of the test was timing out occasionally.
//So works best when the cluster is recreated for each test to avoid shared state
-Map<String, String> config = new HashMap<>();
-config.put("zookeeper.connect", "zookeeper:2181");
-kafkaCluster = new StrimziKafkaCluster(1, 1, config);
-kafkaCluster.start();
+kafkaContainer = new StrimziKafkaContainer();
+kafkaContainer.start();

MockKube mockKube = new MockKube();
mockKube.withCustomResourceDefinition(Crds.kafkaTopic(),
KafkaTopic.class, KafkaTopicList.class, KafkaTopic::getStatus, KafkaTopic::setStatus);

kubeClient = mockKube.build();
-adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()));
+adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()));

Config topicConfig = new Config(Map.of(
-Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaCluster.getBootstrapServers(),
-Config.ZOOKEEPER_CONNECT.key, kafkaCluster.getZookeeper().getHost() + ":" + kafkaCluster.getZookeeper().getFirstMappedPort(),
+Config.KAFKA_BOOTSTRAP_SERVERS.key, kafkaContainer.getBootstrapServers(),
+Config.ZOOKEEPER_CONNECT.key, kafkaContainer.getInternalZooKeeperConnect(),
Config.ZOOKEEPER_CONNECTION_TIMEOUT_MS.key, "30000",
Config.NAMESPACE.key, "myproject",
Config.CLIENT_ID.key, "myproject-client-id",
@@ -156,7 +153,7 @@ public void tearDown(VertxTestContext context) {
adminClient.close();
}

-kafkaCluster.stop();
+kafkaContainer.stop();

context.completeNow();
});