Add support for batch Acknowledgement. (#54) #692

Merged
merged 3 commits on Apr 24, 2023
@@ -468,7 +468,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
}

if (isBatch) {
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, consumerRecords);
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, ackArg.orElse(null), consumerRecords);
} else {
failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords);
}
@@ -645,7 +645,18 @@ private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fi
private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
@Nullable final Argument<?> ackArg,
final ConsumerRecords<?, ?> consumerRecords) {
if (ackArg != null) {
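// Stage, for each partition in the batch, the offset of the next record to
// consume (hence offset + 1); later records in the batch overwrite earlier
// ones, so each partition ends up at its highest processed position.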
Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
batchOffsets.put(topicPartition, offsetAndMetadata);
}
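// Bind an Acknowledgement whose ack() synchronously commits every staged offset.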
boundArguments.put(ackArg, (KafkaAcknowledgement) () -> consumerState.kafkaConsumer.commitSync(batchOffsets));
}

final ExecutableBinder<ConsumerRecords<?, ?>> batchBinder = new DefaultExecutableBinder<>(boundArguments);
final BoundExecutable boundExecutable = batchBinder.bind(method, batchBinderRegistry, consumerRecords);
Object result = boundExecutable.invoke(consumerState.consumerBean);
@@ -34,31 +34,4 @@ public Flux<Book> receiveFlux(Flux<Book> books) {
);
}
// end::reactive[]

// tag::manual[]
@Topic("all-the-books")
public void receive(List<Book> books,
List<Long> offsets,
List<Integer> partitions,
List<String> topics,
Consumer kafkaConsumer) { // <1>

for (int i = 0; i < books.size(); i++) {

// process the book
Book book = books.get(i); // <2>

// commit offsets
String topic = topics.get(i);
int partition = partitions.get(i);
long offset = offsets.get(i); // <3>

kafkaConsumer.commitSync(Collections.singletonMap( // <4>
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));

}
}
// end::manual[]
}
@@ -0,0 +1,28 @@
package io.micronaut.configuration.kafka.docs.consumer.batch.ack;

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.docs.consumer.batch.Book;
import io.micronaut.messaging.Acknowledgement;

import java.util.List;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
// end::imports[]

class BookListener {

// tag::method[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
public void receive(List<Book> books,
Acknowledgement acknowledgement) { // <2>

// process the books

acknowledgement.ack(); // <3>
}
// end::method[]
}
@@ -0,0 +1,47 @@
package io.micronaut.configuration.kafka.docs.consumer.batch.manual;

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.docs.consumer.batch.Book;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
// end::imports[]

class BookListener {

// tag::method[]
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
@Topic("all-the-books")
public void receive(List<Book> books,
List<Long> offsets,
List<Integer> partitions,
List<String> topics,
Consumer kafkaConsumer) { // <2>

for (int i = 0; i < books.size(); i++) {

// process the book
Book book = books.get(i); // <3>

// commit offsets
String topic = topics.get(i);
int partition = partitions.get(i);
long offset = offsets.get(i); // <4>

kafkaConsumer.commitSync(Collections.singletonMap( // <5>
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));

}
}
// end::method[]
}
@@ -1,9 +1,11 @@
package io.micronaut.configuration.kafka.docs.consumer.offsets.ack;

// tag::imports[]
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import io.micronaut.messaging.Acknowledgement;
// end::imports[]

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
@@ -0,0 +1,69 @@
package io.micronaut.configuration.kafka.offsets

import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.Acknowledgement
import io.micronaut.serde.annotation.Serdeable
import jakarta.inject.Singleton

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS

class BatchManualAckSpec extends AbstractKafkaContainerSpec {

public static final String TOPIC_SYNC = "BatchManualAckSpec-products-sync"

protected Map<String, Object> getConfiguration() {
super.configuration +
[(EMBEDDED_TOPICS): [TOPIC_SYNC]]
}

void "test manual ack"() {
given:
ProductClient client = context.getBean(ProductClient)
ProductListener listener = context.getBean(ProductListener)

when:
client.send(new Product(name: "Apple"))
client.send(new Product(name: "Orange"))

then:
conditions.eventually {
listener.products.size() == 2
listener.products.find() { it.name == "Apple"}
}
}

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
@KafkaClient
static interface ProductClient {
@Topic(BatchManualAckSpec.TOPIC_SYNC)
void send(Product product)
}

@Requires(property = 'spec.name', value = 'BatchManualAckSpec')
@Singleton
static class ProductListener {

List<Product> products = []

@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
@Topic(BatchManualAckSpec.TOPIC_SYNC)
void receive(List<Product> products, Acknowledgement acknowledgement) {
for (p in products) {
this.products << p
}
acknowledgement.ack()
}
}

@Serdeable
static class Product {
String name
}
}
29 changes: 24 additions & 5 deletions src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc
@@ -24,18 +24,37 @@ Note in the previous case offsets will automatically be committed for the whole

== Manually Committing Offsets with Batch

As with one-by-one message processing, if you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStrategy#DISABLED[], it becomes your responsibility to commit offsets.

If you want to commit the entire batch of offsets at once, the simplest approach is to add an argument of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement] and call the `ack()` method to commit the batch of offsets synchronously:

.Committing a Batch of Offsets Manually with ack()
[source,java]
----
include::{testskafka}/consumer/batch/ack/BookListener.java[tags=imports, indent=0]

include::{testskafka}/consumer/batch/ack/BookListener.java[tags=method, indent=0]
----

<1> Committing offsets automatically is disabled
<2> The listener method specifies a parameter of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement]
<3> The `ack()` method is called once the records have been processed

You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:

.Committing Offsets Manually with Batch
[source,java]
----
include::{testskafka}/consumer/batch/BookListener.java[tags=manual, indent=0]
include::{testskafka}/consumer/batch/manual/BookListener.java[tags=imports, indent=0]

include::{testskafka}/consumer/batch/manual/BookListener.java[tags=method, indent=0]
----

<1> The method receives the batch of records as well as the offsets, partitions and topics
<2> Each record is processed
<3> The offset, partition and topic are read for the record
<4> Offsets are committed
<1> Committing offsets automatically is disabled
<2> The method receives the batch of records as well as the offsets, partitions and topics
<3> Each record is processed
<4> The offset, partition and topic are read for the record
<5> Offsets are committed

This example is fairly trivial in that it commits offsets after processing each record in a batch, but you could instead commit after every 10 records, or every 100, or whatever interval makes sense for your application.
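
For instance, here is a minimal sketch of committing after every 10 records, assuming the same listener signature and imports as the previous example (plus `java.util.HashMap` and `java.util.Map`); the batch size of 10 is an arbitrary illustrative choice, not part of the framework API:

[source,java]
----
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true)
@Topic("all-the-books")
public void receive(List<Book> books, List<Long> offsets, List<Integer> partitions,
                    List<String> topics, Consumer kafkaConsumer) {
    Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
    for (int i = 0; i < books.size(); i++) {
        Book book = books.get(i); // process the book

        // stage the offset of the next record to consume for this partition
        pending.put(new TopicPartition(topics.get(i), partitions.get(i)),
                new OffsetAndMetadata(offsets.get(i) + 1));

        if ((i + 1) % 10 == 0) { // commit after every 10 processed records
            kafkaConsumer.commitSync(pending);
            pending.clear();
        }
    }
    if (!pending.isEmpty()) {
        kafkaConsumer.commitSync(pending); // commit whatever remains
    }
}
----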

4 changes: 2 additions & 2 deletions src/main/docs/guide/kafkaListener/kafkaOffsets.adoc
@@ -39,7 +39,7 @@ If you set the `OffsetStrategy` to api:configuration.kafka.annotation.OffsetStra

There are a couple of ways that can be achieved.

The simplest way is to define an argument of type api:configuration.kafka.Acknowledgement[] and call the `ack()` method to commit offsets synchronously:
The simplest way is to define an argument of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement] and call the `ack()` method to commit offsets synchronously:

.Committing offsets with `ack()`
[source,java]
@@ -49,7 +49,7 @@ include::{testskafka}/consumer/offsets/ack/ProductListener.java[tags=method, ind
----

<1> Committing offsets automatically is disabled
<2> The listener method specifies a parameter of type api:configuration.kafka.Acknowledgement[]
<2> The listener method specifies a parameter of type link:{apimicronaut}messaging/Acknowledgement.html[Acknowledgement]
<3> The `ack()` method is called once the record has been processed

Alternatively, you can supply a `KafkaConsumer` method argument and then call `commitSync` (or `commitAsync`) yourself when you are ready to commit offsets:
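
The included example is truncated in this view; as a hedged sketch, assuming a single-record listener along the lines of the `Product` examples above (the topic name and the `"my metadata"` string are illustrative), it would look something like:

[source,java]
----
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED)
@Topic("awesome-products")
public void receive(Product product,
                    long offset, int partition, String topic,
                    Consumer kafkaConsumer) {
    // process the product record

    // commit the offset of the next record to consume, synchronously
    kafkaConsumer.commitSync(Collections.singletonMap(
        new TopicPartition(topic, partition),
        new OffsetAndMetadata(offset + 1, "my metadata")
    ));
}
----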