Kafka transactions #447

Merged · 5 commits · Dec 3, 2021
Changes from 2 commits
@@ -32,6 +32,7 @@
import io.micronaut.inject.FieldInjectionPoint;
import io.micronaut.inject.InjectionPoint;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -54,7 +55,7 @@
* @since 1.0
*/
@Factory
public class KafkaProducerFactory implements ProducerRegistry {
public class KafkaProducerFactory implements ProducerRegistry, TransactionalProducerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFactory.class);
private final Map<ClientKey, Producer> clients = new ConcurrentHashMap<>();
private final BeanContext beanContext;
@@ -126,16 +127,16 @@ public <K, V> Producer<K, V> getProducer(
throw new ConfigurationException("@KafkaClient used on type missing generic argument values for Key and Value: " + injectionPoint);
}
final String id = injectionPoint.getAnnotationMetadata().stringValue(KafkaClient.class).orElse(null);
return getKafkaProducer(id, k, v);
return getKafkaProducer(id, null, k, v, false);
}

@SuppressWarnings("unchecked")
private <T> T getKafkaProducer(@Nullable String id, Argument<?> keyType, Argument<?> valueType) {
private <T> T getKafkaProducer(@Nullable String id, @Nullable String transactionalId, Argument<?> keyType, Argument<?> valueType, boolean transactional) {
ClientKey key = new ClientKey(
id,
keyType.getType(),
valueType.getType()
);
valueType.getType(),
transactional);

return (T) clients.computeIfAbsent(key, clientKey -> {
Supplier<AbstractKafkaProducerConfiguration> defaultResolver = () -> beanContext.getBean(AbstractKafkaProducerConfiguration.class);
@@ -163,10 +164,21 @@ private <T> T getKafkaProducer(@Nullable String id, Argument<?> keyType, Argumen
newConfig.setValueSerializer(valueSerializer);
}

if (StringUtils.isNotEmpty(transactionalId)) {
properties.putIfAbsent(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
properties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}

if (hasId) {
properties.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, id);
}
return beanContext.createBean(Producer.class, newConfig);

Producer producer = beanContext.createBean(Producer.class, newConfig);
if (transactional) {
producer.initTransactions();
}

return producer;
});
}

@@ -187,9 +199,23 @@ protected void stop() {

@Override
public <K, V> Producer<K, V> getProducer(String id, Argument<K> keyType, Argument<V> valueType) {
return getKafkaProducer(id, keyType, valueType);
return getKafkaProducer(id, null, keyType, valueType, false);
}

@Override
public <K, V> Producer<K, V> getTransactionalProducer(String id, String transactionalId, Argument<K> keyType, Argument<V> valueType) {
return getKafkaProducer(id, transactionalId, keyType, valueType, true);
}

@Override
public void close(Producer<?, ?> producer) {
for (Map.Entry<ClientKey, Producer> e : clients.entrySet()) {
if (e.getValue() == producer) {
clients.remove(e.getKey());
break;
}
}
}

/**
* key for retrieving built producers.
@@ -201,11 +227,13 @@ private static final class ClientKey {
private final String id;
private final Class<?> keyType;
private final Class<?> valueType;
private final boolean transactional;

ClientKey(String id, Class<?> keyType, Class<?> valueType) {
ClientKey(String id, Class<?> keyType, Class<?> valueType, boolean transactional) {
this.id = id;
this.keyType = keyType;
this.valueType = valueType;
this.transactional = transactional;
}

@Override
@@ -219,13 +247,13 @@ public boolean equals(Object o) {
ClientKey clientKey = (ClientKey) o;
return Objects.equals(id, clientKey.id) &&
Objects.equals(keyType, clientKey.keyType) &&
Objects.equals(valueType, clientKey.valueType);
Objects.equals(valueType, clientKey.valueType) &&
Objects.equals(transactional, clientKey.transactional);
}

@Override
public int hashCode() {

return Objects.hash(id, keyType, valueType);
return Objects.hash(id, keyType, valueType, transactional);
}
}
}
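For context, the factory above caches producers by client id, key/value types and the new transactional flag; when a transactional id is supplied it sets transactional.id on the producer configuration and calls initTransactions() before handing the instance out. The sketch below shows the equivalent plain Kafka configuration, not code from this change; the broker address, client id and transactional id are placeholder values.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainTransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address and ids.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-client");
        // Setting transactional.id is what the factory does when a transactionalId is supplied;
        // per the Kafka docs it also implies enable.idempotence=true.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-transactional-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        // The factory calls initTransactions() before returning the producer,
        // so callers can begin transactions immediately.
        producer.initTransactions();
        producer.close();
    }
}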
154 changes: 154 additions & 0 deletions kafka/src/main/java/io/micronaut/configuration/kafka/Message.java
@@ -0,0 +1,154 @@
/*
* Copyright 2017-2021 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

import java.util.Map;

/**
* Message payload representation.
*
* @author Denis Stepanov
* @since 4.1.0
*/
public final class Message {

private final String topic;
private final Object key;
private final Object body;
private final Integer partition;
private final Long timestamp;
private final Map<String, Object> headers;

/**
* The default constructor.
*
* @param topic The topic
* @param key The key
* @param body The body
* @param partition The partition
* @param timestamp The timestamp
* @param headers The headers
*/
public Message(@Nullable String topic, @Nullable Object key, @Nullable Object body, @Nullable Integer partition,
@Nullable Long timestamp, @Nullable Map<String, Object> headers) {
this.topic = topic;
this.key = key;
this.body = body;
this.partition = partition;
this.timestamp = timestamp;
this.headers = headers;
}

@Nullable
public String getTopic() {
return topic;
}

@Nullable
public Object getKey() {
return key;
}

@Nullable
public Object getBody() {
return body;
}

@Nullable
public Integer getPartition() {
return partition;
}

@Nullable
public Long getTimestamp() {
return timestamp;
}

@Nullable
public Map<String, Object> getHeaders() {
return headers;
}

/**
* The message builder.
*/
public static final class Builder {
private String topic;
private Object key;
private Object body;
private Integer partition;
private Long timestamp;
private Map<String, Object> headers;

@NonNull
public static Builder withBody(@Nullable Object body) {
Builder builder = new Builder();
builder.body = body;
return builder;
}

@NonNull
public static Builder withoutBody() {
Builder builder = new Builder();
return builder;
}

@NonNull
public Builder topic(@Nullable String topic) {
this.topic = topic;
return this;
}

@NonNull
public Builder key(@Nullable Object key) {
this.key = key;
return this;
}

@NonNull
public Builder body(@Nullable Object body) {
this.body = body;
return this;
}

@NonNull
public Builder header(@Nullable Map<String, Object> headers) {
this.headers = headers;
return this;
}

@NonNull
public Builder partition(@Nullable Integer partition) {
this.partition = partition;
return this;
}

@NonNull
public Builder timestamp(@Nullable Long timestamp) {
this.timestamp = timestamp;
return this;
}

@NonNull
public Message build() {
return new Message(topic, key, body, partition, timestamp, headers);
}
}

}
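A minimal sketch of building a Message with the new builder; the body, topic, key and header values below are illustrative only.

import io.micronaut.configuration.kafka.Message;

import java.util.Collections;

public class MessageBuilderExample {

    public static void main(String[] args) {
        // Build a message for an example topic with an example key and header.
        Message message = Message.Builder
                .withBody("hello")
                .topic("greetings")
                .key("en")
                .header(Collections.singletonMap("source", "docs"))
                .build();

        System.out.println(message.getTopic() + " <- " + message.getBody());
    }
}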
56 changes: 56 additions & 0 deletions kafka/src/main/java/io/micronaut/configuration/kafka/TransactionalProducerRegistry.java
@@ -0,0 +1,56 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka;

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.type.Argument;
import org.apache.kafka.clients.producer.Producer;

/**
* A registry of managed transactional {@link Producer} instances keyed by id and type.
*
* The producer instance returned will have transactions initialized by executing {@link Producer#initTransactions()}.
*
* @author Denis Stepanov
* @since 4.1.0
*/
public interface TransactionalProducerRegistry {

/**
* Returns a managed transactional Producer.
*
* @param clientId The client id of the producer.
* @param transactionalId The transactional id of the producer.
* @param keyType The key type
* @param valueType The value type
* @param <K> The key generic type
* @param <V> The value generic type
* @return The producer
* @since 4.1.0
*/
@NonNull
<K, V> Producer<K, V> getTransactionalProducer(@Nullable String clientId, String transactionalId, Argument<K> keyType, Argument<V> valueType);

/**
* Closes the producer.
* Intended for cases when {@link org.apache.kafka.common.errors.ProducerFencedException} is thrown.
*
* @param producer The producer
*/
void close(Producer<?, ?> producer);

}
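A sketch of how the new registry might be used from application code, assuming constructor injection with jakarta.inject; the client id, transactional id and topic are illustrative values.

import io.micronaut.configuration.kafka.TransactionalProducerRegistry;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

@Singleton
public class TransactionalSendExample {

    private final TransactionalProducerRegistry registry;

    public TransactionalSendExample(TransactionalProducerRegistry registry) {
        this.registry = registry;
    }

    public void sendInTransaction(String key, String value) {
        // The producer comes back with initTransactions() already called.
        Producer<String, String> producer = registry.getTransactionalProducer(
                "example-client", "example-tx",
                Argument.of(String.class), Argument.of(String.class));
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", key, value));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Another producer with the same transactional id took over:
            // evict this instance from the registry, as the javadoc above suggests.
            registry.close(producer);
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}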
@@ -55,6 +55,17 @@
@AliasFor(member = "value")
String id() default "";

/**
* The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer
* sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions.
* If no TransactionalId is provided, then the producer is limited to idempotent delivery.
* If a TransactionalId is configured, <code>enable.idempotence</code> is implied.
* By default, the TransactionalId is not configured, which means transactions cannot be used.
*
* @return the transactional id
*/
String transactionalId() default "";

/**
* The maximum duration to block synchronous send operations.
*
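The hunk above adds a transactionalId attribute to the @KafkaClient annotation. A minimal sketch of a client declared with it follows; the interface name, topic and id values are made up for illustration.

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

// Declaring a transactionalId makes the backing producer transactional.
@KafkaClient(id = "word-producer", transactionalId = "word-producer-tx")
public interface WordClient {

    @Topic("words")
    void send(String word);
}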
@@ -17,6 +17,8 @@

import io.micronaut.context.annotation.*;
import io.micronaut.messaging.annotation.MessageListener;
import org.apache.kafka.common.IsolationLevel;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -83,6 +85,35 @@
*/
OffsetReset offsetReset() default OffsetReset.LATEST;

/**
* The client id of the producer that is used for {@link io.micronaut.messaging.annotation.SendTo}.
*
* @return the producer client id
*/
String producerClientId() default "";

/**
* This setting applies only to the producer that is used for {@link io.micronaut.messaging.annotation.SendTo}.
*
* The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer
* sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions.
* If no TransactionalId is provided, then the producer is limited to idempotent delivery.
* If a TransactionalId is configured, <code>enable.idempotence</code> is implied.
* By default, the TransactionalId is not configured, which means transactions cannot be used.
*
* @return the producer transactional id
*/
String producerTransactionalId() default "";

/**
* The Kafka consumer isolation level that controls how transactionally written messages are read.
*
* See {@link org.apache.kafka.clients.consumer.ConsumerConfig#ISOLATION_LEVEL_CONFIG}.
*
* @return The isolation level
*/
IsolationLevel isolation() default IsolationLevel.READ_UNCOMMITTED;

/**
* Setting the error strategy allows you to resume at the next offset
* or to seek the consumer (stop on error) to the failed offset so that
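The new producerClientId, producerTransactionalId and isolation attributes apply to listeners that forward results with @SendTo. A sketch of how they might be combined; the attribute names come from the hunks above, while the group id, topics and class are illustrative.

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.messaging.annotation.SendTo;
import org.apache.kafka.common.IsolationLevel;

// Reads only committed records and forwards the result through a transactional producer.
@KafkaListener(
        groupId = "word-lengths",
        isolation = IsolationLevel.READ_COMMITTED,
        producerClientId = "word-lengths-out",
        producerTransactionalId = "word-lengths-tx")
public class WordLengthListener {

    @Topic("words")
    @SendTo("word-lengths")
    public Integer count(String word) {
        return word.length();
    }
}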