Skip to content

Commit

Permalink
kafka to M6 (#725)
Browse files Browse the repository at this point in the history
* kafka to M6

* platform to M2

* fix Kafka streams config

---------

Co-authored-by: Graeme Rocher <graeme.rocher@oracle.com>
  • Loading branch information
sdelamo and graemerocher authored Jun 5, 2023
1 parent f6201a3 commit a33d70f
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 32 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ plugins {

repositories {
mavenCentral()
maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
}
configurations.all {
resolutionStrategy {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
repositories {
mavenCentral()
maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
}

configurations.all {
Expand Down
18 changes: 9 additions & 9 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[versions]
micronaut = "4.0.0-M4"
micronaut-platform = "4.0.0-M1"
micronaut = "4.0.0-M6"
micronaut-platform = "4.0.0-M2"
micronaut-docs = "2.0.0"
micronaut-gradle-plugin = "4.0.0-M3"
micronaut-gradle-plugin = "4.0.0-M4"

# Required to keep catalog compatibility with 3.4.x. Can be removed for 4.0.0
managed-kafka-compat = "3.4.0"
Expand All @@ -18,12 +18,12 @@ testcontainers = "1.18.0"
zipkin-brave-kafka-clients = '5.15.1'

micronaut-cache = "4.0.0-M2"
micronaut-micrometer = "5.0.0-M1"
micronaut-reactor = "3.0.0-M2"
micronaut-rxjava2 = "2.0.0-M1"
micronaut-serde = "2.0.0-M7"
micronaut-tracing = "5.0.0-SNAPSHOT"
micronaut-test = "4.0.0-M3"
micronaut-micrometer = "5.0.0-M2"
micronaut-reactor = "3.0.0-M4"
micronaut-rxjava2 = "2.0.0-M3"
micronaut-serde = "2.0.0-M8"
micronaut-tracing = "5.0.0-M3"
micronaut-test = "4.0.0-M5"

logback = "1.4.6"

Expand Down
2 changes: 1 addition & 1 deletion kafka-streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {

configurations.all {
resolutionStrategy {
force 'io.micronaut.platform:micronaut-platform:4.0.0-M1'
force 'io.micronaut.platform:micronaut-platform:4.0.0-M2'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public KafkaStreamsConfiguration(
super(defaultConfiguration);
Properties config = getConfig();
String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true);
config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new));

Properties properties = environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new);
config.putAll(toKafkaProperties(environment, properties));
init(applicationConfiguration, environment, config);
}
}
2 changes: 1 addition & 1 deletion kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
configurations.all {
resolutionStrategy {
force 'org.graalvm.sdk:graal-sdk:22.3.2'
force 'io.micronaut.platform:micronaut-platform:4.0.0-M1'
force 'io.micronaut.platform:micronaut-platform:4.0.0-M2'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
*/
package io.micronaut.configuration.kafka.config;

import io.micronaut.context.env.Environment;
import io.micronaut.core.util.Toggleable;

import io.micronaut.core.annotation.NonNull;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;

/**
* An abstract Kafka configuration class.
Expand Down Expand Up @@ -63,6 +68,30 @@ protected AbstractKafkaConfiguration(Properties config) {
this.config = config;
}

/**
* Convert the given map of values to kafka properties.
* @param environment The env
* @param values The values
* @return The kafka properties
*/
protected static Properties toKafkaProperties(Environment environment, Map<?, ?> values) {
Properties properties = new Properties();
values.entrySet().stream().filter(entry -> {
String key = entry.getKey().toString();
return Stream.of("embedded", "consumers", "producers", "streams").noneMatch(key::startsWith);
}).forEach(entry -> {
Object value = entry.getValue();
if (environment.canConvert(entry.getValue().getClass(), String.class)) {
Optional<?> converted = environment.convert(entry.getValue(), String.class);
if (converted.isPresent()) {
value = converted.get();
}
}
properties.setProperty(entry.getKey().toString(), value.toString());
});
return properties;
}

/**
* @return The Kafka configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
Expand Down Expand Up @@ -82,20 +80,6 @@ public void setHealthTimeout(Duration healthTimeout) {

private static Properties resolveDefaultConfiguration(Environment environment) {
Map<String, Object> values = environment.containsProperties(PREFIX) ? environment.getProperties(PREFIX) : Collections.emptyMap();
Properties properties = new Properties();
values.entrySet().stream().filter(entry -> {
String key = entry.getKey();
return Stream.of("embedded", "consumers", "producers", "streams").noneMatch(key::startsWith);
}).forEach(entry -> {
Object value = entry.getValue();
if (environment.canConvert(entry.getValue().getClass(), String.class)) {
Optional<?> converted = environment.convert(entry.getValue(), String.class);
if (converted.isPresent()) {
value = converted.get();
}
}
properties.setProperty(entry.getKey(), value.toString());
});
return properties;
return toKafkaProperties(environment, values);
}
}
1 change: 0 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ rootProject.name = 'kafka-parent'

micronautBuild {
useStandardizedProjectNames=true
addSnapshotRepository()
importMicronautCatalog()
importMicronautCatalog("micronaut-cache")
importMicronautCatalog("micronaut-micrometer")
Expand Down

0 comments on commit a33d70f

Please sign in to comment.