Add KafkaPartition and KafkaPartitionKey annotations #387

Merged
New file: KafkaPartition.java
@@ -0,0 +1,42 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.annotation;

import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.*;

/**
 * Parameter level annotation to indicate which parameter is bound to the Kafka partition.
 * It can be applied to parameters of type {@link java.lang.Integer} or {@code int}.
*
 * <p>When used in producers, indicates which partition is to be used. If the provided value is {@code null},
 * the configured/default partitioning strategy applies.</p>
*
* <p>When used in consumers, it is populated with the partition that the record was received from.</p>
*
* <p>Note that while using {@link KafkaPartition} in the same method as {@link KafkaPartitionKey}
* will not throw an exception, the outcome of doing so is left unspecified.</p>
Author's review comment on the paragraph above:

I'm not sure if this is good behaviour. I'm happy to implement a different one with some advice/guidance.

*
* @author André Prata
* @since 3.3.4
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable
public @interface KafkaPartition {
}
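
To illustrate the intended use, here is a minimal sketch of a producer interface that pins records to an explicit partition. The topic name and parameter types are hypothetical, not part of this PR:

import io.micronaut.configuration.kafka.annotation.*;

@KafkaClient
public interface ProductClient {

    @Topic("awesome-products")
    void send(@KafkaKey String brand, String message, @KafkaPartition Integer partition);
}

Passing null for the partition falls back to the configured/default partitioning strategy, as the javadoc above describes.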
New file: KafkaPartitionKey.java
@@ -0,0 +1,42 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.annotation;

import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.*;

/**
 * Parameter level annotation for Kafka producers to indicate which parameter the Kafka partition is computed from.
 *
 * <p>The partition is computed by first serializing the object, using an appropriate serializer determined by the
 * {@link io.micronaut.configuration.kafka.serde.SerdeRegistry}, and then computing the partition number with the
 * same algorithm as Kafka's own {@code DefaultPartitioner} ({@code murmur2}).</p>
*
 * <p>If the provided value is {@code null}, the configured/default partitioning strategy applies.</p>
*
* <p>Note that while using {@link KafkaPartitionKey} in the same method as {@link KafkaPartition}
* will not throw an exception, the outcome of doing so is left unspecified.</p>
*
* @author André Prata
* @since 3.3.4
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable
public @interface KafkaPartitionKey {
}
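
Similarly, a minimal sketch (again with hypothetical names) of a producer that derives the partition from a dedicated key, without that key becoming the record key:

import io.micronaut.configuration.kafka.annotation.*;

@KafkaClient
public interface ProductClient {

    @Topic("awesome-products")
    void send(String message, @KafkaPartitionKey String partitionKey);
}

Records sent with equal partitionKey values land on the same partition; the record key itself remains null here because no parameter is annotated with @KafkaKey.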
New file: KafkaPartitionBinder.java
@@ -0,0 +1,47 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.bind;

import io.micronaut.configuration.kafka.annotation.KafkaPartition;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.inject.Singleton;
import java.util.Optional;

/**
 * Binder for binding the parameter annotated with {@link KafkaPartition}.
*
* @param <T> The target type
* @author André Prata
* @since 3.3.4
*/
@Singleton
public class KafkaPartitionBinder<T> implements AnnotatedConsumerRecordBinder<KafkaPartition, T> {

@Override
public Class<KafkaPartition> annotationType() {
return KafkaPartition.class;
}

@Override
public BindingResult<T> bind(ArgumentConversionContext<T> context, ConsumerRecord<?, ?> source) {
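// ConsumerRecord.partition() returns an int; convert it to the declared parameter type (e.g. Integer)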
Object partition = source.partition();
Optional<T> converted = ConversionService.SHARED.convert(partition, context);
return () -> converted;
}
}
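
On the consumer side, this binder enables listener methods like the following minimal sketch (hypothetical topic and group id):

import io.micronaut.configuration.kafka.annotation.*;

@KafkaListener(groupId = "products")
public class ProductListener {

    @Topic("awesome-products")
    void receive(String message, @KafkaPartition int partition) {
        // partition is populated from ConsumerRecord.partition() by KafkaPartitionBinder
    }
}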
Modified file: the @KafkaClient advice implementation (producer interceptor)
@@ -20,6 +20,8 @@
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaPartition;
import io.micronaut.configuration.kafka.annotation.KafkaPartitionKey;
import io.micronaut.configuration.kafka.annotation.KafkaTimestamp;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
@@ -51,6 +53,7 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
@@ -73,6 +76,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* Implementation of the {@link io.micronaut.configuration.kafka.annotation.KafkaClient} advice annotation.
@@ -142,6 +146,7 @@ public final Object intercept(MethodInvocationContext<Object, Object> context) {
Object key = null;
Object value = null;
Long timestampArgument = null;
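// resolve the partition lazily: computing it from a @KafkaPartitionKey needs the producer to look up the topic's partition count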
Function<Producer, Integer> partitionSupplier = producer -> null;
for (int i = 0; i < arguments.length; i++) {
Argument argument = arguments[i];
if (ProducerRecord.class.isAssignableFrom(argument.getType()) || argument.isAnnotationPresent(Body.class)) {
@@ -160,6 +165,20 @@ public final Object intercept(MethodInvocationContext<Object, Object> context) {
if (o instanceof Long) {
timestampArgument = (Long) o;
}
} else if (argument.isAnnotationPresent(KafkaPartition.class)) {
Object o = parameterValues[i];
if (o != null && Integer.class.isAssignableFrom(o.getClass())) {
partitionSupplier = __ -> (Integer) o;
}
} else if (argument.isAnnotationPresent(KafkaPartitionKey.class)) {
String finalTopic = topic;
Object partitionKey = parameterValues[i];
if (partitionKey != null) {
byte[] partitionKeyBytes = Optional.ofNullable(serdeRegistry.pickSerializer(argument))
.orElseGet(ByteArraySerializer::new)
.serialize(finalTopic, parameterValues[i]);
Author's review comment on the serializer lookup above:
I'm not sure how expensive it might be to pick a serializer from the registry on every single call.

If it's deemed expensive then we can/should keep it in a map, but I'm not entirely sure what it should be keyed by. I suppose for the current implementation it would make the most sense to key by Class, but in the end that's more or less what I'd expect the registry implementations to do.
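
For context, the computation on the added line below mirrors Kafka's default partitioner. A standalone sketch of the same arithmetic, with a hypothetical key and partition count (Utils is org.apache.kafka.common.utils.Utils, imported above):

byte[] keyBytes = "some-key".getBytes(java.nio.charset.StandardCharsets.UTF_8);
int numPartitions = 12; // in the interceptor this comes from producer.partitionsFor(finalTopic).size()
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // always in [0, numPartitions)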

partitionSupplier = producer -> Utils.toPositive(Utils.murmur2(partitionKeyBytes)) % producer.partitionsFor(finalTopic).size();
}
} else if (argument.isAnnotationPresent(io.micronaut.messaging.annotation.Header.class)) {
final AnnotationMetadata annotationMetadata = argument.getAnnotationMetadata();
String argumentName = argument.getName();
@@ -224,6 +243,8 @@ public final Object intercept(MethodInvocationContext<Object, Object> context) {

Producer kafkaProducer = getProducer(bodyArgument, keyArgument, context);

Integer partition = partitionSupplier.apply(kafkaProducer);

Long timestamp = client.isTrue("timestamp") ? Long.valueOf(System.currentTimeMillis()) : timestampArgument;
Duration maxBlock = context.getValue(KafkaClient.class, "maxBlock", Duration.class)
.orElse(null);
@@ -253,6 +274,7 @@ public final Object intercept(MethodInvocationContext<Object, Object> context) {
kafkaHeaders,
reactiveTypeValue,
key,
partition,
value,
timestamp,
maxBlock);
@@ -291,7 +313,7 @@ public void onComplete() {
});
} else {

ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
}
@@ -328,6 +350,7 @@ public void onComplete() {
kafkaHeaders,
reactiveTypeValue,
key,
partition,
value,
timestamp,
maxBlock);
@@ -351,13 +374,14 @@ public void onComplete() {
String finalTopic = topic;
Argument finalBodyArgument = bodyArgument;
Object finalKey = key;
Integer finalPartition = partition;
Argument finalReactiveTypeValue = reactiveTypeValue;
returnFlowable = bodyEmitter.flatMap(o ->
buildSendFlowable(context, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, o, timestamp, finalReactiveTypeValue)
buildSendFlowable(context, finalTopic, finalBodyArgument, kafkaProducer, kafkaHeaders, finalKey, finalPartition, o, timestamp, finalReactiveTypeValue)
);

} else {
returnFlowable = buildSendFlowable(context, topic, bodyArgument, kafkaProducer, kafkaHeaders, key, value, timestamp, reactiveTypeValue);
returnFlowable = buildSendFlowable(context, topic, bodyArgument, kafkaProducer, kafkaHeaders, key, partition, value, timestamp, reactiveTypeValue);
}
}
return interceptedMethod.handleResult(returnFlowable);
@@ -373,6 +397,7 @@ public void onComplete() {
kafkaHeaders,
returnTypeArgument,
key,
partition,
value,
timestamp,
maxBlock
@@ -401,7 +426,7 @@ public void onComplete() {

List results = new ArrayList();
for (Object o : batchValue) {
ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, o, timestamp);
ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp);

if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
@@ -425,7 +450,7 @@
}
});
}
ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);

@@ -484,11 +509,12 @@ private Flowable buildSendFlowable(
Producer kafkaProducer,
List<Header> kafkaHeaders,
Object key,
Integer partition,
Object value,
Long timestamp,
Argument reactiveValueType) {
Flowable returnFlowable;
ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, value, timestamp);
ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, value, timestamp);
returnFlowable = Flowable.create(emitter -> kafkaProducer.send(record, (metadata, exception) -> {
if (exception != null) {
emitter.onError(wrapException(context, exception));
@@ -515,6 +541,7 @@ private Flowable<Object> buildSendFlowable(
List<Header> kafkaHeaders,
Argument<?> returnType,
Object key,
Integer partition,
Object value,
Long timestamp,
Duration maxBlock) {
@@ -527,7 +554,7 @@

Class<?> finalJavaReturnType = javaReturnType;
Flowable<Object> sendFlowable = valueFlowable.flatMap(o -> {
ProducerRecord record = buildProducerRecord(topic, kafkaHeaders, key, o, timestamp);
ProducerRecord record = buildProducerRecord(topic, partition, kafkaHeaders, key, o, timestamp);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);

@@ -565,10 +592,10 @@ private MessagingClientException wrapException(MethodInvocationContext<Object, O
}

@SuppressWarnings("unchecked")
private ProducerRecord buildProducerRecord(String topic, List<Header> kafkaHeaders, Object key, Object value, Long timestamp) {
private ProducerRecord buildProducerRecord(String topic, Integer partition, List<Header> kafkaHeaders, Object key, Object value, Long timestamp) {
return new ProducerRecord(
topic,
null,
partition,
timestamp,
key,
value,
@@ -660,7 +687,7 @@ private Producer getProducer(Argument bodyArgument, @Nullable Argument keyArgume
/**
* Key used to cache {@link org.apache.kafka.clients.producer.Producer} instances.
*/
private final class ProducerKey {
private static final class ProducerKey {
final Class keyType;
final Class valueType;
final String id;
Modified file: AbstractKafkaContainerSpec.groovy
@@ -7,7 +7,7 @@ import spock.lang.Shared

abstract class AbstractKafkaContainerSpec extends AbstractKafkaSpec {

@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer()
@Shared @AutoCleanup KafkaContainer kafkaContainer = new KafkaContainer().withEnv(getEnvVariables())
@Shared @AutoCleanup ApplicationContext context

void setupSpec() {
@@ -17,4 +17,9 @@ abstract class AbstractKafkaContainerSpec extends AbstractKafkaSpec {
['kafka.bootstrap.servers': kafkaContainer.bootstrapServers]
)
}

protected Map<String, String> getEnvVariables() {
return [:]
}
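
Presumably this hook exists so subclasses can tune the broker container; for example, a spec could return an environment variable such as KAFKA_NUM_PARTITIONS so that partition-related tests run against topics with more than one partition.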

}