Skip to content

Commit

Permalink
Introduce KafkaListener.threadsValue to allow for dynamic config (#769
Browse files Browse the repository at this point in the history
)
  • Loading branch information
guillermocalvo authored Aug 4, 2023
1 parent fd6d9fd commit 968a4b4
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,37 @@
ErrorStrategy errorStrategy() default @ErrorStrategy();

/**
* Kafka consumers are by default single threaded. If you wish to increase the number of threads
* Dynamically configure the number of threads of a Kafka consumer.
*
* <p>Kafka consumers are by default single threaded. If you wish to increase the number of threads
* for a consumer you can alter this setting. Note that this means that multiple partitions will
* be allocated to a single application.
*
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be s
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be
* shared between invocations from different consumer threads</p>
*
* <p>{@code threadsValue} takes precedence over {@code threads} if they are both set.
*
* @return The number of threads
* @see KafkaListener#threads()
*/
@AliasFor(member = "threads")
String threadsValue() default "";

/**
* Statically configure the number of threads of a Kafka consumer.
*
* <p>Kafka consumers are by default single threaded. If you wish to increase the number of threads
* for a consumer you can alter this setting. Note that this means that multiple partitions will
* be allocated to a single application.
*
* <p>NOTE: When using this setting if your bean is {@link jakarta.inject.Singleton} then local state will be
* shared between invocations from different consumer threads</p>
*
* <p>{@code threads} will be overridden by {@code threadsValue} if they are both set.
*
* @return The number of threads
* @see KafkaListener#threadsValue()
*/
int threads() default 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import io.micronaut.configuration.kafka.metrics.KafkaProducerMetrics
import io.micronaut.configuration.kafka.serde.JsonObjectSerde
import io.micronaut.configuration.metrics.management.endpoint.MetricsEndpoint
import io.micronaut.context.annotation.Requires
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.HttpResponse
import io.micronaut.http.client.DefaultHttpClientConfiguration
Expand All @@ -20,7 +22,9 @@ import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.serde.annotation.Serdeable
import io.reactivex.Single
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
Expand All @@ -42,6 +46,7 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
super.configuration +
['micrometer.metrics.enabled': true,
'endpoints.metrics.sensitive': false,
'my.thread.count': 3,
(EMBEDDED_TOPICS): ['words', 'books', 'words-records', 'books-records']]
}

Expand Down Expand Up @@ -199,6 +204,21 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
}
}
void "test kafka consumer with configurable number of threads"() {
expect:
context.getBean(MyConsumer6).kafkaConsumers.size() == 3
}
void "test kafka consumer with fixed number of threads"() {
expect:
context.getBean(MyConsumer7).kafkaConsumers.size() == 2
}
void "test kafka consumer with both thread settings set"() {
expect:
context.getBean(MyConsumer8).kafkaConsumers.size() == 3
}
@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaClient
static interface MyClient {
Expand Down Expand Up @@ -282,6 +302,72 @@ class KafkaListenerSpec extends AbstractEmbeddedServerSpec {
}
}
@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-configurable-number-of-threads", threadsValue = '${my.thread.count}')
static class MyConsumer6 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []
@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-configurable-number-of-threads")) {
kafkaConsumers << consumer
}
}
return event.bean
}
@Topic("words")
void consume(String sentence) {
// Do nothing
}
}
@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-fixed-number-of-threads", threads = 2)
static class MyConsumer7 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []
@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-fixed-number-of-threads")) {
kafkaConsumers << consumer
}
}
return event.bean
}
@Topic("words")
void consume(String sentence) {
// Do nothing
}
}
@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(clientId = "kafka-consumer-with-both-thread-settings-set", threads = 10, threadsValue = '${my.thread.count}')
static class MyConsumer8 implements BeanCreatedEventListener<Consumer> {
List<KafkaConsumer> kafkaConsumers = []
@Override
Consumer onCreated(BeanCreatedEvent<Consumer> event) {
if (event.bean instanceof KafkaConsumer) {
final consumer = ((KafkaConsumer) event.bean)
if (consumer.clientId.startsWith("kafka-consumer-with-both-thread-settings-set")) {
kafkaConsumers << consumer
}
}
return event.bean
}
@Topic("words")
void consume(String sentence) {
// Do nothing
}
}
@Requires(property = 'spec.name', value = 'KafkaListenerSpec')
@KafkaListener(offsetReset = EARLIEST)
static class PojoConsumer2 {
Expand Down
10 changes: 10 additions & 0 deletions src/main/docs/guide/kafkaListener.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ The above example will create 10 link:{kafkaapi}/org/apache/kafka/clients/consum

NOTE: @KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as `@Prototype`.

You can also make your number of threads configurable by using `threadsValue`:

.Dynamically Configuring Threads
[source,java]
----
@KafkaListener(groupId="myGroup", threadsValue = "${my.thread.count}")
----

NOTE: `threads` will be overridden by `threadsValue` if they are both set.

By default Micronaut will inspect the method signature of the method annotated with `@Topic` that will listen for `ConsumerRecord` instances and from the types infer an appropriate key and value link:{kafkaapi}/org/apache/kafka/common/serialization/Deserializer.html[Deserializer].

.Applying Configuration
Expand Down

0 comments on commit 968a4b4

Please sign in to comment.