From 581bb8df6be93c05b89dc5cf8abce7605fffecf6 Mon Sep 17 00:00:00 2001
From: Graeme Rocher
Date: Thu, 24 Oct 2024 11:33:54 +0200
Subject: [PATCH 1/3] remove kafka compat and upgrade

---
 gradle/libs.versions.toml                   |  6 +-----
 .../health/DefaultNetworkClientCreator.java | 23 +++++++++++++++++++----
 .../kafka/health/KafkaHealthIndicator.java  |  5 +++--
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 8e86a08c1..1da59fec8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -5,8 +5,7 @@ micronaut-docs = "2.0.0"
 micronaut-gradle-plugin = "4.4.3"
 
 # Required to keep catalog compatibility with 3.4.x. Can be removed for 4.0.0
-managed-kafka-compat = "3.8.0"
-managed-kafka = '3.7.0'
+managed-kafka = '3.8.0'
 
 groovy = "4.0.15"
 
@@ -29,9 +28,6 @@ micronaut-test-resources="2.6.2"
 # Core
 micronaut-core = { module = 'io.micronaut:micronaut-core-bom', version.ref = 'micronaut' }
 
-# Duplicated to keep catalog compatibility with 3.4.x. Can be removed for 4.0.0
-managed-kafka = { module = 'org.apache.kafka:kafka-clients', version.ref = 'managed-kafka-compat' }
-
 managed-kafka-clients = { module = 'org.apache.kafka:kafka-clients', version.ref = 'managed-kafka' }
 managed-kafka-streams = { module = 'org.apache.kafka:kafka-streams', version.ref = 'managed-kafka' }
 
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/health/DefaultNetworkClientCreator.java b/kafka/src/main/java/io/micronaut/configuration/kafka/health/DefaultNetworkClientCreator.java
index 528b946b7..918ef0642 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/health/DefaultNetworkClientCreator.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/health/DefaultNetworkClientCreator.java
@@ -21,6 +21,7 @@
 import jdk.jfr.Experimental;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.common.ClusterResourceListener;
@@ -88,10 +89,24 @@ public NetworkClient create(@NonNull ClusterResourceListener... listeners) {
             metrics = metrics();
             channelBuilder = createChannelBuilder(config, SYSTEM, logContext);
             selector = selector(metrics, channelBuilder);
-            return new NetworkClient(selector, metadata(listeners), clientId, 1,
-                reconnectBackoff, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer,
-                (int) HOURS.toMillis(1), connectionSetupTimeout, connectionSetupTimeoutMax,
-                SYSTEM, true, new ApiVersions(), logContext);
+            return new NetworkClient(
+                selector,
+                metadata(listeners),
+                clientId,
+                1,
+                reconnectBackoff,
+                reconnectBackoffMax,
+                socketSendBuffer,
+                socketReceiveBuffer,
+                (int) HOURS.toMillis(1),
+                connectionSetupTimeout,
+                connectionSetupTimeoutMax,
+                SYSTEM,
+                true,
+                new ApiVersions(),
+                logContext,
+                MetadataRecoveryStrategy.NONE
+            );
         } catch (Throwable e) {
             closeQuietly(metrics, "Metrics");
             closeQuietly(selector, "Selector");
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java
index 49767f0cf..9489c66ab 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/health/KafkaHealthIndicator.java
@@ -29,6 +29,7 @@
 import io.micronaut.management.health.indicator.HealthResult;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
+import org.apache.kafka.clients.LeastLoadedNode;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.Config;
@@ -225,9 +226,9 @@ private Optional<HealthResult> hasReadyNodes(NetworkClient networkClient) {
 
     private HealthResult waitForLeastLoadedNode(NetworkClient networkClient) {
         final long requestTimeoutMs = defaultConfiguration.getHealthTimeout().toMillis();
-        final Node node = networkClient.leastLoadedNode(SYSTEM.milliseconds());
+        final LeastLoadedNode leastLoadedNode = networkClient.leastLoadedNode(SYSTEM.milliseconds());
         try {
-            return result(awaitReady(networkClient, node, SYSTEM, requestTimeoutMs), null).build();
+            return result(awaitReady(networkClient, leastLoadedNode.node(), SYSTEM, requestTimeoutMs), null).build();
         } catch (IOException e) {
             return failure(e, Collections.emptyMap());
         }

From 5b0c2dd57ad2a0659875f7c540f3d45d2ac4d8a4 Mon Sep 17 00:00:00 2001
From: Graeme Rocher
Date: Thu, 24 Oct 2024 12:28:12 +0200
Subject: [PATCH 2/3] fix tests

---
 .../kafka/KafkaConfigurationSpec.groovy       |  8 ++++----
 .../annotation/KafkaTypeConversionSpec.groovy |  2 +-
 .../errors/KafkaBatchErrorStrategySpec.groovy | 18 ++++++++----------
 3 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaConfigurationSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaConfigurationSpec.groovy
index c3ffe4169..4842f3639 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaConfigurationSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaConfigurationSpec.groovy
@@ -77,8 +77,8 @@ class KafkaConfigurationSpec extends Specification {
 
         then: "the new consumer's deserializers have the configured encoding"
         consumer != null
-        (consumer.delegate.deserializers.keyDeserializer as StringDeserializer).encoding == StandardCharsets.US_ASCII.name()
-        (consumer.delegate.deserializers.valueDeserializer as StringDeserializer).encoding == StandardCharsets.ISO_8859_1.name()
+        (consumer.delegate.deserializers.keyDeserializer as StringDeserializer).encoding.name() == StandardCharsets.US_ASCII.name()
+        (consumer.delegate.deserializers.valueDeserializer as StringDeserializer).encoding.name() == StandardCharsets.ISO_8859_1.name()
 
         cleanup:
         consumer.close()
@@ -103,8 +103,8 @@ class KafkaConfigurationSpec extends Specification {
 
         then: "the new producer's serializers have the configured encoding"
        producer != null
-        (producer.keySerializer as StringSerializer).encoding == StandardCharsets.US_ASCII.name()
-        (producer.valueSerializer as StringSerializer).encoding == StandardCharsets.ISO_8859_1.name()
+        (producer.keySerializer as StringSerializer).encoding.name() == StandardCharsets.US_ASCII.name()
+        (producer.valueSerializer as StringSerializer).encoding.name() == StandardCharsets.ISO_8859_1.name()
 
         cleanup:
         producer.close()
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy
index f9f875d49..9b4ae51e7 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaTypeConversionSpec.groovy
@@ -48,7 +48,7 @@ class KafkaTypeConversionSpec extends AbstractKafkaContainerSpec {
             myConsumer.lastException != null
             myConsumer.lastException.cause instanceof SerializationException
             myConsumer.lastException.cause.printStackTrace()
-            myConsumer.lastException.cause.message.contains("deserializing key/value for partition")
+            myConsumer.lastException.cause.message.contains("Error deserializing KEY for partition")
         }
     }
 
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy
index 235949b66..eeffb4c1d 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaBatchErrorStrategySpec.groovy
@@ -102,9 +102,9 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec {
 
         and: "The retry error strategy was honored"
         myConsumer.exceptions.size() == 2
-        myConsumer.exceptions[0].message.startsWith('Error deserializing key/value')
+        myConsumer.exceptions[0].message.startsWith('Error deserializing VALUE')
         (myConsumer.exceptions[0].cause as RecordDeserializationException).offset() == 3
-        myConsumer.exceptions[1].message.startsWith('Error deserializing key/value')
+        myConsumer.exceptions[1].message.startsWith('Error deserializing VALUE')
         (myConsumer.exceptions[1].cause as RecordDeserializationException).offset() == 3
     }
 
@@ -124,17 +124,15 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec {
         conditions.eventually {
             myConsumer.received == ['111/222', '333', '444/555']
             myConsumer.successful == ['111/222', '333', '444/555']
-            myConsumer.skippedOffsets == [4L]
+            myConsumer.skippedOffsets.contains(4L)
         }
 
         and: "The retry error strategy was honored"
-        myConsumer.exceptions.size() == 3
-        myConsumer.exceptions[0].message.startsWith('Error deserializing key/value')
+        myConsumer.exceptions.size() == 2
+        myConsumer.exceptions[0].message.startsWith('Error deserializing VALUE')
         (myConsumer.exceptions[0].cause as RecordDeserializationException).offset() == 2
-        myConsumer.exceptions[1].message.startsWith('Error deserializing key/value')
-        (myConsumer.exceptions[1].cause as RecordDeserializationException).offset() == 2
-        myConsumer.exceptions[2].message.startsWith('Error deserializing key/value')
-        (myConsumer.exceptions[2].cause as RecordDeserializationException).offset() == 4
+        myConsumer.exceptions[1].message.startsWith('Error deserializing VALUE')
+        (myConsumer.exceptions[1].cause as RecordDeserializationException).offset() == 4
     }
 
     void "test batch mode with 'retry exp' error strategy"() {
@@ -342,7 +340,7 @@ class KafkaBatchErrorStrategySpec extends AbstractEmbeddedServerSpec {
 
         @Override
        ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
-            if(exception.getMessage() == "Error deserializing key/value for partition batch-mode-retry-conditionally-deser-0 at offset 4. If needed, please seek past the record to continue consumption.") {
+            if(exception.message.contains("at offset 4")) {
                 skippedOffsets << 4L
                 return ConditionalRetryBehaviour.SKIP
             } else {

From 2cf4815a1846c2a06336a42677b247ebb2b3f669 Mon Sep 17 00:00:00 2001
From: Graeme Rocher
Date: Thu, 24 Oct 2024 13:11:49 +0200
Subject: [PATCH 3/3] accept regressions to BOM

---
 kafka-bom/build.gradle | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/kafka-bom/build.gradle b/kafka-bom/build.gradle
index 9bde100af..c3dd4520f 100644
--- a/kafka-bom/build.gradle
+++ b/kafka-bom/build.gradle
@@ -5,4 +5,8 @@ plugins {
 micronautBom {
     // exclude example projects
     extraExcludedProjects.add("tasks-sasl-plaintext")
+    suppressions {
+        acceptedLibraryRegressions.add("kafka")
+        acceptedVersionRegressions.add("kafka-compat")
+    }
 }
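
Note: Kafka 3.8 changed NetworkClient#leastLoadedNode(long) to return a
LeastLoadedNode wrapper rather than a bare Node, and added a NetworkClient
constructor that takes a MetadataRecoveryStrategy; patch 1 adapts to both.
Below is a minimal sketch of the call-site change, assuming a NetworkClient
built as in DefaultNetworkClientCreator (the class and helper names here are
illustrative, not part of the patches):

    import org.apache.kafka.clients.LeastLoadedNode;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.utils.Time;

    final class LeastLoadedNodeUsage {
        // Kafka 3.7 and earlier: Node node = client.leastLoadedNode(now);
        // Kafka 3.8: the result is wrapped, so unwrap it with node().
        static Node pickNode(NetworkClient client, Time time) {
            LeastLoadedNode leastLoaded = client.leastLoadedNode(time.milliseconds());
            return leastLoaded.node(); // may be null when no candidate broker exists
        }
    }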