From a33d70f1dd8d3de7250b8834032b49b6f3f9d50e Mon Sep 17 00:00:00 2001 From: Sergio del Amo Date: Mon, 5 Jun 2023 11:26:55 +0200 Subject: [PATCH] kafka to M6 (#725) * kafka to M6 * platform to M2 * fix Kafka streams config --------- Co-authored-by: Graeme Rocher --- build.gradle | 1 - ...micronaut.build.internal.kafka-base.gradle | 1 - gradle/libs.versions.toml | 18 ++++++------ kafka-streams/build.gradle | 2 +- .../streams/KafkaStreamsConfiguration.java | 4 ++- kafka/build.gradle | 2 +- .../config/AbstractKafkaConfiguration.java | 29 +++++++++++++++++++ .../config/KafkaDefaultConfiguration.java | 18 +----------- settings.gradle | 1 - 9 files changed, 44 insertions(+), 32 deletions(-) diff --git a/build.gradle b/build.gradle index 068779e29..ce93eb356 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,6 @@ plugins { repositories { mavenCentral() - maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" } } configurations.all { resolutionStrategy { diff --git a/buildSrc/src/main/groovy/io.micronaut.build.internal.kafka-base.gradle b/buildSrc/src/main/groovy/io.micronaut.build.internal.kafka-base.gradle index 900993883..f078fe193 100644 --- a/buildSrc/src/main/groovy/io.micronaut.build.internal.kafka-base.gradle +++ b/buildSrc/src/main/groovy/io.micronaut.build.internal.kafka-base.gradle @@ -1,6 +1,5 @@ repositories { mavenCentral() - maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" } } configurations.all { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a0f0bdc80..8410df6e8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,8 +1,8 @@ [versions] -micronaut = "4.0.0-M4" -micronaut-platform = "4.0.0-M1" +micronaut = "4.0.0-M6" +micronaut-platform = "4.0.0-M2" micronaut-docs = "2.0.0" -micronaut-gradle-plugin = "4.0.0-M3" +micronaut-gradle-plugin = "4.0.0-M4" # Required to keep catalog compatibility with 3.4.x. Can be removed for 4.0.0 managed-kafka-compat = "3.4.0" @@ -18,12 +18,12 @@ testcontainers = "1.18.0" zipkin-brave-kafka-clients = '5.15.1' micronaut-cache = "4.0.0-M2" -micronaut-micrometer = "5.0.0-M1" -micronaut-reactor = "3.0.0-M2" -micronaut-rxjava2 = "2.0.0-M1" -micronaut-serde = "2.0.0-M7" -micronaut-tracing = "5.0.0-SNAPSHOT" -micronaut-test = "4.0.0-M3" +micronaut-micrometer = "5.0.0-M2" +micronaut-reactor = "3.0.0-M4" +micronaut-rxjava2 = "2.0.0-M3" +micronaut-serde = "2.0.0-M8" +micronaut-tracing = "5.0.0-M3" +micronaut-test = "4.0.0-M5" logback = "1.4.6" diff --git a/kafka-streams/build.gradle b/kafka-streams/build.gradle index 5df81fe25..c9fc72381 100644 --- a/kafka-streams/build.gradle +++ b/kafka-streams/build.gradle @@ -5,7 +5,7 @@ plugins { configurations.all { resolutionStrategy { - force 'io.micronaut.platform:micronaut-platform:4.0.0-M1' + force 'io.micronaut.platform:micronaut-platform:4.0.0-M2' } } diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java index f7132253b..e3f1c2007 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java @@ -58,7 +58,9 @@ public KafkaStreamsConfiguration( super(defaultConfiguration); Properties config = getConfig(); String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true); - config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new)); + + Properties properties = environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new); + config.putAll(toKafkaProperties(environment, properties)); init(applicationConfiguration, environment, config); } } diff --git a/kafka/build.gradle b/kafka/build.gradle index 3d3b1e6b1..78164a8f8 100644 --- a/kafka/build.gradle +++ b/kafka/build.gradle @@ -6,7 +6,7 @@ plugins { configurations.all { resolutionStrategy { force 'org.graalvm.sdk:graal-sdk:22.3.2' - force 'io.micronaut.platform:micronaut-platform:4.0.0-M1' + force 'io.micronaut.platform:micronaut-platform:4.0.0-M2' } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java index 824095784..480a712ed 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConfiguration.java @@ -15,10 +15,15 @@ */ package io.micronaut.configuration.kafka.config; +import io.micronaut.context.env.Environment; import io.micronaut.core.util.Toggleable; import io.micronaut.core.annotation.NonNull; + +import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.stream.Stream; /** * An abstract Kafka configuration class. @@ -63,6 +68,30 @@ protected AbstractKafkaConfiguration(Properties config) { this.config = config; } + /** + * Convert the given map of values to kafka properties. + * @param environment The env + * @param values The values + * @return The kafka properties + */ + protected static Properties toKafkaProperties(Environment environment, Map values) { + Properties properties = new Properties(); + values.entrySet().stream().filter(entry -> { + String key = entry.getKey().toString(); + return Stream.of("embedded", "consumers", "producers", "streams").noneMatch(key::startsWith); + }).forEach(entry -> { + Object value = entry.getValue(); + if (environment.canConvert(entry.getValue().getClass(), String.class)) { + Optional converted = environment.convert(entry.getValue(), String.class); + if (converted.isPresent()) { + value = converted.get(); + } + } + properties.setProperty(entry.getKey().toString(), value.toString()); + }); + return properties; + } + /** * @return The Kafka configuration */ diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/config/KafkaDefaultConfiguration.java b/kafka/src/main/java/io/micronaut/configuration/kafka/config/KafkaDefaultConfiguration.java index dbb75d058..a059a8e3a 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/config/KafkaDefaultConfiguration.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/config/KafkaDefaultConfiguration.java @@ -18,9 +18,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Map; -import java.util.Optional; import java.util.Properties; -import java.util.stream.Stream; import io.micronaut.context.annotation.ConfigurationProperties; import io.micronaut.context.annotation.Requires; @@ -82,20 +80,6 @@ public void setHealthTimeout(Duration healthTimeout) { private static Properties resolveDefaultConfiguration(Environment environment) { Map values = environment.containsProperties(PREFIX) ? environment.getProperties(PREFIX) : Collections.emptyMap(); - Properties properties = new Properties(); - values.entrySet().stream().filter(entry -> { - String key = entry.getKey(); - return Stream.of("embedded", "consumers", "producers", "streams").noneMatch(key::startsWith); - }).forEach(entry -> { - Object value = entry.getValue(); - if (environment.canConvert(entry.getValue().getClass(), String.class)) { - Optional converted = environment.convert(entry.getValue(), String.class); - if (converted.isPresent()) { - value = converted.get(); - } - } - properties.setProperty(entry.getKey(), value.toString()); - }); - return properties; + return toKafkaProperties(environment, values); } } diff --git a/settings.gradle b/settings.gradle index b037ced68..6b27dfd7b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,7 +15,6 @@ rootProject.name = 'kafka-parent' micronautBuild { useStandardizedProjectNames=true - addSnapshotRepository() importMicronautCatalog() importMicronautCatalog("micronaut-cache") importMicronautCatalog("micronaut-micrometer")