Multi-language docs examples for Kafka Streams #839

Merged · 15 commits · Aug 29, 2023
44 changes: 12 additions & 32 deletions src/main/docs/guide/kafkaStreams.adoc
@@ -32,41 +32,28 @@ kafka:

You should then define a ann:context.annotation.Factory[] for your streams that defines beans that return a `KStream`. For example, to implement the Word Count example from the Kafka Streams documentation:


.Kafka Streams Word Count
[source,java]
----
include::{testskafkastreams}/wordcount/WordCountStream.java[tags=imports, indent=0]

include::{testskafkastreams}/wordcount/WordCountStream.java[tags=clazz, indent=0]

include::{testskafkastreams}/wordcount/WordCountStream.java[tags=wordCountStream, indent=4]

}
----
snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="imports, clazz, wordCountStream"]

<1> The input topic
<2> The output topic
<3> An instance of api:configuration.kafka.streams.ConfiguredStreamBuilder[] is injected that allows mutating the configuration
<4> Materialize the count stream and save to a state store
<1> An instance of api:configuration.kafka.streams.ConfiguredStreamBuilder[] is injected that allows mutating the configuration
<2> The input topic
<3> Materialize the count stream and save to a state store
<4> The output topic

NOTE: With Kafka Streams, the key and value `Serdes` (serializer/deserializer) must be classes with a zero-argument constructor. If you wish to use JSON (de)serialization you can subclass api:configuration.kafka.serde.JsonObjectSerde[] to define your `Serdes`.
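
For example, a minimal JSON `Serde` might look like the following sketch (here `Product` is a hypothetical POJO; the constructor taking the target type is an assumption, so check the `JsonObjectSerde` API docs):

[source,java]
----
import io.micronaut.configuration.kafka.serde.JsonObjectSerde;

// Kafka Streams instantiates Serdes reflectively, hence the zero-argument constructor
public class ProductSerde extends JsonObjectSerde<Product> {
    public ProductSerde() {
        super(Product.class); // assumed constructor signature
    }
}
----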

You can use the ann:configuration.kafka.annotation.KafkaClient[] annotation to send a sentence to be processed by the above stream:

.Defining a Kafka Client
[source,java]
----
include::{testskafkastreams}/wordcount/WordCountClient.java[]
----

snippet::io.micronaut.kafka.docs.streams.WordCountClient[tags="imports, clazz", indent=0]

You can also define a ann:configuration.kafka.annotation.KafkaListener[] to listen for the result of the word count stream:

.Defining a Kafka Listener
[source,java]
----
include::{testskafkastreams}/wordcount/WordCountListener.java[]
----

snippet::io.micronaut.kafka.docs.streams.WordCountListener[tags="imports, clazz", indent=0]
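
End to end, the client publishes a sentence to the input topic, the stream counts the words, and the listener receives the counts from the output topic. A minimal usage sketch (assuming injected `client` and `listener` beans of the types above, a running Kafka broker, and a test method that declares `throws InterruptedException`):

[source,java]
----
client.publishSentence("the quick brown fox jumps over the lazy dog");

// the stream processes asynchronously, so poll until the count arrives
while (listener.getCount("the") < 2) {
    Thread.sleep(100); // simple busy-wait, purely for illustration
}
----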

== Configuring Kafka Streams

@@ -109,20 +96,13 @@ The above configuration sets the `num.stream.threads` setting of the Kafka `StreamsConfig`
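
In YAML this corresponds to something like the following (a sketch; the stream name matches the `my-other-stream` example below, and the thread count is illustrative):

[source,yaml]
----
kafka:
  streams:
    my-other-stream:
      num:
        stream:
          threads: 10 # illustrative value
----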

You can then inject a api:configuration.kafka.streams.ConfiguredStreamBuilder[] specifically for the above configuration using `jakarta.inject.Named`:

[source,java]
----
@Named("my-other-stream")
KStream<String, String> myOtherKStream(@Named("my-other-stream") ConfiguredStreamBuilder builder) {...}
----
snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="myOtherStream", indent=0]

NOTE: If you do not provide `@Named` on the `ConfiguredStreamBuilder` and have multiple `KStream` beans defined, they will share the default configuration (client id, application id, etc.). When using multiple streams in a single app, it is advisable to provide a `@Named` instance of `ConfiguredStreamBuilder` for each stream.

.Kafka Streams Word Count
[source,java]
----
include::{testskafkastreams}/wordcount/WordCountStream.java[tags=namedStream,indent=0]
}
----

snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="namedStream", indent=0]

.Configuring Kafka Streams for testing
When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this, create a config file suffixed with an environment name, such as `application-test.yml`, and set the `kafka.streams.[STREAM-NAME].start-kafka-streams` property to `false`.
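
For example (a sketch, assuming a stream named `word-count` as in the example above):

[source,yaml]
----
# application-test.yml
kafka:
  streams:
    word-count:
      start-kafka-streams: false
----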
@@ -1,15 +1,8 @@
When using streams, you can set a state store for your stream by using a store builder and telling the stream to store its data. In the Kafka Streams Word Count example above, the output is materialized to a named store that can later be retrieved via the Interactive Query Service (see the Apache Kafka docs on https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-local-key-value-stores[querying local key-value stores]).
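
In the stream definition the materialization is done on the grouped stream, as in this fragment mirroring the word count example above (the store name is illustrative):

[source,java]
----
KTable<String, Long> counts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
    // materialize the counts into a queryable state store
    .count(Materialized.as("word-count-store"));
----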

You can inject the `InteractiveQueryService` and use the method `getQueryableStore` to get values from a state store.
You can inject the api:configuration.kafka.streams.InteractiveQueryService[] and use the method `getQueryableStore(String storeName, QueryableStoreType<T> storeType)` to get values from a state store.

[source,java]
----
include::{kafkastreams}/InteractiveQueryService.java[tags=getQueryableStore, indent=0]
----

An example service that wraps the `InteractiveQueryService` is included below. It illustrates that when calling `getQueryableStore` you must provide the store name and, preferably, the types of the key and value you are trying to retrieve.

[source,java]
----
include::{testskafkastreams}/wordcount/InteractiveQueryServiceExample.java[]
----
snippet::io.micronaut.kafka.docs.streams.InteractiveQueryServiceExample[tags="imports, clazz", indent=0]
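
Calling the wrapper is then straightforward (a sketch, assuming an injected `InteractiveQueryServiceExample` bean; the store name must match the one used when materializing the stream):

[source,java]
----
// returns 0 when the store or key is absent
Long count = interactiveQueryServiceExample.getWordCount("word-count-store", "hello");
----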
1 change: 1 addition & 0 deletions test-suite-groovy/build.gradle.kts
@@ -16,4 +16,5 @@ dependencies {
testImplementation(mnSerde.micronaut.serde.jackson)
testImplementation(mnRxjava2.micronaut.rxjava2)
testImplementation(projects.micronautKafka)
testImplementation(projects.micronautKafkaStreams)
}
@@ -0,0 +1,68 @@
package io.micronaut.kafka.docs.streams;

// tag::imports[]
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
// end::imports[]


@Requires(property = 'spec.name', value = 'WordCountStreamTest')
// tag::clazz[]
/**
* Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example.
*/
@Singleton
class InteractiveQueryServiceExample {

private final InteractiveQueryService interactiveQueryService;

InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
}

/**
* Method to get the word state store and word count from the store using the interactive query service.
*
* @param stateStore the name of the state store, e.g. "foo-store"
* @param word the key to get, in this case the word as the stream and ktable have been grouped by word
* @return the Long count of the word in the store
*/
Long getWordCount(String stateStore, String word) {
Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(word)).orElse(0L);
}

/**
* Method to get byte array from a state store using the interactive query service.
*
* @param stateStore the name of the state store, e.g. "bar-store"
* @param blobName the key to get, in this case the name of the blob
* @return the byte[] stored in the state store
*/
byte[] getBytes(String stateStore, String blobName) {
Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(stringReadOnlyKeyValueStore ->
stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
}

/**
* Method to get value V by key K.
*
* @param stateStore the name of the state store, e.g. "baz-store"
* @param name the key to get
* @return the value of type V stored in the state store
*/
<K, V> V getGenericKeyValue(String stateStore, K name) {
Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.<K, V>keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(name)).orElse(null);
}
}
// end::clazz[]
@@ -0,0 +1,62 @@
package io.micronaut.kafka.docs.streams

import io.micronaut.context.annotation.BootstrapContextCompatible
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.context.env.BootstrapPropertySourceLocator
import io.micronaut.context.env.Environment
import io.micronaut.context.env.PropertySource
import io.micronaut.context.exceptions.ConfigurationException
import jakarta.annotation.PostConstruct
import jakarta.inject.Singleton
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic

import java.util.concurrent.ExecutionException
import java.util.stream.Collectors
import java.util.stream.Stream

@Requires(property = 'spec.name', value = 'WordCountStreamTest')
@BootstrapContextCompatible
@Singleton
class KafkaTestInitializer implements BootstrapPropertySourceLocator {

private final Map<String, Object> adminProps

KafkaTestInitializer(@Value('${kafka.bootstrap.servers}') String bootstrapServers) {
this.adminProps = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
}

@PostConstruct
void initializeTopics() {
createTopics(
Stream.of(
'streams-plaintext-input',
'named-word-count-input',
'my-other-stream',
'no-op-input'
).map(topicName -> configureTopic(topicName, 1, 1)).collect(Collectors.toSet())
)
}

@Override
Iterable<PropertySource> findPropertySources(Environment environment) throws ConfigurationException {
return BootstrapPropertySourceLocator.EMPTY_LOCATOR.findPropertySources(environment)
}

private NewTopic configureTopic(String name, int numPartitions, int replicationFactor) {
return new NewTopic(name, numPartitions, (short) replicationFactor)
}

private void createTopics(Set<NewTopic> topicsToCreate) {
try (AdminClient admin = AdminClient.create(adminProps)) {
Set<String> existingTopics = admin.listTopics().names().get()
Set<NewTopic> newTopics = topicsToCreate.stream().filter(newTopic ->
!existingTopics.contains(newTopic.name())).collect(Collectors.toSet())
admin.createTopics(newTopics).all().get()
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException('Failed to initialize test kafka topics', e)
}
}
}
@@ -0,0 +1,17 @@
package io.micronaut.kafka.docs.streams

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.streams.kstream.KStream

@Factory
class NoOpStreamFactory {

@Singleton
@Named("no-op-stream")
KStream<String, String> noOpStream(ConfiguredStreamBuilder builder) {
return builder.stream("no-op-input")
}
}
@@ -0,0 +1,17 @@
package io.micronaut.kafka.docs.streams

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
// end::imports[]

@Requires(property = 'spec.name', value = 'WordCountStreamTest')
// tag::clazz[]
@KafkaClient
interface WordCountClient {

@Topic("streams-plaintext-input")
void publishSentence(String sentence)
}
// end::clazz[]
@@ -0,0 +1,35 @@
package io.micronaut.kafka.docs.streams

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

import java.util.concurrent.ConcurrentHashMap

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
// end::imports[]

@Requires(property = 'spec.name', value = 'WordCountStreamTest')
// tag::clazz[]
@KafkaListener(offsetReset = EARLIEST, groupId = 'WordCountListener')
class WordCountListener {

private final Map<String, Long> wordCounts = new ConcurrentHashMap<>()

@Topic("streams-wordcount-output")
void count(@KafkaKey String word, long count) {
wordCounts.put(word, count)
}

long getCount(String word) {
Long num = wordCounts.get(word)
num ?: 0
}

Map<String, Long> getWordCounts() {
Collections.unmodifiableMap(wordCounts)
}
}
// end::clazz[]
@@ -0,0 +1,85 @@
package io.micronaut.kafka.docs.streams

// tag::imports[]
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
// end::imports[]

@Requires(property = 'spec.name', value = 'WordCountStreamTest')
// tag::clazz[]
@Factory
class WordCountStream {
// end::clazz[]

// tag::wordCountStream[]
@Singleton
@Named('word-count')
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <1>
// set default serdes
Properties props = builder.getConfiguration()
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500')

KStream<String, String> source = builder.stream('streams-plaintext-input') // <2>

KTable<String, Long> groupedByWord = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
//Store the result in a store for lookup later
.count(Materialized.as('word-count-store-groovy')) // <3>

groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // <4>

return source
}
// end::wordCountStream[]

// tag::namedStream[]
@Singleton
@Named('my-stream')
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {

// end::namedStream[]
// set default serdes
Properties props = builder.getConfiguration()
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500')

KStream<String, String> source = builder.stream("named-word-count-input")
KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.groupBy((key, value) -> value)
.count()

// need to override value serde to Long type
counts.toStream().to("named-word-count-output", Produced.with(Serdes.String(), Serdes.Long()))
return source
}

// tag::myOtherStream[]
@Singleton
@Named('my-other-stream')
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder) {
return builder.stream('my-other-stream')
}
// end::myOtherStream[]
}