Skip to content

Commit

Permalink
Upgrade to Kafka 3.0 (#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Jun 16, 2022
1 parent 6d28194 commit 229bd9b
Show file tree
Hide file tree
Showing 31 changed files with 245 additions and 341 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ plugins {
}

dependencies {
api libs.micronaut.messaging
api mn.micronaut.messaging

compileOnly libs.micronaut.management
compileOnly mn.micronaut.management

implementation libs.projectreactor
implementation mn.reactor

testAnnotationProcessor libs.micronaut.inject.java
testAnnotationProcessor mn.micronaut.inject.java

testImplementation libs.micronaut.micrometer
testImplementation libs.micronaut.management
testImplementation mn.micronaut.micrometer.core
testImplementation mn.micronaut.management
testImplementation libs.testcontainers.kafka

testRuntimeOnly libs.micronaut.cache
testRuntimeOnly libs.micronaut.reactor
testRuntimeOnly libs.micronaut.http.server.netty
testRuntimeOnly libs.micronaut.management
testRuntimeOnly mn.micronaut.cache.core
testRuntimeOnly mn.micronaut.reactor
testRuntimeOnly mn.micronaut.http.server.netty
testRuntimeOnly mn.micronaut.management
}

test {
Expand Down
10 changes: 3 additions & 7 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
projectVersion=4.3.1-SNAPSHOT
projectVersion=5.0.0-SNAPSHOT
projectGroup=io.micronaut.kafka

micronautDocsVersion=2.0.0
micronautVersion=3.4.0
groovyVersion=3.0.10
spockVersion=2.0-groovy-3.0
micronautSharedSettingVersion=5.3.9

title=Micronaut Kafka
projectDesc=Integration between Micronaut and Kafka Messaging
projectUrl=https://micronaut.io
githubSlug=micronaut-projects/micronaut-kafka
developers=Graeme Rocher

githubCoreBranch=3.5.x
githubCoreBranch=3.6.x
bomProperty=micronautKafkaVersion

testskafka=kafka/src/test/groovy/io/micronaut/configuration/kafka/docs
Expand Down
23 changes: 6 additions & 17 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
[versions]
managed-kafka = '2.8.1'
micronaut = "3.5.1"
micronaut-docs = "2.0.0"

managed-kafka = '3.2.0'

groovy = "3.0.11"

opentracing-kafka-client = '0.1.15'
opentracing-mock = '0.33.0'
Expand All @@ -10,22 +15,6 @@ zipkin-brave-kafka-clients = '5.13.9'
managed-kafka-clients = { module = 'org.apache.kafka:kafka-clients', version.ref = 'managed-kafka' }
managed-kafka-streams = { module = 'org.apache.kafka:kafka-streams', version.ref = 'managed-kafka' }

micronaut-cache = { module = 'io.micronaut.cache:micronaut-cache-core' }
micronaut-graal = { module = 'io.micronaut:micronaut-graal' }
micronaut-http-client = { module = 'io.micronaut:micronaut-http-client' }
micronaut-http-server-netty = { module = 'io.micronaut:micronaut-http-server-netty' }
micronaut-inject-java = { module = 'io.micronaut:micronaut-inject-java' }
micronaut-management = { module = 'io.micronaut:micronaut-management' }
micronaut-messaging = { module = 'io.micronaut:micronaut-messaging' }
micronaut-micrometer = { module = 'io.micronaut.micrometer:micronaut-micrometer-core' }
micronaut-micrometer-statsd = { module = 'io.micronaut.micrometer:micronaut-micrometer-registry-statsd' }
micronaut-reactor = { module = 'io.micronaut.reactor:micronaut-reactor' }
micronaut-rxjava2 = { module = 'io.micronaut.rxjava2:micronaut-rxjava2' }
micronaut-tracing = { module = 'io.micronaut:micronaut-tracing' }

graal = { module = "org.graalvm.nativeimage:svm" }
projectreactor = { module = 'io.projectreactor:reactor-core' }

opentracing-kafka-client = { module = 'io.opentracing.contrib:opentracing-kafka-client', version.ref = 'opentracing-kafka-client' }
opentracing-mock = { module = 'io.opentracing:opentracing-mock', version.ref = 'opentracing-mock' }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import io.micronaut.management.health.indicator.HealthResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.reactivestreams.Publisher;

import jakarta.inject.Singleton;
Expand Down Expand Up @@ -130,7 +130,7 @@ private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) {
final Map<String, Object> streamDetails = new HashMap<>();

if (kafkaStreams.state().isRunningOrRebalancing()) {
for (ThreadMetadata metadata : kafkaStreams.localThreadsMetadata()) {
for (ThreadMetadata metadata : kafkaStreams.metadataForLocalThreads()) {
final Map<String, Object> threadDetails = new HashMap<>();
threadDetails.put("threadName", metadata.threadName());
threadDetails.put("threadState", metadata.threadState());
Expand Down Expand Up @@ -179,10 +179,10 @@ private String getApplicationId(final KafkaStreams kafkaStreams) {
private static String getDefaultStreamName(final KafkaStreams kafkaStreams) {
return Optional.ofNullable(kafkaStreams)
.filter(kafkaStreams1 -> kafkaStreams1.state().isRunningOrRebalancing())
.map(KafkaStreams::localThreadsMetadata)
.map(KafkaStreams::metadataForLocalThreads)
.map(Collection::stream)
.flatMap(Stream::findFirst)
.map(ThreadMetadata::threadName)
.map(org.apache.kafka.streams.ThreadMetadata::threadName)
.orElse("unidentified");
}

Expand All @@ -192,9 +192,9 @@ private static String getDefaultStreamName(final KafkaStreams kafkaStreams) {
* @param taskMetadataSet the task metadata
* @return map of details
*/
private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadataSet) {
private static Map<String, Object> taskDetails(Set<org.apache.kafka.streams.TaskMetadata> taskMetadataSet) {
final Map<String, Object> details = new HashMap<>();
for (TaskMetadata taskMetadata : taskMetadataSet) {
for (org.apache.kafka.streams.TaskMetadata taskMetadata : taskMetadataSet) {
details.put("taskId", taskMetadata.taskId());
if (details.containsKey("partitions")) {
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.micronaut.configuration.kafka.streams

import io.micronaut.context.ApplicationContext
import io.micronaut.runtime.server.EmbeddedServer
import spock.lang.AutoCleanup
import spock.lang.Shared

/**
 * Base spec for tests that need a full embedded HTTP server (not just a bare
 * application context) wired against the Testcontainers Kafka broker.
 */
abstract class AbstractEmbeddedServerSpec extends AbstractKafkaContainerSpec {

    // Embedded server hosting the application context; closed automatically after the spec.
    @Shared @AutoCleanup EmbeddedServer embeddedServer

    @Override
    void startContext() {
        // Merge the spec's configuration with the running broker's bootstrap servers,
        // then boot an EmbeddedServer and expose its context to the parent class.
        Map<String, Object> serverConfig =
                getConfiguration() + ['kafka.bootstrap.servers': kafkaContainer.bootstrapServers]
        embeddedServer = ApplicationContext.run(EmbeddedServer, serverConfig)
        context = embeddedServer.applicationContext
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.micronaut.configuration.kafka.streams

import io.micronaut.context.ApplicationContext
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

/**
 * Base spec that manages the lifecycle of a single Kafka broker (via Testcontainers)
 * and a Micronaut application context pointed at it. Startup order: broker first,
 * then the {@link #afterKafkaStarted()} hook, then the context.
 */
abstract class AbstractKafkaContainerSpec extends AbstractKafkaSpec {

    // Broker shared across all features of the spec; stopped automatically on cleanup.
    // NOTE(review): image cp-kafka:5.5.9 ships an older broker than the 3.x clients the
    // build now uses — presumably intentional for backward-compatibility testing; confirm.
    @Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.9"))
            .withEnv(getEnvVariables())
    // Application context wired against the running broker.
    @Shared @AutoCleanup ApplicationContext context

    void setupSpec() {
        // Broker must be up before topics can be created or bootstrap servers resolved.
        kafkaContainer.start()
        afterKafkaStarted()
        startContext()
    }

    // Hook for subclasses to run work (e.g. topic creation) after the broker starts
    // but before the application context boots. No-op by default.
    void afterKafkaStarted() {
    }

    // Boots the application context with the spec's configuration plus the broker address.
    void startContext() {
        context = ApplicationContext.run(
                getConfiguration() + ['kafka.bootstrap.servers': kafkaContainer.bootstrapServers]
        )
    }

    void stopContext() {
        // Null-safe: the context may never have started if setupSpec failed early.
        context?.stop()
    }

    void cleanupSpec() {
        // Stop the context before the broker so in-flight clients shut down cleanly.
        stopContext()
        kafkaContainer.stop()
    }

    // Synchronously creates a topic; the AdminClient is closed by try-with-resources
    // and .get() blocks until the broker acknowledges creation.
    void createTopic(String name, int numPartitions, int replicationFactor) {
        try (def admin = AdminClient.create([(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG): kafkaContainer.bootstrapServers])) {
            admin.createTopics([new NewTopic(name, numPartitions, (short) replicationFactor)]).all().get()
        }
    }

    // Extra environment variables for the Kafka container; subclasses may override.
    protected Map<String, String> getEnvVariables() {
        [:]
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.micronaut.configuration.kafka.streams

import spock.lang.Specification
import spock.util.concurrent.PollingConditions

/**
 * Root of the Kafka test-spec hierarchy: provides a shared {@link PollingConditions}
 * instance and the base configuration map used when booting an application context.
 */
abstract class AbstractKafkaSpec extends Specification {

    /** Polling helper for eventually-true assertions; timeout is overridable via getConditionsTimeout(). */
    protected final PollingConditions conditions =
            new PollingConditions(timeout: conditionsTimeout, delay: 1)

    /** Seconds the polling conditions wait before giving up. */
    protected int getConditionsTimeout() {
        return 60
    }

    /** Base configuration: exposes the spec's simple class name under 'spec.name'. */
    protected Map<String, Object> getConfiguration() {
        return ['spec.name': getClass().simpleName]
    }
}
Original file line number Diff line number Diff line change
@@ -1,50 +1,43 @@
package io.micronaut.configuration.kafka.streams

import groovy.util.logging.Slf4j
import io.micronaut.context.ApplicationContext
import io.micronaut.core.util.CollectionUtils
import io.micronaut.runtime.server.EmbeddedServer
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

@Slf4j
abstract class AbstractTestContainersSpec extends Specification {
import io.micronaut.configuration.kafka.streams.optimization.OptimizationStream
import io.micronaut.configuration.kafka.streams.wordcount.WordCountStream
import spock.lang.Shared

PollingConditions conditions = new PollingConditions(timeout: 60, delay: 1)
abstract class AbstractTestContainersSpec extends AbstractEmbeddedServerSpec {

@Shared @AutoCleanup EmbeddedServer embeddedServer
@Shared @AutoCleanup ApplicationContext context
@Shared static KafkaContainer kafkaContainer = KafkaSetup.init()
@Shared
String myStreamApplicationId = 'my-stream-' + UUID.randomUUID().toString()

void setupSpec() {
List<Object> config = ["kafka.bootstrap.servers", kafkaContainer.bootstrapServers]
config.addAll(getConfiguration())
@Shared
String optimizationOnApplicationId = 'optimization-on-' + UUID.randomUUID().toString()

embeddedServer = ApplicationContext.run(EmbeddedServer, CollectionUtils.mapOf(config as Object[]))
@Shared
String optimizationOffApplicationId = 'optimization-off-' + UUID.randomUUID().toString()

context = embeddedServer.applicationContext
protected Map<String, Object> getConfiguration() {
super.getConfiguration() + ['kafka.generic.config': "hello",
'kafka.streams.my-stream.application.id': myStreamApplicationId,
'kafka.streams.my-stream.num.stream.threads': 10,
'kafka.streams.optimization-on.application.id': optimizationOnApplicationId,
'kafka.streams.optimization-on.topology.optimization': 'all',
'kafka.streams.optimization-off.application.id': optimizationOffApplicationId,
'kafka.streams.optimization-off.topology.optimization': 'none']
}

protected List<Object> getConfiguration() {
['kafka.generic.config', "hello",
'kafka.streams.my-stream.application.id', 'my-stream',
'kafka.streams.my-stream.num.stream.threads', 10,
'kafka.streams.optimization-on.application.id', 'optimization-on',
'kafka.streams.optimization-on.topology.optimization', 'all',
'kafka.streams.optimization-off.application.id', 'optimization-off',
'kafka.streams.optimization-off.topology.optimization', 'none']
@Override
void afterKafkaStarted() {
[
WordCountStream.INPUT,
WordCountStream.OUTPUT,
WordCountStream.NAMED_WORD_COUNT_INPUT,
WordCountStream.NAMED_WORD_COUNT_OUTPUT,
OptimizationStream.OPTIMIZATION_ON_INPUT,
OptimizationStream.OPTIMIZATION_OFF_INPUT
].forEach(topic -> {
createTopic(topic.toString(), 1, 1)
})
}

void cleanupSpec() {
try {
embeddedServer.stop()
log.warn("Stopped containers!")
} catch (Exception ignored) {
log.error("Could not stop containers")
}
embeddedServer?.close()
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class KafkaStreamsSpec extends AbstractTestContainersSpec {
def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream'))

then:
builder.configuration['application.id'] == "my-stream"
builder.configuration['application.id'] == myStreamApplicationId
builder.configuration['generic.config'] == "hello"
}

Expand All @@ -33,8 +33,8 @@ class KafkaStreamsSpec extends AbstractTestContainersSpec {
def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream'))

then:
stream.config.originals()['application.id'] == "my-stream"
stream.config.originals()['generic.config'] == "hello"
stream.applicationConfigs.originals()['application.id'] == myStreamApplicationId
stream.applicationConfigs.originals()['generic.config'] == "hello"
}

void "test kafka stream application"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ class KafkaStreamsHealthDisabledSpec extends AbstractTestContainersSpec {
}

@Override
protected List<Object> getConfiguration() {
List<Object> config = super.configuration
config << "kafka.health.streams.enabled" << 'false'
config
protected Map<String, Object> getConfiguration() {
super.getConfiguration() + ["kafka.health.streams.enabled": 'false']
}
}

This file was deleted.

Loading

0 comments on commit 229bd9b

Please sign in to comment.