Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Udpate some unit and integration tests to use KRaft #10394

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,30 @@ public static String kafkaComponentName(String clusterName) {
}

/**
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @param podNum The number of the Kafka pod
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster not using {@code KafkaNodePool} resources.
*
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @param podNum The ordinal number of the Kafka pod
*
* @return The name of the corresponding Kafka {@code Pod}.
*/
public static String kafkaPodName(String clusterName, int podNum) {
return kafkaComponentName(clusterName) + "-" + podNum;
}

/**
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster using {@code KafkaNodePool} resources.
*
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource
* @param nodePoolName The {@code metadata.name} of the {@code KafkaNodePool} resource
* @param podNum The ordinal number of the Kafka pod
*
* @return The name of the corresponding Kafka {@code Pod}.
*/
public static String kafkaPodName(String clusterName, String nodePoolName, int podNum) {
return clusterName + "-" + nodePoolName + "-" + podNum;
}

/**
* Returns the name of the internal bootstrap {@code Service} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
Expand Down Expand Up @@ -49,6 +52,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -57,7 +61,8 @@

@ExtendWith(VertxExtension.class)
public class JbodStorageMockTest {
private static final String NAME = "my-kafka";
private static final String NAME = "my-cluster";
private static final String NODE_POOL_NAME = "mixed";
private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup();

private static Vertx vertx;
Expand All @@ -66,7 +71,7 @@ public class JbodStorageMockTest {
private static MockKube3 mockKube;

private String namespace = "test-jbod-storage";
private Kafka kafka;
private KafkaNodePool kafkaNodePool;
private KafkaAssemblyOperator operator;
private StrimziPodSetController podSetController;

Expand Down Expand Up @@ -116,41 +121,47 @@ public void beforeEach(TestInfo testInfo) {
.withDeleteClaim(false)
.withSize("100Gi").build());

this.kafka = new KafkaBuilder()
Kafka kafka = new KafkaBuilder()
.withNewMetadata()
.withNamespace(namespace)
.withName(NAME)
.withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled"))
.endMetadata()
.withNewSpec()
.withNewKafka()
.withReplicas(3)
.withListeners(new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build())
.withNewJbodStorage()
.withVolumes(volumes)
.endJbodStorage()
.endKafka()
.withNewZookeeper()
.withReplicas(1)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.endSpec()
.build();

Crds.kafkaOperation(client).inNamespace(namespace).resource(kafka).create();

this.kafkaNodePool = new KafkaNodePoolBuilder()
.withNewMetadata()
.withName(NODE_POOL_NAME)
.withNamespace(namespace)
.withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, NAME))
.withGeneration(1L)
.endMetadata()
.withNewSpec()
.withReplicas(3)
.withNewJbodStorage()
.withVolumes(volumes)
.endJbodStorage()
.withRoles(ProcessRoles.CONTROLLER, ProcessRoles.BROKER)
.endSpec()
.build();
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).resource(kafkaNodePool).create();

PlatformFeaturesAvailability pfa = new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION);
// creating the Kafka operator
ResourceOperatorSupplier ros =
new ResourceOperatorSupplier(JbodStorageMockTest.vertx, client,
ResourceUtils.zookeeperLeaderFinder(JbodStorageMockTest.vertx, client),
ResourceUtils.adminClientProvider(), ResourceUtils.zookeeperScalerProvider(), ResourceUtils.kafkaAgentClientProvider(),
ResourceUtils.metricsProvider(), ResourceUtils.zooKeeperAdminProvider(), pfa, 60_000L);
new ResourceOperatorSupplier(vertx, client, null, ResourceUtils.adminClientProvider(), null,
ResourceUtils.kafkaAgentClientProvider(), ResourceUtils.metricsProvider(), null, pfa, 60_000L);

podSetController = new StrimziPodSetController(namespace, Labels.EMPTY, ros.kafkaOperator, ros.connectOperator, ros.mirrorMaker2Operator, ros.strimziPodSetOperator, ros.podOperations, ros.metricsProvider, Integer.parseInt(ClusterOperatorConfig.POD_SET_CONTROLLER_WORK_QUEUE_SIZE.defaultValue()));
podSetController.start();
Expand All @@ -172,11 +183,10 @@ public void testJbodStorageCreatesPVCsMatchingKafkaVolumes(VertxTestContext cont
.onComplete(context.succeeding(v -> context.verify(() -> {
List<PersistentVolumeClaim> pvcs = getPvcs();

for (int i = 0; i < this.kafka.getSpec().getKafka().getReplicas(); i++) {
for (int i = 0; i < this.kafkaNodePool.getSpec().getReplicas(); i++) {
for (SingleVolumeStorage volume : this.volumes) {
if (volume instanceof PersistentClaimStorage) {

String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, i);
String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i);
List<PersistentVolumeClaim> matchingPvcs = pvcs.stream()
.filter(pvc -> pvc.getMetadata().getName().equals(expectedPvcName))
.collect(Collectors.toList());
Expand Down Expand Up @@ -208,16 +218,14 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex
.withDeleteClaim(false)
.withSize("100Gi").build());

Kafka kafkaWithNewJbodVolume = new KafkaBuilder(kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaWithNewJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -227,7 +235,7 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithNewJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with new Jbod storage
return operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -248,16 +256,14 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte
// remove a volume from the Jbod Storage
volumes.remove(0);

Kafka kafkaWithRemovedJbodVolume = new KafkaBuilder(this.kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaWithRemovedJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -267,7 +273,7 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithRemovedJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with a Jbod storage volume removed
return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -286,16 +292,14 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
// trying to update id for a volume from in the JBOD storage
volumes.get(0).setId(3);

Kafka kafkaWithUpdatedJbodVolume = new KafkaBuilder(this.kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaWithUpdatedJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -305,7 +309,7 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithUpdatedJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with a Jbod storage volume removed
return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -317,22 +321,20 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
})));
}

private Set<String> expectedPvcs(Kafka kafka) {
private Set<String> expectedPvcs(KafkaNodePool nodePool) {
Set<String> expectedPvcs = new HashSet<>();
for (int i = 0; i < kafka.getSpec().getKafka().getReplicas(); i++) {
for (SingleVolumeStorage volume : ((JbodStorage) kafka.getSpec().getKafka().getStorage()).getVolumes()) {
for (int i = 0; i < nodePool.getSpec().getReplicas(); i++) {
for (SingleVolumeStorage volume : ((JbodStorage) nodePool.getSpec().getStorage()).getVolumes()) {
if (volume instanceof PersistentClaimStorage) {
expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-"
+ KafkaResources.kafkaPodName(NAME, i));
expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i));
}
}
}
return expectedPvcs;
}

private List<PersistentVolumeClaim> getPvcs() {
String kafkaStsName = KafkaResources.kafkaComponentName(JbodStorageMockTest.NAME);
Labels pvcSelector = Labels.forStrimziCluster(JbodStorageMockTest.NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(kafkaStsName);
Labels pvcSelector = Labels.forStrimziCluster(NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(KafkaResources.kafkaComponentName(NAME));
return client.persistentVolumeClaims()
.inNamespace(namespace)
.withLabels(pvcSelector.toMap())
Expand Down
Loading
Loading