diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaMessage.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaMessage.java new file mode 100644 index 000000000..2325c4ce4 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaMessage.java @@ -0,0 +1,158 @@ +/* + * Copyright 2017-2021 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka; + +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; + +import java.util.Map; + +/** + * Message payload representation. + * + * @param The key type + * @param The value type + * @author Denis Stepanov + * @since 4.1.0 + */ +public final class KafkaMessage { + + private final String topic; + private final K key; + private final V body; + private final Integer partition; + private final Long timestamp; + private final Map headers; + + /** + * The default constructor. + * + * @param topic The topic + * @param key The key + * @param body The body + * @param partition The partition + * @param timestamp The timestamp + * @param headers The headers + */ + public KafkaMessage(@Nullable String topic, @Nullable K key, @Nullable V body, @Nullable Integer partition, + @Nullable Long timestamp, @Nullable Map headers) { + this.topic = topic; + this.key = key; + this.body = body; + this.partition = partition; + this.timestamp = timestamp; + this.headers = headers; + } + + @Nullable + public String getTopic() { + return topic; + } + + @Nullable + public K getKey() { + return key; + } + + @Nullable + public V getBody() { + return body; + } + + @Nullable + public Integer getPartition() { + return partition; + } + + @Nullable + public Long getTimestamp() { + return timestamp; + } + + @Nullable + public Map getHeaders() { + return headers; + } + + /** + * The message builder. 
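Illustrative usage sketch (not part of this patch): a caller would typically assemble a message through the nested builder; the payload variable, key and topic name below are placeholders.

    KafkaMessage reply = KafkaMessage.Builder
            .withBody(resultPayload)   // hypothetical payload computed by the caller
            .key("some-key")           // optional Kafka key
            .topic("replies")          // optional destination topic
            .build();
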
+ * @param The key type + * @param The value type + */ + public static final class Builder { + private String topic; + private K key; + private V body; + private Integer partition; + private Long timestamp; + private Map headers; + + @NonNull + public static Builder withBody(@Nullable F body) { + Builder builder = new Builder<>(); + builder.body = body; + return builder; + } + + @NonNull + public static Builder withoutBody() { + Builder builder = new Builder<>(); + return builder; + } + + @NonNull + public Builder topic(@Nullable String topic) { + this.topic = topic; + return this; + } + + @NonNull + public Builder key(@Nullable K key) { + this.key = key; + return this; + } + + @NonNull + public Builder body(@Nullable V body) { + this.body = body; + return this; + } + + @NonNull + public Builder header(@Nullable Map headers) { + this.headers = headers; + return this; + } + + @NonNull + public Builder partition(@Nullable Integer partition) { + this.partition = partition; + return this; + } + + @NonNull + public Builder timestamp(@Nullable Long timestamp) { + this.timestamp = timestamp; + return this; + } + + @NonNull + public KafkaMessage build() { + return new KafkaMessage(topic, key, body, partition, timestamp, headers); + } + } + +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java index cc900b773..60733e871 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java @@ -32,6 +32,7 @@ import io.micronaut.inject.FieldInjectionPoint; import io.micronaut.inject.InjectionPoint; import io.micronaut.inject.qualifiers.Qualifiers; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -54,7 +55,7 @@ * @since 1.0 */ @Factory -public class KafkaProducerFactory implements ProducerRegistry { +public class KafkaProducerFactory implements ProducerRegistry, TransactionalProducerRegistry { private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class); private final Map clients = new ConcurrentHashMap<>(); private final BeanContext beanContext; @@ -126,16 +127,16 @@ public Producer getProducer( throw new ConfigurationException("@KafkaClient used on type missing generic argument values for Key and Value: " + injectionPoint); } final String id = injectionPoint.getAnnotationMetadata().stringValue(KafkaClient.class).orElse(null); - return getKafkaProducer(id, k, v); + return getKafkaProducer(id, null, k, v, false); } @SuppressWarnings("unchecked") - private T getKafkaProducer(@Nullable String id, Argument keyType, Argument valueType) { + private T getKafkaProducer(@Nullable String id, @Nullable String transactionalId, Argument keyType, Argument valueType, boolean transactional) { ClientKey key = new ClientKey( id, keyType.getType(), - valueType.getType() - ); + valueType.getType(), + transactional); return (T) clients.computeIfAbsent(key, clientKey -> { Supplier defaultResolver = () -> beanContext.getBean(AbstractKafkaProducerConfiguration.class); @@ -163,10 +164,21 @@ private T getKafkaProducer(@Nullable String id, Argument keyType, Argumen newConfig.setValueSerializer(valueSerializer); } + if (StringUtils.isNotEmpty(transactionalId)) { + 
properties.putIfAbsent(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + } + if (hasId) { properties.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, id); } - return beanContext.createBean(Producer.class, newConfig); + + Producer producer = beanContext.createBean(Producer.class, newConfig); + if (transactional) { + producer.initTransactions(); + } + + return producer; }); } @@ -187,9 +199,23 @@ protected void stop() { @Override public Producer getProducer(String id, Argument keyType, Argument valueType) { - return getKafkaProducer(id, keyType, valueType); + return getKafkaProducer(id, null, keyType, valueType, false); } + @Override + public Producer getTransactionalProducer(String id, String transactionalId, Argument keyType, Argument valueType) { + return getKafkaProducer(id, transactionalId, keyType, valueType, true); + } + + @Override + public void close(Producer producer) { + for (Map.Entry e : clients.entrySet()) { + if (e.getValue() == producer) { + clients.remove(e.getKey()); + break; + } + } + } /** * key for retrieving built producers. @@ -201,11 +227,13 @@ private static final class ClientKey { private final String id; private final Class keyType; private final Class valueType; + private final boolean transactional; - ClientKey(String id, Class keyType, Class valueType) { + ClientKey(String id, Class keyType, Class valueType, boolean transactional) { this.id = id; this.keyType = keyType; this.valueType = valueType; + this.transactional = transactional; } @Override @@ -219,13 +247,13 @@ public boolean equals(Object o) { ClientKey clientKey = (ClientKey) o; return Objects.equals(id, clientKey.id) && Objects.equals(keyType, clientKey.keyType) && - Objects.equals(valueType, clientKey.valueType); + Objects.equals(valueType, clientKey.valueType) && + Objects.equals(transactional, clientKey.transactional); } @Override public int hashCode() { - - return Objects.hash(id, keyType, valueType); + return Objects.hash(id, keyType, valueType, transactional); } } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/TransactionalProducerRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/TransactionalProducerRegistry.java new file mode 100644 index 000000000..b9a22dc4c --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/TransactionalProducerRegistry.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2020 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka; + +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.type.Argument; +import org.apache.kafka.clients.producer.Producer; + +/** + * A registry of managed transactional {@link Producer} instances key by id and type. + * + * The producer instance returned will have transactions initialized by executing {@link Producer#initTransactions()}. 
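Illustrative usage sketch (not part of this patch): the registry hands back a producer that has already had initTransactions() executed, so a caller only begins and commits the transaction, and asks the registry to close the instance when it is fenced. The injected transactionalProducerRegistry variable, the topic name and the String key/value types are placeholders.

    Producer<String, String> producer = transactionalProducerRegistry.getTransactionalProducer(
            "my-client", "my-transactional-id",
            Argument.of(String.class), Argument.of(String.class));
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("events", "key", "value"));
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // another producer instance took over this transactional id; discard this one
        transactionalProducerRegistry.close(producer);
    } catch (Exception e) {
        producer.abortTransaction();
        throw e;
    }
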
+ * + * @author Denis Stepanov + * @since 4.1.0 + */ +public interface TransactionalProducerRegistry { + + /** + * Returns a transactional managed Producer. + * + * @param clientId The client id of the producer. + * @param transactionalId The transactional id of the producer. + * @param keyType The key type + * @param valueType The value type + * @param The key generic type + * @param The value generic type + * @return The producer + * @since + */ + @NonNull + Producer getTransactionalProducer(@Nullable String clientId, String transactionalId, Argument keyType, Argument valueType); + + /** + * Closed the producer. + * Should be used for cases when {@link org.apache.kafka.common.errors.ProducerFencedException} is thrown. + * + * @param producer The producer + */ + void close(@NonNull Producer producer); + +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java index 28bf2dd60..89594963e 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaClient.java @@ -55,6 +55,17 @@ @AliasFor(member = "value") String id() default ""; + /** + * The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer + * sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. + * If no TransactionalId is provided, then the producer is limited to idempotent delivery. + * If a TransactionalId is configured, enable.idempotence is implied. + * By default, the TransactionId is not configured, which means transactions cannot be used. + * + * @return true to enable transaction + */ + String transactionalId() default ""; + /** * The maximum duration to block synchronous send operations. * diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java index 10f5c2885..5598ffa86 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/KafkaListener.java @@ -17,6 +17,8 @@ import io.micronaut.context.annotation.*; import io.micronaut.messaging.annotation.MessageListener; +import org.apache.kafka.common.IsolationLevel; + import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -83,6 +85,35 @@ */ OffsetReset offsetReset() default OffsetReset.LATEST; + /** + * The client id of the producer that is used for {@link io.micronaut.messaging.annotation.SendTo}. + * + * @return the producer client id + */ + String producerClientId() default ""; + + /** + * This setting applies only for the producer that is used for {@link io.micronaut.messaging.annotation.SendTo}. + * + * The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer + * sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. + * If no TransactionalId is provided, then the producer is limited to idempotent delivery. + * If a TransactionalId is configured, enable.idempotence is implied. 
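Illustrative end-to-end sketch (not part of this patch, imports omitted): the new annotation members are intended to be used declaratively. Topic names, client/transactional ids and the Order/OrderResult types are placeholders; offsetStrategy is the pre-existing @KafkaListener member, while isolation and OffsetStrategy.SEND_TO_TRANSACTION are introduced further down in this change.

    @KafkaClient(id = "orders-client", transactionalId = "orders-client-tx")
    public interface OrderClient {

        @Topic("orders")
        void send(@KafkaKey String key, Order order);
    }

    @KafkaListener(
            groupId = "order-processor",
            producerClientId = "order-results-client",
            producerTransactionalId = "order-results-tx",
            offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION,
            isolation = IsolationLevel.READ_COMMITTED)
    public class OrderListener {

        @Topic("orders")
        @SendTo("order-results")
        public OrderResult process(Order order) {
            return new OrderResult(order);
        }
    }
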
+ * By default, the TransactionId is not configured, which means transactions cannot be used. + * + * @return the producer transaction id + */ + String producerTransactionalId() default ""; + + /** + * Kafka consumer isolation level to control how to read messages written transactionally. + * + * See {@link org.apache.kafka.clients.consumer.ConsumerConfig#ISOLATION_LEVEL_CONFIG}. + * + * @return The isolation level + */ + IsolationLevel isolation() default IsolationLevel.READ_UNCOMMITTED; + /** * Setting the error strategy allows you to resume at the next offset * or to seek the consumer (stop on error) to the failed offset so that diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java index 94d99615b..e0cb3a32b 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/annotation/OffsetStrategy.java @@ -49,6 +49,11 @@ public enum OffsetStrategy { /** * Asynchronously commit offsets using {@link org.apache.kafka.clients.consumer.Consumer#commitSync()} after each {@link org.apache.kafka.clients.consumer.ConsumerRecord} is consumed. */ - ASYNC_PER_RECORD + ASYNC_PER_RECORD, + /** + * Only applicable for transactional processing in combination with {@link io.micronaut.messaging.annotation.SendTo}. + * Sends offsets to transaction using {@link org.apache.kafka.clients.producer.Producer#sendOffsetsToTransaction(java.util.Map, String)} + */ + SEND_TO_TRANSACTION } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java index 62fba5c9d..eba4ab2bf 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java @@ -32,12 +32,14 @@ import io.micronaut.context.BeanContext; import io.micronaut.core.annotation.AnnotationMetadata; import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.annotation.Nullable; import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.bind.annotation.Bindable; import io.micronaut.core.convert.ConversionService; import io.micronaut.core.type.Argument; import io.micronaut.core.type.ReturnType; import io.micronaut.core.util.StringUtils; +import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; import io.micronaut.messaging.annotation.MessageBody; import io.micronaut.messaging.annotation.MessageHeader; @@ -59,9 +61,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.micronaut.core.annotation.Nullable; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import javax.annotation.PreDestroy; @@ -71,6 +71,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -79,6 +80,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Function; /** @@ -91,11 +93,12 @@ @InterceptorBean(KafkaClient.class) public class 
KafkaClientIntroductionAdvice implements MethodInterceptor, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class); + private static final ContextSupplier NULL_SUPPLIER = __ -> null; private final BeanContext beanContext; private final SerdeRegistry serdeRegistry; private final ConversionService conversionService; - private final Map producerMap = new ConcurrentHashMap<>(); + private final Map producerMap = new ConcurrentHashMap<>(); /** * Creates the introduction advice for the given arguments. @@ -116,389 +119,263 @@ public KafkaClientIntroductionAdvice( @SuppressWarnings("unchecked") @Override public final Object intercept(MethodInvocationContext context) { - if (context.hasAnnotation(KafkaClient.class)) { if (!context.hasAnnotation(KafkaClient.class)) { throw new IllegalStateException("No @KafkaClient annotation present on method: " + context); } + ProducerState producerState = getProducer(context); - boolean isBatchSend = context.isTrue(KafkaClient.class, "batch"); - - String topic = context.stringValue(Topic.class) - .orElse(null); - - Argument keyArgument = null; - Argument bodyArgument = null; - List
kafkaHeaders = new ArrayList<>(); - List> headers = context.getAnnotationValuesByType(MessageHeader.class); - - for (AnnotationValue header : headers) { - String name = header.stringValue("name").orElse(null); - String value = header.stringValue().orElse(null); - - if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) { - kafkaHeaders.add( - new RecordHeader( - name, - value.getBytes(StandardCharsets.UTF_8) - ) - ); + InterceptedMethod interceptedMethod = InterceptedMethod.of(context); + try { + Argument returnType = interceptedMethod.returnTypeValue(); + if (Argument.OBJECT_ARGUMENT.equalsType(returnType)) { + returnType = Argument.of(RecordMetadata.class); } + switch (interceptedMethod.resultType()) { + case COMPLETION_STAGE: + CompletableFuture completableFuture = returnCompletableFuture(context, producerState, returnType); + return interceptedMethod.handleResult(completableFuture); + case PUBLISHER: + Flux returnFlowable = returnPublisher(context, producerState, returnType); + return interceptedMethod.handleResult(returnFlowable); + case SYNCHRONOUS: + return returnSynchronous(context, producerState); + default: + return interceptedMethod.unsupported(); + } + } catch (Exception e) { + return interceptedMethod.handleException(e); } + } else { + // can't be implemented so proceed + return context.proceed(); + } + } - Argument[] arguments = context.getArguments(); - Object[] parameterValues = context.getParameterValues(); - Object key = null; - Object value = null; - Long timestampArgument = null; - Function partitionSupplier = producer -> null; - for (int i = 0; i < arguments.length; i++) { - Argument argument = arguments[i]; - if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(MessageBody.class)) { - bodyArgument = argument; - value = parameterValues[i]; - } else if (argument.isAnnotationPresent(KafkaKey.class)) { - keyArgument = argument; - key = parameterValues[i]; - } else if (argument.isAnnotationPresent(Topic.class)) { - Object o = parameterValues[i]; - if (o != null) { - topic = o.toString(); - } - } else if (argument.isAnnotationPresent(KafkaTimestamp.class)) { - Object o = parameterValues[i]; - if (o instanceof Long) { - timestampArgument = (Long) o; - } - } else if (argument.isAnnotationPresent(KafkaPartition.class)) { - Object o = parameterValues[i]; - if (o != null && Integer.class.isAssignableFrom(o.getClass())) { - partitionSupplier = __ -> (Integer) o; + private Object returnSynchronous(MethodInvocationContext context, ProducerState producerState) { + ReturnType returnType = context.getReturnType(); + Class javaReturnType = returnType.getType(); + Argument returnTypeArgument = returnType.asArgument(); + Object value = producerState.valueSupplier.get(context); + boolean isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass()); + if (isReactiveValue) { + Flux sendFlowable = buildSendFluxForReactiveValue(context, producerState, returnTypeArgument, value); + if (Iterable.class.isAssignableFrom(javaReturnType)) { + return conversionService.convert(sendFlowable.collectList().block(), returnTypeArgument).orElse(null); + } else if (void.class.isAssignableFrom(javaReturnType)) { + // a maybe will return null, and not throw an exception + Mono maybe = sendFlowable.next(); + return maybe.block(); + } else { + return conversionService.convert(sendFlowable.blockFirst(), returnTypeArgument).orElse(null); + } + } else { + boolean transactional = producerState.transactional; + Producer kafkaProducer = 
producerState.kafkaProducer; + try { + if (transactional) { + LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId); + kafkaProducer.beginTransaction(); + } + Object returnValue; + if (producerState.isBatchSend) { + Iterable batchValue; + if (value != null && value.getClass().isArray()) { + batchValue = Arrays.asList((Object[]) value); + } else if (!(value instanceof Iterable)) { + batchValue = Collections.singletonList(value); + } else { + batchValue = (Iterable) value; } - } else if (argument.isAnnotationPresent(KafkaPartitionKey.class)) { - String finalTopic = topic; - Object partitionKey = parameterValues[i]; - if (partitionKey != null) { - Serializer serializer = serdeRegistry.pickSerializer(argument); - if (serializer == null) { - serializer = new ByteArraySerializer(); + + List results = new ArrayList<>(); + for (Object o : batchValue) { + ProducerRecord record = buildProducerRecord(context, producerState, o); + if (LOG.isTraceEnabled()) { + LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); } - byte[] partitionKeyBytes = serializer.serialize(finalTopic, parameterValues[i]); - partitionSupplier = producer -> Utils.toPositive(Utils.murmur2(partitionKeyBytes)) % producer.partitionsFor(finalTopic).size(); - } - } else if (argument.isAnnotationPresent(MessageHeader.class)) { - final AnnotationMetadata annotationMetadata = argument.getAnnotationMetadata(); - String argumentName = argument.getName(); - String name = annotationMetadata - .stringValue(MessageHeader.class, "name") - .orElseGet(() -> - annotationMetadata.stringValue(MessageHeader.class).orElse(argumentName)); - Object v = parameterValues[i]; - - if (v != null) { - - Serializer serializer = serdeRegistry.pickSerializer(argument); - if (serializer != null) { - - try { - kafkaHeaders.add( - new RecordHeader( - name, - serializer.serialize( - null, - v - ) - ) - ); - } catch (Exception e) { - throw new MessagingClientException( - "Cannot serialize header argument [" + argument + "] for method [" + context + "]: " + e.getMessage(), e - ); - } + + Object result; + if (producerState.maxBlock != null) { + result = kafkaProducer.send(record).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS); + } else { + result = kafkaProducer.send(record).get(); } + results.add(result); } - } else { - if (argument.isContainerType() && Header.class.isAssignableFrom(argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType())) { - final Collection
parameterValue = (Collection
) parameterValues[i]; - if (parameterValue != null) { - kafkaHeaders.addAll(parameterValue); + returnValue = conversionService.convert(results, returnTypeArgument).orElseGet(() -> { + if (javaReturnType == producerState.bodyArgument.getType()) { + return value; + } else { + return null; } + }); + } else { + ProducerRecord record = buildProducerRecord(context, producerState, value); + + LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record); + + Object result; + if (producerState.maxBlock != null) { + result = kafkaProducer.send(record).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS); } else { - Class argumentType = argument.getType(); - if (argumentType == Headers.class || argumentType == RecordHeaders.class) { - final Headers parameterValue = (Headers) parameterValues[i]; - if (parameterValue != null) { - parameterValue.forEach(kafkaHeaders::add); - } - } + result = kafkaProducer.send(record).get(); } + returnValue = conversionService.convert(result, returnTypeArgument).orElseGet(() -> { + if (javaReturnType == producerState.bodyArgument.getType()) { + return value; + } else { + return null; + } + }); } - } - if (bodyArgument == null) { - for (int i = 0; i < arguments.length; i++) { - Argument argument = arguments[i]; - if (!argument.getAnnotationMetadata().hasStereotype(Bindable.class)) { - bodyArgument = argument; - value = parameterValues[i]; - break; - } + if (transactional) { + LOG.trace("Committing transaction for producer: {}", producerState.transactionalId); + kafkaProducer.commitTransaction(); } - if (bodyArgument == null) { - throw new MessagingClientException("No valid message body argument found for method: " + context); + return returnValue; + } catch (Exception e) { + if (transactional) { + LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId); + kafkaProducer.abortTransaction(); } + throw wrapException(context, e); } + } + } - Producer kafkaProducer = getProducer(bodyArgument, keyArgument, context); - - Integer partition = partitionSupplier.apply(kafkaProducer); - Long timestamp = context.isTrue(KafkaClient.class, "timestamp") ? 
Long.valueOf(System.currentTimeMillis()) : timestampArgument; - Duration maxBlock = context.getValue(KafkaClient.class, "maxBlock", Duration.class) - .orElse(null); - - boolean isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass()); - if (StringUtils.isEmpty(topic)) { - throw new MessagingClientException("No topic specified for method: " + context); - } - - InterceptedMethod interceptedMethod = InterceptedMethod.of(context); - try { - Argument reactiveTypeValue = interceptedMethod.returnTypeValue(); - boolean returnTypeValueVoid = reactiveTypeValue.equalsType(Argument.VOID_OBJECT); - if (Argument.OBJECT_ARGUMENT.equalsType(reactiveTypeValue)) { - reactiveTypeValue = Argument.of(RecordMetadata.class); + private Flux returnPublisher(MethodInvocationContext context, ProducerState producerState, Argument returnType) { + Object value = producerState.valueSupplier.get(context); + boolean isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass()); + Flux returnFlowable; + if (isReactiveValue) { + returnFlowable = buildSendFluxForReactiveValue(context, producerState, returnType, value); + } else { + if (producerState.isBatchSend) { + // TODO :batch + Object batchValue; + if (value != null && value.getClass().isArray()) { + batchValue = Arrays.asList((Object[]) value); + } else { + batchValue = value; } - switch (interceptedMethod.resultType()) { - case COMPLETION_STAGE: - - CompletableFuture completableFuture = new CompletableFuture(); - - if (isReactiveValue) { - Flux sendFlowable = buildSendFlux( - context, - topic, - kafkaProducer, - kafkaHeaders, - reactiveTypeValue, - key, - partition, - value, - timestamp, - maxBlock); - - if (!Publishers.isSingle(value.getClass())) { - sendFlowable = sendFlowable.collectList().flux(); - } - //noinspection SubscriberImplementation - sendFlowable.subscribe(new Subscriber() { - boolean completed = false; + Flux bodyEmitter; + if (batchValue instanceof Iterable) { + bodyEmitter = Flux.fromIterable((Iterable) batchValue); + } else { + bodyEmitter = Flux.just(batchValue); + } - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } + returnFlowable = bodyEmitter.flatMap(o -> buildSendFlux(context, producerState, o, returnType)); + } else { + returnFlowable = buildSendFlux(context, producerState, value, returnType); + } + } + return returnFlowable; + } - @Override - public void onNext(Object o) { - completableFuture.complete(o); - completed = true; - } + private CompletableFuture returnCompletableFuture(MethodInvocationContext context, ProducerState producerState, Argument returnType) { + CompletableFuture completableFuture = new CompletableFuture<>(); + Object value = producerState.valueSupplier.get(context); + boolean isReactiveValue = value != null && Publishers.isConvertibleToPublisher(value.getClass()); + if (isReactiveValue) { + Flux sendFlowable = buildSendFluxForReactiveValue(context, producerState, returnType, value); - @Override - public void onError(Throwable t) { - completableFuture.completeExceptionally(wrapException(context, t)); - } + if (!Publishers.isSingle(value.getClass())) { + sendFlowable = sendFlowable.collectList().flux(); + } - @Override - public void onComplete() { - if (!completed) { - // empty publisher - completableFuture.complete(null); - } - } - }); - } else { + //noinspection SubscriberImplementation + sendFlowable.subscribe(new Subscriber() { + boolean completed = false; - ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, 
key, value, timestamp); - if (LOG.isTraceEnabled()) { - LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); - } + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } - Argument finalReturnTypeValue = reactiveTypeValue; - Argument finalBodyArgument = bodyArgument; - Object finalValue = value; - kafkaProducer.send(record, (metadata, exception) -> { - if (exception != null) { - completableFuture.completeExceptionally(wrapException(context, exception)); - } else { - if (!returnTypeValueVoid) { - Optional converted = conversionService.convert(metadata, finalReturnTypeValue); - if (converted.isPresent()) { - completableFuture.complete(converted.get()); - } else if (finalReturnTypeValue.getType() == finalBodyArgument.getType()) { - completableFuture.complete(finalValue); - } - } else { - completableFuture.complete(null); - } - } - }); - } + @Override + public void onNext(Object o) { + completableFuture.complete(o); + completed = true; + } - return interceptedMethod.handleResult(completableFuture); - case PUBLISHER: - Flux returnFlowable; - if (isReactiveValue) { - returnFlowable = buildSendFlux( - context, - topic, - kafkaProducer, - kafkaHeaders, - reactiveTypeValue, - key, - partition, - value, - timestamp, - maxBlock); + @Override + public void onError(Throwable t) { + completableFuture.completeExceptionally(wrapException(context, t)); + } - } else { - if (isBatchSend) { - Object batchValue; - if (value != null && value.getClass().isArray()) { - batchValue = Arrays.asList((Object[]) value); - } else { - batchValue = value; - } + @Override + public void onComplete() { + if (!completed) { + // empty publisher + completableFuture.complete(null); + } + } + }); + } else { - Flux bodyEmitter; - if (batchValue instanceof Iterable) { - bodyEmitter = Flux.fromIterable((Iterable) batchValue); - } else { - bodyEmitter = Flux.just(batchValue); - } + ProducerRecord record = buildProducerRecord(context, producerState, value); + if (LOG.isTraceEnabled()) { + LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); + } - String finalTopic = topic; - Argument finalBodyArgument = bodyArgument; - Object finalKey = key; - Integer finalPartition = partition; - Argument finalReactiveTypeValue = reactiveTypeValue; - returnFlowable = bodyEmitter.flatMap(o -> - buildSendFlux(context, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, finalPartition, o, timestamp, finalReactiveTypeValue) - ); - - } else { - returnFlowable = buildSendFlux(context, topic, bodyArgument, kafkaProducer, kafkaHeaders, key, partition, value, timestamp, reactiveTypeValue); - } - } - return interceptedMethod.handleResult(returnFlowable); - case SYNCHRONOUS: - ReturnType returnType = context.getReturnType(); - Class javaReturnType = returnType.getType(); - Argument returnTypeArgument = returnType.asArgument(); - if (isReactiveValue) { - Flux sendFlowable = buildSendFlux( - context, - topic, - kafkaProducer, - kafkaHeaders, - returnTypeArgument, - key, - partition, - value, - timestamp, - maxBlock - ); - - if (Iterable.class.isAssignableFrom(javaReturnType)) { - return conversionService.convert(sendFlowable.collectList().block(), returnTypeArgument).orElse(null); - } else if (void.class.isAssignableFrom(javaReturnType)) { - // a maybe will return null, and not throw an exception - Mono maybe = sendFlowable.next(); - return maybe.block(); - } else { - return conversionService.convert(sendFlowable.blockFirst(), returnTypeArgument).orElse(null); 
- } + boolean transactional = producerState.transactional; + Producer kafkaProducer = producerState.kafkaProducer; + try { + if (transactional) { + LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId); + kafkaProducer.beginTransaction(); + } + kafkaProducer.send(record, (metadata, exception) -> { + if (exception != null) { + completableFuture.completeExceptionally(wrapException(context, exception)); + } else { + if (returnType.equalsType(Argument.VOID_OBJECT)) { + completableFuture.complete(null); } else { - try { - if (isBatchSend) { - Iterable batchValue; - if (value != null && value.getClass().isArray()) { - batchValue = Arrays.asList((Object[]) value); - } else if (!(value instanceof Iterable)) { - batchValue = Collections.singletonList(value); - } else { - batchValue = (Iterable) value; - } - - List results = new ArrayList(); - for (Object o : batchValue) { - ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp); - - if (LOG.isTraceEnabled()) { - LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record); - } - - Object result; - if (maxBlock != null) { - result = kafkaProducer.send(record).get(maxBlock.toMillis(), TimeUnit.MILLISECONDS); - } else { - result = kafkaProducer.send(record).get(); - } - results.add(result); - } - Argument finalBodyArgument = bodyArgument; - Object finalValue = value; - return conversionService.convert(results, returnTypeArgument).orElseGet(() -> { - if (javaReturnType == finalBodyArgument.getType()) { - return finalValue; - } else { - return null; - } - }); - } - ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp); - - LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record); - - Object result; - if (maxBlock != null) { - result = kafkaProducer.send(record).get(maxBlock.toMillis(), TimeUnit.MILLISECONDS); - } else { - result = kafkaProducer.send(record).get(); - } - Argument finalBodyArgument = bodyArgument; - Object finalValue = value; - return conversionService.convert(result, returnTypeArgument).orElseGet(() -> { - if (javaReturnType == finalBodyArgument.getType()) { - return finalValue; - } else { - return null; - } - }); - } catch (Exception e) { - throw wrapException(context, e); + Optional converted = conversionService.convert(metadata, returnType); + if (converted.isPresent()) { + completableFuture.complete(converted.get()); + } else if (returnType.getType() == producerState.bodyArgument.getType()) { + completableFuture.complete(value); } } - default: - return interceptedMethod.unsupported(); + } + }); + if (transactional) { + LOG.trace("Committing transaction for producer: {}", producerState.transactionalId); + kafkaProducer.commitTransaction(); } } catch (Exception e) { - return interceptedMethod.handleException(e); + if (transactional) { + LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId); + kafkaProducer.abortTransaction(); + } + throw e; } - } else { - // can't be implemented so proceed - return context.proceed(); } + return completableFuture; + } + + private Mono producerSend(Producer producer, ProducerRecord record) { + return Mono.create(emitter -> producer.send(record, (metadata, exception) -> { + if (exception != null) { + emitter.error(exception); + } else { + emitter.success(metadata); + } + })); } @Override @PreDestroy public final void close() { - Collection kafkaProducers = producerMap.values(); try { - for (Producer kafkaProducer : 
kafkaProducers) { + for (ProducerState producerState : producerMap.values()) { try { - kafkaProducer.close(); + producerState.kafkaProducer.close(); } catch (Exception e) { LOG.warn("Error closing Kafka producer: {}", e.getMessage(), e); } @@ -508,117 +385,247 @@ public final void close() { } } - private Flux buildSendFlux( - MethodInvocationContext context, - String topic, - Argument bodyArgument, - Producer kafkaProducer, - List
kafkaHeaders, - Object key, - Integer partition, - Object value, - Long timestamp, - Argument reactiveValueType) { - Flux returnFlowable; - ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp); - returnFlowable = Flux.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> { - if (exception != null) { - emitter.error(wrapException(context, exception)); - } else { - if (!reactiveValueType.equalsType(Argument.VOID_OBJECT)) { - Optional converted = conversionService.convert(metadata, reactiveValueType); - - if (converted.isPresent()) { - emitter.next(converted.get()); - } else if (reactiveValueType.getType() == bodyArgument.getType()) { - emitter.next(value); - } - } - emitter.complete(); + private Flux buildSendFlux(MethodInvocationContext context, ProducerState producerState, Object value, Argument returnType) { + ProducerRecord record = buildProducerRecord(context, producerState, value); + return Flux.defer(() -> { + boolean transactional = producerState.transactional; + Producer kafkaProducer = producerState.kafkaProducer; + if (transactional) { + LOG.trace("Committing transaction for producer: {}", producerState.transactionalId); + kafkaProducer.beginTransaction(); } - }), FluxSink.OverflowStrategy.ERROR); - return returnFlowable; + Mono result = producerSend(kafkaProducer, record) + .map(metadata -> convertResult(metadata, returnType, value, producerState.bodyArgument)) + .onErrorMap(e -> wrapException(context, e)); + if (transactional) { + return addTransactionalProcessing(producerState, result.flux()); + } + return result; + }); } - private Flux buildSendFlux( - MethodInvocationContext context, - String topic, - Producer kafkaProducer, - List
kafkaHeaders, - Argument returnType, - Object key, - Integer partition, - Object value, - Long timestamp, - Duration maxBlock) { + private Flux buildSendFluxForReactiveValue(MethodInvocationContext context, ProducerState producerState, Argument returnType, Object value) { Flux valueFlowable = Flux.from(Publishers.convertPublisher(value, Publisher.class)); Class javaReturnType = returnType.getType(); if (Iterable.class.isAssignableFrom(javaReturnType)) { - javaReturnType = returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType(); + returnType = returnType.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT); + } + boolean transactional = producerState.transactional; + Producer kafkaProducer = producerState.kafkaProducer; + + if (transactional) { + LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId); + kafkaProducer.beginTransaction(); } - Class finalJavaReturnType = javaReturnType; + Argument finalReturnType = returnType; Flux sendFlowable = valueFlowable.flatMap(o -> { - ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp); + ProducerRecord record = buildProducerRecord(context, producerState, o); LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record); - //noinspection unchecked - return Flux.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> { - if (exception != null) { - emitter.error(wrapException(context, exception)); - } else { - if (RecordMetadata.class.isAssignableFrom(finalJavaReturnType)) { - emitter.next(metadata); - } else if (finalJavaReturnType.isInstance(o)) { - emitter.next(o); - } else { - Optional converted = conversionService.convert(metadata, finalJavaReturnType); - if (converted.isPresent()) { - emitter.next(converted.get()); - } - } - - emitter.complete(); - } - }), FluxSink.OverflowStrategy.BUFFER); + return producerSend(kafkaProducer, record) + .map(metadata -> convertResult(metadata, finalReturnType, o, producerState.bodyArgument)) + .onErrorMap(e -> wrapException(context, e)); }); - - if (maxBlock != null) { - sendFlowable = sendFlowable.timeout(maxBlock); + if (transactional) { + sendFlowable = addTransactionalProcessing(producerState, sendFlowable); + } + if (producerState.maxBlock != null) { + sendFlowable = sendFlowable.timeout(producerState.maxBlock); } return sendFlowable; } + private Flux addTransactionalProcessing(ProducerState producerState, Flux sendFlowable) { + return sendFlowable.doOnError(throwable -> { + LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId); + producerState.kafkaProducer.abortTransaction(); + }) + .doOnComplete(() -> { + LOG.trace("Committing transaction for producer: {}", producerState.transactionalId); + producerState.kafkaProducer.commitTransaction(); + }); + } + + private Object convertResult(RecordMetadata metadata, Argument returnType, Object value, Argument valueArgument) { + if (returnType.isVoid()) { + return metadata; + } + if (RecordMetadata.class.isAssignableFrom(returnType.getType())) { + return metadata; + } else if (returnType.getType() == valueArgument.getType()) { + return value; + } else { + return conversionService.convertRequired(metadata, returnType); + } + } + private MessagingClientException wrapException(MethodInvocationContext context, Throwable exception) { return new MessagingClientException( "Exception sending producer record for method [" + context + "]: " + exception.getMessage(), exception ); } - @SuppressWarnings("unchecked") - private 
ProducerRecord buildProducerRecord(String topic, Integer partition, List
kafkaHeaders, Object key, Object value, Long timestamp) { - return new ProducerRecord( - topic, - partition, - timestamp, - key, + private ProducerRecord buildProducerRecord(MethodInvocationContext context, ProducerState producerState, Object value) { + return new ProducerRecord<>( + producerState.topicSupplier.get(context), + producerState.partitionSupplier.get(context), + producerState.timestampSupplier.get(context), + producerState.keySupplier.get(context), value, - kafkaHeaders.isEmpty() ? null : kafkaHeaders + producerState.headersSupplier.get(context) ); } @SuppressWarnings("unchecked") - private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgument, AnnotationMetadata metadata) { - Class keyType = keyArgument != null ? keyArgument.getType() : byte[].class; - String clientId = metadata.stringValue(KafkaClient.class).orElse(null); - ProducerKey key = new ProducerKey(keyType, bodyArgument.getType(), clientId); + private ProducerState getProducer(MethodInvocationContext context) { + ProducerKey key = new ProducerKey(context.getTarget(), context.getExecutableMethod()); return producerMap.computeIfAbsent(key, producerKey -> { - String producerId = producerKey.id; + String clientId = context.stringValue(KafkaClient.class).orElse(null); + + List>> headersSuppliers = new LinkedList<>(); + List> headers = context.getAnnotationValuesByType(MessageHeader.class); + + if (!headers.isEmpty()) { + List
kafkaHeaders = new ArrayList<>(headers.size()); + for (AnnotationValue header : headers) { + String name = header.stringValue("name").orElse(null); + String value = header.stringValue().orElse(null); + + if (StringUtils.isNotEmpty(name) && StringUtils.isNotEmpty(value)) { + kafkaHeaders.add(new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8))); + } + } + if (!kafkaHeaders.isEmpty()) { + headersSuppliers.add(ctx -> kafkaHeaders); + } + } + + Argument keyArgument = null; + Argument bodyArgument = null; + ContextSupplier[] topicSupplier = new ContextSupplier[1]; + topicSupplier[0] = ctx -> ctx.stringValue(Topic.class).filter(StringUtils::isNotEmpty) + .orElseThrow(() -> new MessagingClientException("No topic specified for method: " + context)); + ContextSupplier keySupplier = NULL_SUPPLIER; + ContextSupplier valueSupplier = NULL_SUPPLIER; + ContextSupplier timestampSupplier = NULL_SUPPLIER; + BiFunction, Producer, Integer> partitionFromProducerFn = (ctx, producer) -> null; + Argument[] arguments = context.getArguments(); + for (int i = 0; i < arguments.length; i++) { + int finalI = i; + Argument argument = arguments[i]; + if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(MessageBody.class)) { + bodyArgument = argument.isAsyncOrReactive() ? argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument; + valueSupplier = ctx -> ctx.getParameterValues()[finalI]; + } else if (argument.isAnnotationPresent(KafkaKey.class)) { + keyArgument = argument; + keySupplier = ctx -> ctx.getParameterValues()[finalI]; + } else if (argument.isAnnotationPresent(Topic.class)) { + ContextSupplier prevTopicSupplier = topicSupplier[0]; + topicSupplier[0] = ctx -> { + Object o = ctx.getParameterValues()[finalI]; + if (o != null) { + String topic = o.toString(); + if (StringUtils.isNotEmpty(topic)) { + return topic; + } + } + return prevTopicSupplier.get(ctx); + }; + } else if (argument.isAnnotationPresent(KafkaTimestamp.class)) { + timestampSupplier = ctx -> { + Object o = ctx.getParameterValues()[finalI]; + if (o instanceof Long) { + return (Long) o; + } + return null; + }; + } else if (argument.isAnnotationPresent(KafkaPartition.class)) { + partitionFromProducerFn = (ctx, producer) -> { + Object o = ctx.getParameterValues()[finalI]; + if (o != null && Integer.class.isAssignableFrom(o.getClass())) { + return (Integer) o; + } + return null; + }; + } else if (argument.isAnnotationPresent(KafkaPartitionKey.class)) { + partitionFromProducerFn = (ctx, producer) -> { + Object partitionKey = ctx.getParameterValues()[finalI]; + if (partitionKey != null) { + Serializer serializer = serdeRegistry.pickSerializer(argument); + if (serializer == null) { + serializer = new ByteArraySerializer(); + } + String topic = topicSupplier[0].get(ctx); + byte[] partitionKeyBytes = serializer.serialize(topic, partitionKey); + return Utils.toPositive(Utils.murmur2(partitionKeyBytes)) % producer.partitionsFor(topic).size(); + } + return null; + }; + } else if (argument.isAnnotationPresent(MessageHeader.class)) { + final AnnotationMetadata annotationMetadata = argument.getAnnotationMetadata(); + String name = annotationMetadata + .stringValue(MessageHeader.class, "name") + .orElseGet(() -> annotationMetadata.stringValue(MessageHeader.class).orElseGet(argument::getName)); + headersSuppliers.add(ctx -> { + Object headerValue = ctx.getParameterValues()[finalI]; + if (headerValue != null) { + Serializer serializer = serdeRegistry.pickSerializer(argument); + if (serializer != null) 
{ + try { + return Collections.singleton(new RecordHeader(name, serializer.serialize(null, headerValue))); + } catch (Exception e) { + throw new MessagingClientException( + "Cannot serialize header argument [" + argument + "] for method [" + ctx + "]: " + e.getMessage(), e + ); + } + } + } + return Collections.emptySet(); + }); + } else { + if (argument.isContainerType() && Header.class.isAssignableFrom(argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT).getType())) { + headersSuppliers.add(ctx -> { + Collection
parameterHeaders = (Collection
) ctx.getParameterValues()[finalI]; + if (parameterHeaders != null) { + return parameterHeaders; + } + return Collections.emptySet(); + }); + } else { + Class argumentType = argument.getType(); + if (argumentType == Headers.class || argumentType == RecordHeaders.class) { + headersSuppliers.add(ctx -> { + Headers parameterHeaders = (Headers) ctx.getParameterValues()[finalI]; + if (parameterHeaders != null) { + return parameterHeaders; + } + return Collections.emptySet(); + }); + } + } + } + } + if (bodyArgument == null) { + for (int i = 0; i < arguments.length; i++) { + int finalI = i; + Argument argument = arguments[i]; + if (!argument.getAnnotationMetadata().hasStereotype(Bindable.class)) { + bodyArgument = argument.isAsyncOrReactive() ? argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument; + valueSupplier = ctx -> ctx.getParameterValues()[finalI]; + break; + } + } + if (bodyArgument == null) { + throw new MessagingClientException("No valid message body argument found for method: " + context); + } + } + AbstractKafkaProducerConfiguration configuration; - if (producerId != null) { - Optional namedConfig = beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName(producerId)); + if (clientId != null) { + Optional namedConfig = beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName(clientId)); if (namedConfig.isPresent()) { configuration = namedConfig.get(); } else { @@ -628,23 +635,26 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume configuration = beanContext.getBean(AbstractKafkaProducerConfiguration.class); } - DefaultKafkaProducerConfiguration newConfiguration = new DefaultKafkaProducerConfiguration<>( - configuration - ); + DefaultKafkaProducerConfiguration newConfiguration = new DefaultKafkaProducerConfiguration<>(configuration); Properties newProperties = newConfiguration.getConfig(); + String transactionalId = context.stringValue(KafkaClient.class, "transactionalId").filter(StringUtils::isNotEmpty).orElse(null); + if (clientId != null) { newProperties.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, clientId); } + if (transactionalId != null) { + newProperties.putIfAbsent(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + } - metadata.getValue(KafkaClient.class, "maxBlock", Duration.class).ifPresent(maxBlock -> + context.getValue(KafkaClient.class, "maxBlock", Duration.class).ifPresent(maxBlock -> newProperties.put( ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(maxBlock.toMillis()) )); - Integer ack = metadata.intValue(KafkaClient.class, "acks").orElse(KafkaClient.Acknowledge.DEFAULT); + Integer ack = context.intValue(KafkaClient.class, "acks").orElse(KafkaClient.Acknowledge.DEFAULT); if (ack != KafkaClient.Acknowledge.DEFAULT) { String acksValue = ack == -1 ? 
"all" : String.valueOf(ack); @@ -654,9 +664,8 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume ); } - metadata.findAnnotation(KafkaClient.class).map(ann -> - ann.getProperties("properties", "name") - ).ifPresent(newProperties::putAll); + context.findAnnotation(KafkaClient.class).map(ann -> ann.getProperties("properties", "name")) + .ifPresent(newProperties::putAll); LOG.debug("Creating new KafkaProducer."); @@ -674,34 +683,113 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume } } + boolean isBatchSend = context.isTrue(KafkaClient.class, "batch"); + if (!newProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { Serializer valueSerializer = newConfiguration.getValueSerializer().orElse(null); if (valueSerializer == null) { - boolean batch = metadata.isTrue(KafkaClient.class, "batch"); - valueSerializer = serdeRegistry.pickSerializer(batch ? bodyArgument.getFirstTypeVariable().orElse(bodyArgument) : bodyArgument); + valueSerializer = serdeRegistry.pickSerializer(isBatchSend ? bodyArgument.getFirstTypeVariable().orElse(bodyArgument) : bodyArgument); LOG.debug("Using Kafka value serializer: {}", valueSerializer); newConfiguration.setValueSerializer((Serializer) valueSerializer); } } - return beanContext.createBean(Producer.class, newConfiguration); + Producer producer = beanContext.createBean(Producer.class, newConfiguration); + + boolean transactional = StringUtils.isNotEmpty(transactionalId); + timestampSupplier = context.isTrue(KafkaClient.class, "timestamp") ? ctx -> System.currentTimeMillis() : timestampSupplier; + Duration maxBlock = context.getValue(KafkaClient.class, "maxBlock", Duration.class).orElse(null); + + if (transactional) { + producer.initTransactions(); + } + ContextSupplier> headersSupplier = ctx -> { + if (headersSuppliers.isEmpty()) { + return null; + } + List
headerList = new ArrayList<>(headersSuppliers.size()); + for (ContextSupplier> supplier : headersSuppliers) { + for (Header header : supplier.get(ctx)) { + headerList.add(header); + } + } + if (headerList.isEmpty()) { + return null; + } + return headerList; + }; + BiFunction, Producer, Integer> finalPartitionFromProducerFn = partitionFromProducerFn; + ContextSupplier partitionSupplier = ctx -> finalPartitionFromProducerFn.apply(ctx, producer); + return new ProducerState(producer, keySupplier, topicSupplier[0], valueSupplier, timestampSupplier, partitionSupplier, headersSupplier, + transactional, transactionalId, maxBlock, isBatchSend, bodyArgument); }); } + private static final class ProducerState { + + private final Producer kafkaProducer; + private final ContextSupplier keySupplier; + private final ContextSupplier topicSupplier; + private final ContextSupplier valueSupplier; + private final ContextSupplier timestampSupplier; + private final ContextSupplier partitionSupplier; + private final ContextSupplier> headersSupplier; + private final boolean transactional; + private final String transactionalId; + @Nullable + private final Duration maxBlock; + private final boolean isBatchSend; + private final Argument bodyArgument; + + private ProducerState(Producer kafkaProducer, + ContextSupplier keySupplier, + ContextSupplier topicSupplier, + ContextSupplier valueSupplier, + ContextSupplier timestampSupplier, + ContextSupplier partitionSupplier, + ContextSupplier> headersSupplier, + boolean transactional, + @Nullable String transactionalId, + @Nullable Duration maxBlock, + boolean isBatchSend, + @Nullable Argument bodyArgument) { + this.kafkaProducer = kafkaProducer; + this.keySupplier = keySupplier; + this.topicSupplier = topicSupplier; + this.valueSupplier = valueSupplier; + this.timestampSupplier = timestampSupplier; + this.partitionSupplier = partitionSupplier; + this.headersSupplier = headersSupplier; + this.transactional = transactional; + this.transactionalId = transactionalId; + this.maxBlock = maxBlock; + this.isBatchSend = isBatchSend; + this.bodyArgument = bodyArgument; + } + } + + private interface ContextSupplier extends Function, T> { + + default T get(MethodInvocationContext ctx) { + return apply(ctx); + } + + } + /** * Key used to cache {@link org.apache.kafka.clients.producer.Producer} instances. 
*/ private static final class ProducerKey { - final Class keyType; - final Class valueType; - final String id; - - ProducerKey(Class keyType, Class valueType, String id) { - this.keyType = keyType; - this.valueType = valueType; - this.id = id; + private final Object target; + private final ExecutableMethod method; + private final int hashCode; + + private ProducerKey(Object target, ExecutableMethod method) { + this.target = target; + this.method = method; + this.hashCode = Objects.hash(target, method); } @Override @@ -713,14 +801,12 @@ public boolean equals(Object o) { return false; } ProducerKey that = (ProducerKey) o; - return Objects.equals(keyType, that.keyType) && - Objects.equals(valueType, that.valueType) && - Objects.equals(id, that.id); + return Objects.equals(target, that.target) && Objects.equals(method, that.method); } @Override public int hashCode() { - return Objects.hash(keyType, valueType, id); + return hashCode; } } } diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index 0449ad86f..94e9bc2fb 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -18,7 +18,9 @@ import io.micronaut.configuration.kafka.ConsumerAware; import io.micronaut.configuration.kafka.ConsumerRegistry; import io.micronaut.configuration.kafka.KafkaAcknowledgement; +import io.micronaut.configuration.kafka.KafkaMessage; import io.micronaut.configuration.kafka.ProducerRegistry; +import io.micronaut.configuration.kafka.TransactionalProducerRegistry; import io.micronaut.configuration.kafka.annotation.ErrorStrategy; import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue; import io.micronaut.configuration.kafka.annotation.KafkaKey; @@ -48,6 +50,7 @@ import io.micronaut.core.bind.annotation.Bindable; import io.micronaut.core.naming.NameUtils; import io.micronaut.core.type.Argument; +import io.micronaut.core.type.ReturnType; import io.micronaut.core.util.ArgumentUtils; import io.micronaut.core.util.ArrayUtils; import io.micronaut.core.util.CollectionUtils; @@ -76,19 +79,23 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -97,6 +104,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -135,22 +143,24 @@ public 
class KafkaConsumerProcessor private final KafkaListenerExceptionHandler exceptionHandler; private final TaskScheduler taskScheduler; private final ProducerRegistry producerRegistry; + private final TransactionalProducerRegistry transactionalProducerRegistry; private final BatchConsumerRecordsBinderRegistry batchBinderRegistry; private final AtomicInteger clientIdGenerator = new AtomicInteger(10); /** * Creates a new processor using the given {@link ExecutorService} to schedule consumers on. * - * @param executorService The executor service - * @param applicationConfiguration The application configuration - * @param beanContext The bean context - * @param defaultConsumerConfiguration The default consumer config - * @param binderRegistry The {@link ConsumerRecordBinderRegistry} - * @param batchBinderRegistry The {@link BatchConsumerRecordsBinderRegistry} - * @param serdeRegistry The {@link org.apache.kafka.common.serialization.Serde} registry - * @param producerRegistry The {@link ProducerRegistry} - * @param exceptionHandler The exception handler to use - * @param schedulerService The scheduler service + * @param executorService The executor service + * @param applicationConfiguration The application configuration + * @param beanContext The bean context + * @param defaultConsumerConfiguration The default consumer config + * @param binderRegistry The {@link ConsumerRecordBinderRegistry} + * @param batchBinderRegistry The {@link BatchConsumerRecordsBinderRegistry} + * @param serdeRegistry The {@link org.apache.kafka.common.serialization.Serde} registry + * @param producerRegistry The {@link ProducerRegistry} + * @param exceptionHandler The exception handler to use + * @param schedulerService The scheduler service + * @param transactionalProducerRegistry The transactional producer registry */ public KafkaConsumerProcessor( @Named(TaskExecutors.MESSAGE_CONSUMER) ExecutorService executorService, @@ -162,7 +172,8 @@ public KafkaConsumerProcessor( SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler, - @Named(TaskExecutors.SCHEDULED) ExecutorService schedulerService) { + @Named(TaskExecutors.SCHEDULED) ExecutorService schedulerService, + TransactionalProducerRegistry transactionalProducerRegistry) { this.executorService = executorService; this.applicationConfiguration = applicationConfiguration; this.beanContext = beanContext; @@ -174,6 +185,7 @@ public KafkaConsumerProcessor( this.producerRegistry = producerRegistry; this.exceptionHandler = exceptionHandler; this.taskScheduler = new ScheduledExecutorTaskScheduler(schedulerService); + this.transactionalProducerRegistry = transactionalProducerRegistry; this.beanContext.getBeanDefinitions(Qualifiers.byType(KafkaListener.class)) .forEach(definition -> { // pre-initialize singletons before processing @@ -279,9 +291,12 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met return; // No topics to consume } final Class beanType = beanDefinition.getBeanType(); - final String groupId = consumerAnnotation.stringValue("groupId") + String groupId = consumerAnnotation.stringValue("groupId") .filter(StringUtils::isNotEmpty) .orElseGet(() -> applicationConfiguration.getName().orElse(beanType.getName())); + if (consumerAnnotation.isTrue("uniqueGroupId")) { + groupId = groupId + "_" + UUID.randomUUID(); + } final String clientId = consumerAnnotation.stringValue("clientId") .filter(StringUtils::isNotEmpty) .orElseGet(() -> applicationConfiguration.getName().map(s -> s + '-' + 
NameUtils.hyphenate(beanType.getSimpleName())).orElse(null)); @@ -290,9 +305,9 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met final AbstractKafkaConsumerConfiguration consumerConfigurationDefaults = beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName(groupId)) .orElse(defaultConsumerConfiguration); final DefaultKafkaConsumerConfiguration consumerConfiguration = new DefaultKafkaConsumerConfiguration<>(consumerConfigurationDefaults); - final Properties properties = createConsumerProperties(method, consumerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy); + final Properties properties = createConsumerProperties(consumerAnnotation, consumerConfiguration, clientId, groupId, offsetStrategy); configureDeserializers(method, consumerConfiguration); - submitConsumerThreads(method, clientId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, properties, beanType); + submitConsumerThreads(method, clientId, groupId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, properties, beanType); } @Override @@ -304,9 +319,11 @@ public void close() { consumers.clear(); } - private Properties createConsumerProperties(final ExecutableMethod method, final AnnotationValue consumerAnnotation, - final DefaultKafkaConsumerConfiguration consumerConfiguration, final String clientId, - final String groupId, final OffsetStrategy offsetStrategy) { + private Properties createConsumerProperties(final AnnotationValue consumerAnnotation, + final DefaultKafkaConsumerConfiguration consumerConfiguration, + final String clientId, + final String groupId, + final OffsetStrategy offsetStrategy) { final Properties properties = consumerConfiguration.getConfig(); if (consumerAnnotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) { @@ -316,22 +333,20 @@ private Properties createConsumerProperties(final ExecutableMethod method, // enable auto commit offsets if necessary properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(offsetStrategy == OffsetStrategy.AUTO)); - method.getValue(KafkaListener.class, "heartbeatInterval", Duration.class) + consumerAnnotation.get("heartbeatInterval", Duration.class) .map(Duration::toMillis) .map(String::valueOf) .ifPresent(heartbeatInterval -> properties.putIfAbsent(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval)); - method.getValue(KafkaListener.class, "sessionTimeout", Duration.class) + consumerAnnotation.get("sessionTimeout", Duration.class) .map(Duration::toMillis) .map(String::valueOf) .ifPresent(sessionTimeout -> properties.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout)); - if (consumerAnnotation.isTrue("uniqueGroupId")) { - final String uniqueGroupId = groupId + "_" + UUID.randomUUID().toString(); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, uniqueGroupId); - } else { - properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - } + consumerAnnotation.enumValue("isolation", IsolationLevel.class) + .ifPresent(isolation -> properties.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolation.toString().toLowerCase(Locale.ROOT))); + + properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); if (clientId != null) { properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); @@ -360,9 +375,15 @@ private void debugDeserializationConfiguration(final ExecutableMethod meth } } - private void submitConsumerThreads(final ExecutableMethod method, final String clientId, final 
OffsetStrategy offsetStrategy, - final List> topicAnnotations, final AnnotationValue consumerAnnotation, - final DefaultKafkaConsumerConfiguration consumerConfiguration, final Properties properties, final Class beanType) { + private void submitConsumerThreads(final ExecutableMethod method, + final String clientId, + final String groupId, + final OffsetStrategy offsetStrategy, + final List> topicAnnotations, + final AnnotationValue consumerAnnotation, + final DefaultKafkaConsumerConfiguration consumerConfiguration, + final Properties properties, + final Class beanType) { final int consumerThreads = consumerAnnotation.intValue("threads").orElse(1); for (int i = 0; i < consumerThreads; i++) { final String finalClientId; @@ -376,12 +397,13 @@ private void submitConsumerThreads(final ExecutableMethod method, final St } else { finalClientId = "kafka-consumer-" + clientIdGenerator.incrementAndGet(); } - submitConsumerThread(method, finalClientId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, beanType); + submitConsumerThread(method, finalClientId, groupId, offsetStrategy, topicAnnotations, consumerAnnotation, consumerConfiguration, beanType); } } private void submitConsumerThread(final ExecutableMethod method, - final String finalClientId, + final String clientId, + final String groupId, final OffsetStrategy offsetStrategy, final List> topicAnnotations, final AnnotationValue consumerAnnotation, @@ -394,14 +416,13 @@ private void submitConsumerThread(final ExecutableMethod method, ((ConsumerAware) consumerBean).setKafkaConsumer(kafkaConsumer); } setupConsumerSubscription(method, topicAnnotations, consumerBean, kafkaConsumer); - ConsumerState consumerState = new ConsumerState(finalClientId, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation); - consumers.put(finalClientId, consumerState); - executorService.submit(() -> createConsumerThreadPollLoop(method, consumerState, offsetStrategy)); + ConsumerState consumerState = new ConsumerState(clientId, groupId, offsetStrategy, kafkaConsumer, consumerBean, Collections.unmodifiableSet(kafkaConsumer.subscription()), consumerAnnotation, method); + consumers.put(clientId, consumerState); + executorService.submit(() -> createConsumerThreadPollLoop(method, consumerState)); } private void createConsumerThreadPollLoop(final ExecutableMethod method, - final ConsumerState consumerState, - final OffsetStrategy offsetStrategy) { + final ConsumerState consumerState) { final boolean isBatch = method.isTrue(KafkaListener.class, "batch"); final Duration pollTimeout = method.getValue(KafkaListener.class, "pollTimeout", Duration.class) @@ -415,7 +436,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod method, try (Consumer kafkaConsumer = consumerState.kafkaConsumer) { - final boolean trackPartitions = ackArg.isPresent() || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD; + final boolean trackPartitions = ackArg.isPresent() || consumerState.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || consumerState.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD; final Map, Object> boundArguments = new HashMap<>(2); consumerArg.ifPresent(argument -> boundArguments.put(argument, kafkaConsumer)); @@ -436,23 +457,23 @@ private void createConsumerThreadPollLoop(final ExecutableMethod method, if (isBatch) { failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, consumerRecords); } else { - failed = 
!processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords, offsetStrategy); + failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords); } if (!failed) { - if (offsetStrategy == OffsetStrategy.SYNC) { + if (consumerState.offsetStrategy == OffsetStrategy.SYNC) { try { kafkaConsumer.commitSync(); } catch (CommitFailedException e) { handleException(consumerState, null, e); } - } else if (offsetStrategy == OffsetStrategy.ASYNC) { + } else if (consumerState.offsetStrategy == OffsetStrategy.ASYNC) { kafkaConsumer.commitAsync(resolveCommitCallback(consumerState.consumerBean)); } } } catch (WakeupException e) { try { - if (!failed && offsetStrategy != OffsetStrategy.DISABLED) { + if (!failed && consumerState.offsetStrategy != OffsetStrategy.DISABLED) { kafkaConsumer.commitSync(); } } catch (Throwable ex) { @@ -473,8 +494,7 @@ private boolean processConsumerRecords(final ConsumerState consumerState, final Map, Object> boundArguments, final boolean trackPartitions, final Optional> ackArg, - final ConsumerRecords consumerRecords, - final OffsetStrategy offsetStrategy) { + final ConsumerRecords consumerRecords) { final ExecutableBinder> executableBinder = new DefaultExecutableBinder<>(boundArguments); final Map currentOffsets = trackPartitions ? new HashMap<>() : null; @@ -504,7 +524,7 @@ private boolean processConsumerRecords(final ConsumerState consumerState, resultFlowable = Flux.just(result); isBlocking = true; } - handleResultFlux(consumerState, method, consumerRecord, resultFlowable, isBlocking); + handleResultFlux(consumerState, method, consumerRecord, resultFlowable, isBlocking, consumerRecords); } } catch (Throwable e) { if (resolveWithErrorStrategy(consumerState, consumerRecord, e)) { @@ -512,13 +532,13 @@ private boolean processConsumerRecords(final ConsumerState consumerState, } } - if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) { + if (consumerState.offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) { try { kafkaConsumer.commitSync(currentOffsets); } catch (CommitFailedException e) { handleException(consumerState, consumerRecord, e); } - } else if (offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) { + } else if (consumerState.offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) { kafkaConsumer.commitAsync(currentOffsets, resolveCommitCallback(consumerState.consumerBean)); } } @@ -549,9 +569,9 @@ private boolean resolveWithErrorStrategy(ConsumerState consumerState, // in the stop on error strategy, pause the consumer and resume after the retryDelay duration consumerState.pause(); taskScheduler.schedule(retryDelay, (Runnable) consumerState::resume); - // skip handle exception - return false; } + // skip handle exception + return true; } else { // Skip the failing record currentErrorStrategy = ErrorStrategyValue.RESUME_AT_NEXT_RECORD; @@ -593,14 +613,14 @@ private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState, for (Object object : objects) { if (iterator.hasNext()) { final ConsumerRecord consumerRecord = iterator.next(); - handleResultFlux(consumerState, method, consumerRecord, Flux.just(object), isBlocking); + handleResultFlux(consumerState, method, consumerRecord, Flux.just(object), isBlocking, consumerRecords); } } } else { resultFlux.subscribe(o -> { if (iterator.hasNext()) { final ConsumerRecord consumerRecord = iterator.next(); - handleResultFlux(consumerState, method, consumerRecord, Flux.just(o), isBlocking); + handleResultFlux(consumerState, method, 
consumerRecord, Flux.just(o), isBlocking, consumerRecords); } }); } @@ -669,105 +689,187 @@ private void handleResultFlux(ConsumerState consumerState, ExecutableMethod method, ConsumerRecord consumerRecord, Flux resultFlowable, - boolean isBlocking) { - Flux recordMetadataProducer = resultFlowable.subscribeOn(executorScheduler) - .flatMap((Function>) o -> { - String[] destinationTopics = method.stringValues(SendTo.class); - if (ArrayUtils.isNotEmpty(destinationTopics)) { - Object key = consumerRecord.key(); - Object value = o; + boolean isBlocking, + ConsumerRecords consumerRecords) { + Flux recordMetadataProducer = resultFlowable + .flatMap((Function>) value -> { + if (consumerState.sendToDestinationTopics != null) { + Object key = consumerRecord.key(); if (value != null) { - Producer kafkaProducer = producerRegistry.getProducer( - consumerState.groupId, - Argument.of((Class) (key != null ? key.getClass() : byte[].class)), - Argument.of(value.getClass()) - ); - + Producer kafkaProducer; + if (consumerState.useSendOffsetsToTransaction) { + kafkaProducer = transactionalProducerRegistry.getTransactionalProducer( + consumerState.producerClientId, + consumerState.producerTransactionalId, + Argument.of(byte[].class), + Argument.of(Object.class) + ); + } else { + kafkaProducer = producerRegistry.getProducer( + consumerState.producerClientId == null ? consumerState.groupId : consumerState.producerClientId, + Argument.of((Class) (key != null ? key.getClass() : byte[].class)), + Argument.of(value.getClass()) + ); + } return Flux.create(emitter -> { - for (String destinationTopic : destinationTopics) { - ProducerRecord record = new ProducerRecord( - destinationTopic, - null, - key, - value, - consumerRecord.headers() - ); - - kafkaProducer.send(record, (metadata, exception) -> { - if (exception != null) { - emitter.error(exception); + try { + if (consumerState.useSendOffsetsToTransaction) { + try { + LOG.trace("Beginning transaction for producer: {}", consumerState.producerTransactionalId); + kafkaProducer.beginTransaction(); + } catch (ProducerFencedException e) { + handleProducerFencedException(kafkaProducer, e); + } + } + for (String destinationTopic : consumerState.sendToDestinationTopics) { + if (consumerState.isMessagesIterableReturnType) { + Iterable messages = (Iterable) value; + for (KafkaMessage message : messages) { + ProducerRecord record = createFromMessage(destinationTopic, message); + kafkaProducer.send(record, (metadata, exception) -> { + if (exception != null) { + emitter.error(exception); + } else { + emitter.next(metadata); + } + }); + } } else { - emitter.next(metadata); + ProducerRecord record; + if (consumerState.isMessageReturnType) { + record = createFromMessage(destinationTopic, (KafkaMessage) value); + } else { + record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers()); + } + LOG.trace("Sending record: {} for producer: {} {}", record, kafkaProducer, consumerState.producerTransactionalId); + kafkaProducer.send(record, (metadata, exception) -> { + if (exception != null) { + emitter.error(exception); + } else { + emitter.next(metadata); + } + }); } - }); - + } + if (consumerState.useSendOffsetsToTransaction) { + Map offsetsToCommit = new HashMap<>(); + for (TopicPartition partition : consumerRecords.partitions()) { + List> partitionedRecords = consumerRecords.records(partition); + long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); + offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); + } + try { + 
LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, consumerState.producerTransactionalId, consumerState.groupId); + kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, consumerState.groupId); + LOG.trace("Committing transaction for producer: {}", consumerState.producerTransactionalId); + kafkaProducer.commitTransaction(); + LOG.trace("Committed transaction for producer: {}", consumerState.producerTransactionalId); + } catch (ProducerFencedException e) { + handleProducerFencedException(kafkaProducer, e); + } + } + emitter.complete(); + } catch (Exception e) { + if (consumerState.useSendOffsetsToTransaction) { + try { + LOG.trace("Aborting transaction for producer: {} because of error: {}", consumerState.producerTransactionalId, e.getMessage()); + kafkaProducer.abortTransaction(); + } catch (ProducerFencedException ex) { + handleProducerFencedException(kafkaProducer, ex); + } + } + emitter.error(e); } - emitter.complete(); - }, FluxSink.OverflowStrategy.ERROR); + }); } return Flux.empty(); } return Flux.empty(); - }).onErrorResume((Function>) throwable -> { - handleException(consumerState, new KafkaListenerException( - "Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), - throwable, - consumerState.consumerBean, - consumerState.kafkaConsumer, - consumerRecord - )); - - if (consumerState.redelivery) { - LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord); + }); - Object key = consumerRecord.key(); - Object value = consumerRecord.value(); + recordMetadataProducer = recordMetadataProducer.onErrorResume((Function>) throwable -> { + handleException(consumerState, new KafkaListenerException( + "Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), + throwable, + consumerState.consumerBean, + consumerState.kafkaConsumer, + consumerRecord + )); + + if (consumerState.redelivery) { + LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord); + + Object key = consumerRecord.key(); + Object value = consumerRecord.value(); + + if (key != null && value != null) { + Producer kafkaProducer = producerRegistry.getProducer( + consumerState.producerClientId == null ? 
consumerState.groupId : consumerState.producerClientId, + Argument.of(key.getClass()), + Argument.of(value.getClass()) + ); + + ProducerRecord record = new ProducerRecord( + consumerRecord.topic(), + consumerRecord.partition(), + key, + value, + consumerRecord.headers() + ); + + return producerSend(consumerState, kafkaProducer, record).doOnError(ex -> { + handleException(consumerState, new KafkaListenerException( + "Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), + throwable, + consumerState.consumerBean, + consumerState.kafkaConsumer, + consumerRecord + )); + }); + } + } + return Flux.empty(); + }); - if (key != null && value != null) { - Producer kafkaProducer = producerRegistry.getProducer( - consumerState.groupId, - Argument.of(key.getClass()), - Argument.of(value.getClass()) - ); + if (isBlocking) { + List listRecords = recordMetadataProducer.collectList().block(); + LOG.trace("Method [{}] produced record metadata: {}", method, listRecords); + } else { + recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata)); + } + } - ProducerRecord record = new ProducerRecord( - consumerRecord.topic(), - consumerRecord.partition(), - key, - value, - consumerRecord.headers() - ); + private Mono producerSend(ConsumerState consumerState, Producer producer, ProducerRecord record) { + LOG.trace("Sending record: {} for producer: {} {}", record, producer, consumerState.producerTransactionalId); + return Mono.create(emitter -> producer.send(record, (metadata, exception) -> { + if (exception != null) { + emitter.error(exception); + } else { + emitter.success(metadata); + } + })); + } - return Flux.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> { - if (exception != null) { - handleException(consumerState, new KafkaListenerException( - "Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), - throwable, - consumerState.consumerBean, - consumerState.kafkaConsumer, - consumerRecord - )); + private ProducerRecord createFromMessage(String topic, KafkaMessage message) { + return new ProducerRecord( + message.getTopic() == null ? topic : message.getTopic(), + message.getPartition(), + message.getTimestamp(), + message.getKey(), + message.getBody(), + convertHeaders(message)); + } - emitter.complete(); - } else { - emitter.next(metadata); - emitter.complete(); - } - }), FluxSink.OverflowStrategy.ERROR); - } - } - return Flux.empty(); - }); + private List convertHeaders(KafkaMessage message) { + return message.getHeaders() == null ? 
null : message.getHeaders().entrySet() + .stream() + .map(e -> new RecordHeader(e.getKey(), e.getValue().toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList()); + } - if (isBlocking) { - RecordMetadata recordMetadata = recordMetadataProducer.blockFirst(); - LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata); - } else { - recordMetadataProducer.subscribe(recordMetadata -> { - LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata); - }); - } + private void handleProducerFencedException(Producer producer, ProducerFencedException e) { + LOG.error("Failed accessing the producer: " + producer, e); + transactionalProducerRegistry.close(producer); } private static Argument findBodyArgument(ExecutableMethod method) { @@ -855,6 +957,7 @@ private static final class ConsumerState { final Consumer kafkaConsumer; final Object consumerBean; final Set subscriptions; + Set assignments; Set _pausedTopicPartitions; Set _pauseRequests; @@ -863,6 +966,8 @@ private static final class ConsumerState { final String groupId; final boolean redelivery; + final OffsetStrategy offsetStrategy; + final ErrorStrategyValue errorStrategy; @Nullable final Duration errorStrategyRetryDelay; @@ -872,14 +977,25 @@ private static final class ConsumerState { int currentRetryCount; boolean autoPaused; + final String producerClientId; + final String producerTransactionalId; + final boolean transactional; - private ConsumerState(String clientId, Consumer consumer, Object consumerBean, Set subscriptions, AnnotationValue kafkaListener) { + @Nullable + final String[] sendToDestinationTopics; + final boolean useSendOffsetsToTransaction; + final boolean isMessageReturnType; + final boolean isMessagesIterableReturnType; + + private ConsumerState(String clientId, String groupId, OffsetStrategy offsetStrategy, Consumer consumer, Object consumerBean, Set subscriptions, + AnnotationValue kafkaListener, ExecutableMethod method) { this.clientId = clientId; + this.groupId = groupId; this.kafkaConsumer = consumer; this.consumerBean = consumerBean; this.subscriptions = subscriptions; + this.offsetStrategy = offsetStrategy; - groupId = kafkaListener.stringValue("groupId").filter(StringUtils::isNotEmpty).orElse(null); redelivery = kafkaListener.isTrue("redelivery"); AnnotationValue errorStrategyAnnotation = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class).orElse(null); @@ -898,6 +1014,30 @@ private ConsumerState(String clientId, Consumer consumer, Object consumerB } autoPaused = !kafkaListener.booleanValue("autoStartup").orElse(true); + producerClientId = kafkaListener.stringValue("producerClientId").orElse(null); + producerTransactionalId = kafkaListener.stringValue("producerTransactionalId").orElse(null); + transactional = StringUtils.isNotEmpty(producerTransactionalId); + + String[] destinationTopics = method.stringValues(SendTo.class); + sendToDestinationTopics = ArrayUtils.isNotEmpty(destinationTopics) ? 
destinationTopics : null; + + if (offsetStrategy == OffsetStrategy.SEND_TO_TRANSACTION) { + if (transactional && method.hasAnnotation(SendTo.class)) { + useSendOffsetsToTransaction = true; + } else { + throw new MessagingSystemException("Offset strategy 'SEND_TO_TRANSACTION' can only be used when transaction is enabled and @SendTo is used"); + } + } else { + useSendOffsetsToTransaction = false; + } + + if (useSendOffsetsToTransaction && redelivery) { + throw new MessagingSystemException("Redelivery not supported for transactions in combination with @SendTo"); + } + ReturnType returnType = method.getReturnType(); + isMessageReturnType = returnType.getType().isAssignableFrom(KafkaMessage.class) + || returnType.isAsyncOrReactive() && returnType.getFirstTypeVariable().map(t -> t.getType().isAssignableFrom(KafkaMessage.class)).orElse(false); + isMessagesIterableReturnType = Iterable.class.isAssignableFrom(returnType.getType()) && returnType.getFirstTypeVariable().map(t -> t.getType().isAssignableFrom(KafkaMessage.class)).orElse(false); } void pause() { diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy new file mode 100644 index 000000000..24c76bf3a --- /dev/null +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/KafkaTxSpec.groovy @@ -0,0 +1,215 @@ +package io.micronaut.configuration.kafka + +import io.micronaut.configuration.kafka.annotation.ErrorStrategy +import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.OffsetStrategy +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.core.type.Argument +import io.micronaut.messaging.annotation.SendTo +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.IsolationLevel + +import java.util.stream.Collectors +import java.util.stream.Stream + +class KafkaTxSpec extends AbstractKafkaContainerSpec { + + protected Map getConfiguration() { + super.configuration + [ + 'kafka.producers.tx-word-counter.key.serializer' : 'org.apache.kafka.common.serialization.StringSerializer', + 'kafka.producers.tx-word-counter.value.serializer': 'org.apache.kafka.common.serialization.IntegerSerializer' + ] + } + + def "should count words in transaction"() { + given: + StringProducer stringProducer = context.getBean(StringProducer) + WordCountCollector wordCountCollector = context.getBean(WordCountCollector) + when: + stringProducer.send("The quick brown fox jumps over the lazy dog. 
THE QUICK BROWN FOX JUMPED OVER THE LAZY DOG'S BACK") + then: + conditions.eventually { + wordCountCollector.counter['quick'] == 1 + wordCountCollector.counter['THE'] == 2 + wordCountCollector.counter.size() == 18 + } + } + + def "should not receive messages when tx is rollbacked"() { + given: + TransactionalProducerRegistry transactionalProducerRegistry = context.getBean(TransactionalProducerRegistry) + def numbersProducer = transactionalProducerRegistry.getTransactionalProducer(null, "tx-nid-producer", Argument.of(String), Argument.of(Integer)) + CommittedNumbersCollector committedNumbersCollector = context.getBean(CommittedNumbersCollector) + UncommittedNumbersCollector uncommittedNumbersCollector = context.getBean(UncommittedNumbersCollector) + when: + numbersProducer.beginTransaction() + (1..20).forEach(it -> { + numbersProducer.send(new ProducerRecord("tx-numbers", it)) + }) + Thread.sleep(5000) + numbersProducer.abortTransaction() + then: + conditions.eventually { + committedNumbersCollector.numbers.size() == 0 + uncommittedNumbersCollector.numbers.size() >= 20 + } + } + + def "should not receive same numbers"() { + given: + TransactionalProducerRegistry transactionalProducerRegistry = context.getBean(TransactionalProducerRegistry) + def numbersProducer = transactionalProducerRegistry.getTransactionalProducer("tx-nid-producer", "tx-nid-producer", Argument.of(String), Argument.of(Integer)) + IdempotenceNumbersCollector idempotenceNumbersCollector = context.getBean(IdempotenceNumbersCollector) + IdempotenceNumbersNoDelayCollector idempotenceNumbersNoDelayCollector = context.getBean(IdempotenceNumbersNoDelayCollector) + when: + idempotenceNumbersCollector.numbers.clear() + idempotenceNumbersCollector.failed.clear() + numbersProducer.beginTransaction() + (1..30).forEach(it -> { + numbersProducer.send(new ProducerRecord("tx-idempotence-numbers", it.toString(), it)) + }) + numbersProducer.commitTransaction() + then: + conditions.eventually { + idempotenceNumbersCollector.numbers.size() == 30 + idempotenceNumbersNoDelayCollector.numbers.size() == 30 + } + } + + def "should not receive same numbers2"() { + given: + def numbersProducer = context.getBean(IdempotenceNumbersProducer) + IdempotenceNumbersCollector idempotenceNumbersCollector = context.getBean(IdempotenceNumbersCollector) + when: + idempotenceNumbersCollector.numbers.clear() + idempotenceNumbersCollector.failed.clear() + (1..30).forEach(it -> { + numbersProducer.send(it.toString(), it) + }) + then: + conditions.eventually { + idempotenceNumbersCollector.numbers.size() == 30 + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(isolation = IsolationLevel.READ_COMMITTED, + offsetReset = OffsetReset.EARLIEST, + offsetStrategy = OffsetStrategy.SYNC, + errorStrategy = @ErrorStrategy(value = ErrorStrategyValue.RETRY_ON_ERROR)) + static class IdempotenceNumbersCollector { + + List numbers = new ArrayList<>() + Set failed = new HashSet<>() + + @Topic("tx-idempotence-numbers") + void collect(int number) { + if (number % 2 == 0) { + if (failed.add(number)) { + throw new IllegalAccessException() + } + } + numbers.add(number) + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(isolation = IsolationLevel.READ_COMMITTED, + offsetReset = OffsetReset.EARLIEST, + offsetStrategy = OffsetStrategy.SYNC, + errorStrategy = @ErrorStrategy(value = ErrorStrategyValue.RETRY_ON_ERROR, retryDelay = "")) + static class IdempotenceNumbersNoDelayCollector { + + List numbers = new ArrayList<>() + 
Set failed = new HashSet<>() + + @Topic("tx-idempotence-numbers") + void collect(int number) { + if (number % 2 == 0) { + if (failed.add(number)) { + throw new IllegalAccessException() + } + } + numbers.add(number) + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaClient(transactionalId = "tx-idempotence-number-producer") + static interface IdempotenceNumbersProducer { + + @Topic("tx-idempotence-numbers") + void send(@KafkaKey String id, int number); + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(isolation = IsolationLevel.READ_COMMITTED) + static class CommittedNumbersCollector { + + List numbers = new ArrayList<>() + + @Topic("tx-numbers") + void collect(int number) { + numbers.add(number) + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(isolation = IsolationLevel.READ_UNCOMMITTED) + static class UncommittedNumbersCollector { + + List numbers = new ArrayList<>() + + @Topic("tx-numbers") + void collect(int number) { + numbers.add(number) + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(isolation = IsolationLevel.READ_COMMITTED) + static class WordCountCollector { + + Map counter = new HashMap<>() + + @Topic("tx-words-count") + void count(@KafkaKey String word, int count) { + counter.put(word, count) + } + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaListener(producerClientId = "tx-word-counter", + producerTransactionalId = "tx-word-counter", + offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, isolation = IsolationLevel.READ_COMMITTED) + static class WordCounter { + + @Topic("tx-strings") + @SendTo("tx-words-count") + List wordsCounter(String string) { + Map wordsCount = Stream.of(string.split(" ")) + .map(word -> new AbstractMap.SimpleEntry<>(word, 1)) + .collect(Collectors.toMap((Map.Entry e) -> e.key, (Map.Entry e) -> e.value, (v1, v2) -> v1 + v2)) + List messages = new ArrayList<>() + for (Map.Entry e : wordsCount) { + messages.add(KafkaMessage.Builder.withBody(e.getValue()).key(e.getKey()).build()) + } + return messages + } + + } + + @Requires(property = 'spec.name', value = 'KafkaTxSpec') + @KafkaClient(transactionalId = "tx-string-producer") + static interface StringProducer { + + @Topic("tx-strings") + void send(String strings); + } + +} \ No newline at end of file diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java new file mode 100644 index 000000000..5b9cf639f --- /dev/null +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/sendto/WordCounter.java @@ -0,0 +1,39 @@ +package io.micronaut.configuration.kafka.docs.consumer.sendto; + +import io.micronaut.configuration.kafka.KafkaMessage; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.OffsetStrategy; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.messaging.annotation.SendTo; +import org.apache.kafka.common.IsolationLevel; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +// tag::transactional[] +@KafkaListener(producerClientId = "word-counter-producer", // <1> + producerTransactionalId = "tx-word-counter-id", // <2> + offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, 
// <3> + isolation = IsolationLevel.READ_COMMITTED // <4> +) +public class WordCounter { + + @Topic("tx-incoming-strings") + @SendTo("my-words-count") + List wordsCounter(String string) { + Map wordsCount = Stream.of(string.split(" ")) + .map(word -> new AbstractMap.SimpleEntry<>(word, 1)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::sum)); + List messages = new ArrayList<>(); + for (Map.Entry e : wordsCount.entrySet()) { + messages.add(KafkaMessage.Builder.withBody(e.getValue()).key(e.getKey()).build()); + } + return messages; + } + +} +// end::transactional[] diff --git a/src/main/docs/guide/kafkaClient.adoc b/src/main/docs/guide/kafkaClient.adoc index 5d7945fe6..e69de29bb 100644 --- a/src/main/docs/guide/kafkaClient.adoc +++ b/src/main/docs/guide/kafkaClient.adoc @@ -1,4 +0,0 @@ -The example in the quick start presented a trivial definition of an interface that will be implemented automatically for you using the ann:configuration.kafka.annotation.KafkaClient[] annotation. - -The implementation that powers `@KafkaClient` (defined by the api:configuration.kafka.intercept.KafkaClientIntroductionAdvice[] class) is, however, very flexible and offers a range of options for defining Kafka clients. - diff --git a/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc b/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc new file mode 100644 index 000000000..5d6e1c024 --- /dev/null +++ b/src/main/docs/guide/kafkaClient/kafkaClientTx.adoc @@ -0,0 +1,14 @@ +Transaction processing can be enabled by defining `transactionalId` on `@KafkaClient`, which will initialize the producer for transactional usage and wrap any send operation in a transaction. + +.Alternative Kafka producer transactional code +[source,java] +---- +producer.initTransactions(); +try { + producer.beginTransaction(); + producer.send(...); + producer.commitTransaction(); +} catch (Exception e) { + producer.abortTransaction(); +} +---- \ No newline at end of file diff --git a/src/main/docs/guide/kafkaListener/kafkaListenerConfiguration.adoc b/src/main/docs/guide/kafkaListener/kafkaListenerConfiguration.adoc index 23f84301e..8df33279c 100644 --- a/src/main/docs/guide/kafkaListener/kafkaListenerConfiguration.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaListenerConfiguration.adoc @@ -96,3 +96,15 @@ kafka: ---- You may want to do this if for example you choose an alternative deserialization format such as Avro or Protobuf. + +=== Transactional properties + +There are a few options that only apply to transactional processing: + +==== Isolation + +Use the `isolation` member to define whether the consumer should receive messages from transactions that haven't been committed yet. + +==== Custom offset strategy + +There is a special offset strategy, `OffsetStrategy.SEND_TO_TRANSACTION`, that can only be used with an associated transactional producer and is only applicable when `@SendTo` is used. \ No newline at end of file diff --git a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc index e5ccd4d4e..4c07eace8 100644 --- a/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaOffsets.adoc @@ -26,6 +26,9 @@ The following table summarizes the enum values and behaviour: |api:configuration.kafka.annotation.OffsetStrategy#ASYNC_PER_RECORD[] |Commits offsets asynchronously after each `ConsumerRecord` is processed.
Sets `enable.auto.commit` to `false` +|api:configuration.kafka.annotation.OffsetStrategy#SEND_TO_TRANSACTION[] +|Only available when the transactional producer is enabled for `@SendTo`. Sends offsets to the transaction using the `sendOffsetsToTransaction` method of `org.apache.kafka.clients.producer.Producer`. + |=== Depending on your level of paranoia or durability requirements you can choose to tune how and when offsets are committed. diff --git a/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc b/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc index c3a200187..df7f7a04b 100644 --- a/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaSendTo.adoc @@ -26,4 +26,17 @@ include::{testskafka}/consumer/sendto/ProductListener.java[tags=reactive, indent <2> The topic to send the result to is `product-quantities` <3> The return is mapped from the single to the value of the quantity -In the reactive case the `poll` loop will continue and will not wait for the record to be sent unless you specifically annotate the method with ann:core.annotation.Blocking[]. \ No newline at end of file +In the reactive case the `poll` loop will continue and will not wait for the record to be sent unless you specifically annotate the method with ann:core.annotation.Blocking[]. + +To enable transactional sending of messages, you need to define `producerTransactionalId` in `@KafkaListener`. + +.Transactional consumer-producer +[source,java] +---- +include::{testskafka}/consumer/sendto/WordCounter.java[tags=transactional, indent=0] +---- + +<1> The id of the producer used to load additional configuration properties +<2> The transactional id that is required to enable transactional processing +<3> The offset strategy that commits the consumed offsets to the transaction +<4> The isolation level used to read messages on the consumer \ No newline at end of file diff --git a/src/main/docs/guide/toc.yml b/src/main/docs/guide/toc.yml index af69f2409..ebf73813c 100644 --- a/src/main/docs/guide/toc.yml +++ b/src/main/docs/guide/toc.yml @@ -8,6 +8,7 @@ kafkaClient: kafkaClientConfiguration: Configuring @KafkaClient beans kafkaClientBatch: Sending Records in Batch kafkaClientScope: Injecting Kafka Producer Beans + kafkaClientTx: Transactions kafkaEmbedded: Embedding Kafka kafkaListener: title: Kafka Consumers Using @KafkaListener diff --git a/tests/tasks-sasl-plaintext/build.gradle b/tests/tasks-sasl-plaintext/build.gradle index e9a0406ea..391f641d8 100644 --- a/tests/tasks-sasl-plaintext/build.gradle +++ b/tests/tasks-sasl-plaintext/build.gradle @@ -25,4 +25,6 @@ dependencies { testImplementation platform("org.testcontainers:testcontainers-bom:$testContainersVersion") testImplementation "org.testcontainers:kafka" testImplementation "org.testcontainers:spock" + testImplementation "io.micronaut.test:micronaut-test-spock:3.0.4" + testImplementation "io.micronaut.test:micronaut-test-core:3.0.4" } \ No newline at end of file
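
A note on the `@KafkaClient` transaction support documented in the new `kafkaClientTx.adoc`: declaring `transactionalId` is all that is needed to make every send run inside a transaction. The following is a minimal Java sketch mirroring the `StringProducer` test interface from this patch; the interface name and topic are illustrative.

.Transactional @KafkaClient (sketch)
[source,java]
----
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

// Setting transactionalId initializes the underlying producer for transactions
// and wraps each send in begin/commit (abort on failure).
@KafkaClient(transactionalId = "tx-string-producer")
public interface StringProducer {

    @Topic("tx-strings")
    void send(String message);
}
----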
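
The `KafkaTxSpec` test also drives the new `TransactionalProducerRegistry` directly. Below is a hedged Java sketch of that manual flow, assuming the registry is injected; the bean name, topic `tx-numbers` and transactional id `tx-numbers-producer` are placeholders.

.Manual transaction via TransactionalProducerRegistry (sketch)
[source,java]
----
import io.micronaut.configuration.kafka.TransactionalProducerRegistry;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Singleton
public class NumbersPublisher {

    private final TransactionalProducerRegistry registry;

    public NumbersPublisher(TransactionalProducerRegistry registry) {
        this.registry = registry;
    }

    public void publish() {
        // The client id may be null; the transactional id selects the (already initialized) transactional producer.
        Producer<String, Integer> producer = registry.getTransactionalProducer(
                null, "tx-numbers-producer", Argument.of(String.class), Argument.of(Integer.class));
        producer.beginTransaction();
        try {
            for (int i = 1; i <= 20; i++) {
                producer.send(new ProducerRecord<>("tx-numbers", String.valueOf(i), i));
            }
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // Aborted records stay invisible to READ_COMMITTED consumers.
            producer.abortTransaction();
            throw e;
        }
    }
}
----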
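
For the `isolation` member described in the listener configuration docs, a minimal consumer that only sees committed records (mirroring the `CommittedNumbersCollector` test, with illustrative names) could look like this:

.Read-committed consumer (sketch)
[source,java]
----
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.common.IsolationLevel;

// READ_COMMITTED maps to the consumer's isolation.level property, so records from
// aborted transactions are never delivered to this listener.
@KafkaListener(isolation = IsolationLevel.READ_COMMITTED)
public class CommittedNumbersListener {

    @Topic("tx-numbers")
    public void receive(int number) {
        // only values from committed transactions arrive here
    }
}
----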
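
Finally, the `SEND_TO_TRANSACTION` offset strategy added to `kafkaOffsets.adoc` corresponds to the raw Kafka pattern that `handleResultFlux` implements: produce the forwarded record and commit the consumed offsets inside the same transaction. This is a simplified, self-contained sketch of that flow, not the processor's actual code.

.Sending offsets to the transaction (sketch)
[source,java]
----
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

final class SendOffsetsSketch {

    // Produce the outgoing record and commit the consumed offsets atomically.
    static <K, V> void forwardInTransaction(Producer<byte[], Object> producer,
                                            ConsumerRecords<K, V> consumerRecords,
                                            ProducerRecord<byte[], Object> outgoing,
                                            String groupId) {
        producer.beginTransaction();
        try {
            producer.send(outgoing);
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            for (TopicPartition partition : consumerRecords.partitions()) {
                List<ConsumerRecord<K, V>> partitioned = consumerRecords.records(partition);
                long lastOffset = partitioned.get(partitioned.size() - 1).offset();
                // Commit the offset after the last record processed for this partition.
                offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsetsToCommit, groupId);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}
----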