diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index 7b2bb36502..07f0d200bd 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -88,6 +88,11 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support]. +Starting with version 3.3, a `KafkaMetricsSupport` abstract class is introduced to manage `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` binding into a `MeterRegistry` for provided Kafka client. +This class is a super for the mentioned above `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener`. +However, it can be used for any Kafka client use-cases. +The class needs to be extended and its `bindClient()` and `unbindClient()` API have to be called to connect Kafka client metrics with a Micrometer collector. + [[observation]] == Micrometer Observation diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 0bac8459a3..c80ac18c77 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -58,6 +58,13 @@ When using `KafkaStreamsCustomizer` it is now possible to return a custom implem [[x33-kafka-headers-for-batch-listeners]] === KafkaHeaders.DELIVERY_ATTEMPT for batch listeners + When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields. If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header. -For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener]. +For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[Kafka Headers for Batch Listener]. + +[[x33-task-scheduler-for-kafka-metrics]] +=== Kafka Metrics Listeners and `TaskScheduler` + +The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`. +See `KafkaMetricsSupport` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java new file mode 100644 index 0000000000..dc3de51fd1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaMetricsSupport.java @@ -0,0 +1,214 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; + +import org.springframework.lang.Nullable; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; + +import io.micrometer.core.instrument.ImmutableTag; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; +import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; + +/** + * An abstract class to manage {@link KafkaClientMetrics}. + * + * @param the Kafka Client type. + * + * @author Artem Bilan + * + * @since 3.3 + * + * @see KafkaClientMetrics + */ +public abstract class KafkaMetricsSupport { + + protected final MeterRegistry meterRegistry; + + protected final List tags; + + @Nullable + protected final ScheduledExecutorService scheduler; + + private final Map metrics = new HashMap<>(); + + /** + * Construct an instance with the provided registry. + * @param meterRegistry the registry. + */ + protected KafkaMetricsSupport(MeterRegistry meterRegistry) { + this(meterRegistry, Collections.emptyList()); + } + + /** + * Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}. + * @param meterRegistry the registry. + * @param taskScheduler the task scheduler. + */ + protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskScheduler) { + this(meterRegistry, Collections.emptyList(), taskScheduler); + } + + /** + * Construct an instance with the provided {@link MeterRegistry} and tags. + * @param meterRegistry the registry. + * @param tags the tags. + */ + protected KafkaMetricsSupport(MeterRegistry meterRegistry, List tags) { + Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null"); + this.meterRegistry = meterRegistry; + this.tags = tags; + this.scheduler = null; + } + + /** + * Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}. + * @param meterRegistry the registry. + * @param tags the tags. + * @param taskScheduler the task scheduler. + */ + protected KafkaMetricsSupport(MeterRegistry meterRegistry, List tags, TaskScheduler taskScheduler) { + Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null"); + Assert.notNull(taskScheduler, "The 'taskScheduler' cannot be null"); + this.meterRegistry = meterRegistry; + this.tags = tags; + this.scheduler = obtainScheduledExecutorService(taskScheduler); + } + + /** + * Bind metrics for the Apache Kafka client with provided id. + * @param id the unique identifier for the client to manage in store. + * @param client the Kafka client instance to bind. + */ + protected final void bindClient(String id, C client) { + if (!this.metrics.containsKey(id)) { + List clientTags = new ArrayList<>(this.tags); + clientTags.add(new ImmutableTag("spring.id", id)); + this.metrics.put(id, createClientMetrics(client, clientTags)); + this.metrics.get(id).bindTo(this.meterRegistry); + } + } + + /** + * Create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance + * for the provided Kafka client and metric tags. + * By default, this factory is aware of {@link Consumer}, {@link Producer} and {@link AdminClient} types. + * For other use-case this method can be overridden. + * @param client the client to create a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} instance for. + * @param tags the tags for the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}. + * @return the {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics}. + */ + protected MeterBinder createClientMetrics(C client, List tags) { + if (client instanceof Consumer consumer) { + return createConsumerMetrics(consumer, tags); + } + else if (client instanceof Producer producer) { + return createProducerMetrics(producer, tags); + } + else if (client instanceof AdminClient admin) { + return createAdminMetrics(admin, tags); + } + + throw new IllegalArgumentException("Unsupported client type: " + client.getClass()); + } + + private KafkaClientMetrics createConsumerMetrics(Consumer consumer, List tags) { + return this.scheduler != null + ? new KafkaClientMetrics(consumer, tags, this.scheduler) + : new KafkaClientMetrics(consumer, tags); + } + + private KafkaClientMetrics createProducerMetrics(Producer producer, List tags) { + return this.scheduler != null + ? new KafkaClientMetrics(producer, tags, this.scheduler) + : new KafkaClientMetrics(producer, tags); + } + + private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List tags) { + return this.scheduler != null + ? new KafkaClientMetrics(adminClient, tags, this.scheduler) + : new KafkaClientMetrics(adminClient, tags); + } + + /** + * Unbind a {@code io.micrometer.core.instrument.binder.kafka.KafkaMetrics} for the provided Kafka client. + * @param id the unique identifier for the client to manage in store. + * @param client the Kafka client instance to unbind. + */ + protected final void unbindClient(String id, C client) { + AutoCloseable removed = (AutoCloseable) this.metrics.remove(id); + if (removed != null) { + try { + removed.close(); + } + catch (Exception ex) { + ReflectionUtils.rethrowRuntimeException(ex); + } + } + } + + private static ScheduledExecutorService obtainScheduledExecutorService(TaskScheduler taskScheduler) { + if (taskScheduler instanceof ThreadPoolTaskScheduler threadPoolTaskScheduler) { + return threadPoolTaskScheduler.getScheduledExecutor(); + } + + return new ScheduledExecutorServiceAdapter(taskScheduler); + } + + private static final class ScheduledExecutorServiceAdapter extends ScheduledThreadPoolExecutor { + + private final TaskScheduler delegate; + + private ScheduledExecutorServiceAdapter(TaskScheduler delegate) { + super(0); + this.delegate = delegate; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + + return this.delegate.scheduleAtFixedRate(command, + Instant.now().plus(initialDelay, unit.toChronoUnit()), + Duration.of(period, unit.toChronoUnit())); + } + + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java index bfbf4ea874..bdf3e29cd2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerConsumerListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,14 @@ package org.springframework.kafka.core; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; + import org.apache.kafka.clients.consumer.Consumer; -import io.micrometer.core.instrument.ImmutableTag; +import org.springframework.scheduling.TaskScheduler; + import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; @@ -36,16 +35,12 @@ * @param the value type. * * @author Gary Russell - * @since 2.5 + * @author Artem Bilan * + * @since 2.5 */ -public class MicrometerConsumerListener implements ConsumerFactory.Listener { - - private final MeterRegistry meterRegistry; - - private final List tags; - - private final Map metrics = new HashMap<>(); +public class MicrometerConsumerListener extends KafkaMetricsSupport> + implements ConsumerFactory.Listener { /** * Construct an instance with the provided registry. @@ -55,32 +50,44 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry) { this(meterRegistry, Collections.emptyList()); } + /** + * Construct an instance with the provided registry and task scheduler. + * @param meterRegistry the registry. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public MicrometerConsumerListener(MeterRegistry meterRegistry, TaskScheduler taskScheduler) { + this(meterRegistry, Collections.emptyList(), taskScheduler); + } + /** * Construct an instance with the provided registry and tags. * @param meterRegistry the registry. * @param tags the tags. */ public MicrometerConsumerListener(MeterRegistry meterRegistry, List tags) { - this.meterRegistry = meterRegistry; - this.tags = tags; + super(meterRegistry, tags); + } + + /** + * Construct an instance with the provided registry, tags and task scheduler. + * @param meterRegistry the registry. + * @param tags the tags. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public MicrometerConsumerListener(MeterRegistry meterRegistry, List tags, TaskScheduler taskScheduler) { + super(meterRegistry, tags, taskScheduler); } @Override public synchronized void consumerAdded(String id, Consumer consumer) { - if (!this.metrics.containsKey(id)) { - List consumerTags = new ArrayList<>(this.tags); - consumerTags.add(new ImmutableTag("spring.id", id)); - this.metrics.put(id, new KafkaClientMetrics(consumer, consumerTags)); - this.metrics.get(id).bindTo(this.meterRegistry); - } + bindClient(id, consumer); } @Override public synchronized void consumerRemoved(String id, Consumer consumer) { - KafkaClientMetrics removed = this.metrics.remove(id); - if (removed != null) { - removed.close(); - } + unbindClient(id, consumer); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java index 34af190512..343378a110 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/MicrometerProducerListener.java @@ -16,15 +16,13 @@ package org.springframework.kafka.core; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.kafka.clients.producer.Producer; -import io.micrometer.core.instrument.ImmutableTag; +import org.springframework.scheduling.TaskScheduler; + import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; @@ -36,16 +34,12 @@ * @param the value type. * * @author Gary Russell - * @since 2.5 + * @author Artem Bilan * + * @since 2.5 */ -public class MicrometerProducerListener implements ProducerFactory.Listener { - - private final MeterRegistry meterRegistry; - - private final List tags; - - private final Map metrics = new HashMap<>(); +public class MicrometerProducerListener extends KafkaMetricsSupport> + implements ProducerFactory.Listener { /** * Construct an instance with the provided registry. @@ -55,32 +49,44 @@ public MicrometerProducerListener(MeterRegistry meterRegistry) { this(meterRegistry, Collections.emptyList()); } + /** + * Construct an instance with the provided registry and task scheduler. + * @param meterRegistry the registry. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public MicrometerProducerListener(MeterRegistry meterRegistry, TaskScheduler taskScheduler) { + this(meterRegistry, Collections.emptyList(), taskScheduler); + } + /** * Construct an instance with the provided registry and tags. * @param meterRegistry the registry. * @param tags the tags. */ public MicrometerProducerListener(MeterRegistry meterRegistry, List tags) { - this.meterRegistry = meterRegistry; - this.tags = tags; + super(meterRegistry, tags); + } + + /** + * Construct an instance with the provided registry, tags and task scheduler. + * @param meterRegistry the registry. + * @param tags the tags. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public MicrometerProducerListener(MeterRegistry meterRegistry, List tags, TaskScheduler taskScheduler) { + super(meterRegistry, tags, taskScheduler); } @Override public synchronized void producerAdded(String id, Producer producer) { - if (!this.metrics.containsKey(id)) { - List producerTags = new ArrayList<>(this.tags); - producerTags.add(new ImmutableTag("spring.id", id)); - this.metrics.put(id, new KafkaClientMetrics(producer, producerTags)); - this.metrics.get(id).bindTo(this.meterRegistry); - } + bindClient(id, producer); } @Override public synchronized void producerRemoved(String id, Producer producer) { - KafkaClientMetrics removed = this.metrics.remove(id); - if (removed != null) { - removed.close(); - } + unbindClient(id, producer); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java index d290e3e27e..d25dd60dc8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/streams/KafkaStreamsMicrometerListener.java @@ -16,35 +16,31 @@ package org.springframework.kafka.streams; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.kafka.streams.KafkaStreams; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.KafkaMetricsSupport; +import org.springframework.scheduling.TaskScheduler; -import io.micrometer.core.instrument.ImmutableTag; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.MeterBinder; import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; /** * Creates a {@link KafkaStreamsMetrics} for the {@link KafkaStreams}. * * @author Gary Russell + * @author Artem Bilan + * * @since 2.5.3 * */ -public class KafkaStreamsMicrometerListener implements StreamsBuilderFactoryBean.Listener { - - private final MeterRegistry meterRegistry; - - private final List tags; - - private final Map metrics = new HashMap<>(); +public class KafkaStreamsMicrometerListener extends KafkaMetricsSupport + implements StreamsBuilderFactoryBean.Listener { /** * Construct an instance with the provided registry. @@ -54,32 +50,51 @@ public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry) { this(meterRegistry, Collections.emptyList()); } + /** + * Construct an instance with the provided registry and task scheduler. + * @param meterRegistry the registry. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, TaskScheduler taskScheduler) { + this(meterRegistry, Collections.emptyList(), taskScheduler); + } + /** * Construct an instance with the provided registry and tags. * @param meterRegistry the registry. * @param tags the tags. */ public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List tags) { - this.meterRegistry = meterRegistry; - this.tags = tags; + super(meterRegistry, tags); + } + + /** + * Construct an instance with the provided registry, tags and task scheduler. + * @param meterRegistry the registry. + * @param tags the tags. + * @param taskScheduler the task scheduler. + * @since 3.3 + */ + public KafkaStreamsMicrometerListener(MeterRegistry meterRegistry, List tags, TaskScheduler taskScheduler) { + super(meterRegistry, tags, taskScheduler); } @Override public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) { - if (!this.metrics.containsKey(id)) { - List streamsTags = new ArrayList<>(this.tags); - streamsTags.add(new ImmutableTag("spring.id", id)); - this.metrics.put(id, new KafkaStreamsMetrics(kafkaStreams, streamsTags)); - this.metrics.get(id).bindTo(this.meterRegistry); - } + bindClient(id, kafkaStreams); + } + + @Override + protected MeterBinder createClientMetrics(KafkaStreams client, List tags) { + return this.scheduler != null + ? new KafkaStreamsMetrics(client, tags, this.scheduler) + : new KafkaStreamsMetrics(client, tags); } @Override public synchronized void streamsRemoved(String id, KafkaStreams streams) { - KafkaStreamsMetrics removed = this.metrics.remove(id); - if (removed != null) { - removed.close(); - } + unbindClient(id, streams); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 6c6f702655..45ab5a2045 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -152,6 +152,7 @@ import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.TestPropertySource; @@ -1489,8 +1490,9 @@ public DefaultKafkaConsumerFactory bytesStringConsumerFactory() Map configs = consumerConfigs(); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(configs); - cf.addListener(new MicrometerConsumerListener(meterRegistry(), - Collections.singletonList(new ImmutableTag("consumerTag", "bytesString")))); + cf.addListener(new MicrometerConsumerListener<>(meterRegistry(), + Collections.singletonList(new ImmutableTag("consumerTag", "bytesString")), + new SimpleAsyncTaskScheduler())); return cf; } @@ -1580,7 +1582,8 @@ public ProducerFactory bytesStringProducerFactory() { configs.put(ProducerConfig.CLIENT_ID_CONFIG, "bsPF"); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs); pf.addListener(new MicrometerProducerListener<>(meterRegistry(), - Collections.singletonList(new ImmutableTag("producerTag", "bytesString")))); + Collections.singletonList(new ImmutableTag("producerTag", "bytesString")), + new SimpleAsyncTaskScheduler())); return pf; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java index c12075ad62..6d357b6271 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/config/KafkaStreamsCustomizerTests.java @@ -49,6 +49,7 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -157,7 +158,8 @@ public void configureTopology(Topology topology) { }); streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry(), - Collections.singletonList(new ImmutableTag("customTag", "stream")))); + Collections.singletonList(new ImmutableTag("customTag", "stream")), + new SimpleAsyncTaskScheduler())); return streamsBuilderFactoryBean; }