Skip to content

Commit

Permalink
spring-projectsGH-2061: EKB Seek to End Option for Embedded Topics
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jan 11, 2022
1 parent 6a5fdc6 commit 9644ec8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 16 deletions.
9 changes: 8 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/testing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafk
----
====

NOTE: Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`.
[NOTE]
====
Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`.
This is because, in most cases, you want the consumer to consume any messages sent in a test case.
The `ConsumerConfig` default is `latest` which means that messages already sent by a test, before the consumer starts, will not receive those records.
To revert to the previous behavior, set the property to `latest` after calling the method.
When using the embedded broker, it is generally best practice to use a different topic for each test, to prevent cross-talk.
If this is not possible for some reason, note that the `consumeFromEmbeddedTopics` method's default behavior is to seek the assigned partitions to the beginning after assignment.
Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning.
====

A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaBroker` is provided to create an embedded Kafka and an embedded Zookeeper server.
(See <<embedded-kafka-annotation>> for information about using `@EmbeddedKafka` with JUnit 5).
The following listing shows the signatures of those methods:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -700,6 +699,16 @@ public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) {
consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0]));
}

/**
* Subscribe a consumer to all the embedded topics.
* @param seekToEnd true to seek to the end instead of the beginning.
* @param consumer the consumer.
* @since 2.8.2
*/
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd) {
consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0]));
}

/**
* Subscribe a consumer to one of the embedded topics.
* @param consumer the consumer.
Expand All @@ -709,6 +718,17 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
consumeFromEmbeddedTopics(consumer, topic);
}

/**
* Subscribe a consumer to one of the embedded topics.
* @param consumer the consumer.
* @param seekToEnd true to seek to the end instead of the beginning.
* @param topic the topic.
* @since 2.8.2
*/
public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic) {
consumeFromEmbeddedTopics(consumer, seekToEnd, topic);
}

/**
* Subscribe a consumer to one or more of the embedded topics.
* @param consumer the consumer.
Expand All @@ -717,13 +737,26 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
* the list of embedded topics (since 2.3.4).
*/
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume) {
consumeFromEmbeddedTopics(consumer, false, topicsToConsume);
}

/**
* Subscribe a consumer to one or more of the embedded topics.
* @param consumer the consumer.
* @param topicsToConsume the topics.
* @param seekToEnd true to seek to the end instead of the beginning.
* @throws IllegalStateException if you attempt to consume from a topic that is not in
* the list of embedded topics.
* @since 2.8.2
*/
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume) {
List<String> notEmbedded = Arrays.stream(topicsToConsume)
.filter(topic -> !this.topics.contains(topic))
.collect(Collectors.toList());
if (notEmbedded.size() > 0) {
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
}
final AtomicBoolean assigned = new AtomicBoolean();
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {

@Override
Expand All @@ -732,27 +765,30 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assigned.set(true);
assigned.set(partitions);
logger.debug(() -> "partitions assigned: " + partitions);
}

});
ConsumerRecords<?, ?> records = null;
int n = 0;
while (!assigned.get() && n++ < 600) { // NOSONAR magic #
while (assigned.get() == null && n++ < 600) { // NOSONAR magic #
records = consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic #
}
if (records != null && records.count() > 0) {
if (assigned.get() != null) {
final ConsumerRecords<?, ?> theRecords = records;
logger.debug(() -> "Records received on initial poll for assignment; re-seeking to beginning; "
+ theRecords.partitions().stream()
.flatMap(p -> theRecords.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.collect(Collectors.toList()));
consumer.seekToBeginning(records.partitions());
}
if (!assigned.get()) {
logger.debug(() -> "Partitions assigned "
+ assigned.get()
+ "; re-seeking to "
+ (seekToEnd ? "end; " : "beginning"));
if (seekToEnd) {
consumer.seekToEnd(assigned.get());
}
else {
consumer.seekToBeginning(assigned.get());
}
}
else {
throw new IllegalStateException("Failed to be assigned partitions from the embedded topics");
}
logger.debug("Subscription Initiated");
Expand Down

0 comments on commit 9644ec8

Please sign in to comment.