From 9644ec8258d38038a1b644550c2eb4f1f7cdf006 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 11 Jan 2022 15:58:24 -0500 Subject: [PATCH] GH-2061: EKB Seek to End Option for Embedded Topics Resolves https://github.com/spring-projects/spring-kafka/issues/2061 --- .../src/main/asciidoc/testing.adoc | 9 ++- .../kafka/test/EmbeddedKafkaBroker.java | 66 ++++++++++++++----- 2 files changed, 59 insertions(+), 16 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/testing.adoc b/spring-kafka-docs/src/main/asciidoc/testing.adoc index 8b0c640ad9..886d2ccc7f 100644 --- a/spring-kafka-docs/src/main/asciidoc/testing.adoc +++ b/spring-kafka-docs/src/main/asciidoc/testing.adoc @@ -37,11 +37,18 @@ public static Map producerProps(EmbeddedKafkaBroker embeddedKafk ---- ==== -NOTE: Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`. +[NOTE] +==== +Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`. This is because, in most cases, you want the consumer to consume any messages sent in a test case. The `ConsumerConfig` default is `latest` which means that messages already sent by a test, before the consumer starts, will not receive those records. To revert to the previous behavior, set the property to `latest` after calling the method. +When using the embedded broker, it is generally best practice to use a different topic for each test, to prevent cross-talk. +If this is not possible for some reason, note that the `consumeFromEmbeddedTopics` method's default behavior is to seek the assigned partitions to the beginning after assignment. +Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning. +==== + A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaBroker` is provided to create an embedded Kafka and an embedded Zookeeper server. (See <> for information about using `@EmbeddedKafka` with JUnit 5). The following listing shows the signatures of those methods: diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 91ad327b97..477755abf6 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -700,6 +699,16 @@ public void consumeFromAllEmbeddedTopics(Consumer consumer) { consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0])); } + /** + * Subscribe a consumer to all the embedded topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param consumer the consumer. + * @since 2.8.2 + */ + public void consumeFromAllEmbeddedTopics(Consumer consumer, boolean seekToEnd) { + consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0])); + } + /** * Subscribe a consumer to one of the embedded topics. * @param consumer the consumer. @@ -709,6 +718,17 @@ public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) { consumeFromEmbeddedTopics(consumer, topic); } + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param seekToEnd true to seek to the end instead of the beginning. + * @param topic the topic. + * @since 2.8.2 + */ + public void consumeFromAnEmbeddedTopic(Consumer consumer, boolean seekToEnd, String topic) { + consumeFromEmbeddedTopics(consumer, seekToEnd, topic); + } + /** * Subscribe a consumer to one or more of the embedded topics. * @param consumer the consumer. @@ -717,13 +737,26 @@ public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) { * the list of embedded topics (since 2.3.4). */ public void consumeFromEmbeddedTopics(Consumer consumer, String... topicsToConsume) { + consumeFromEmbeddedTopics(consumer, false, topicsToConsume); + } + + /** + * Subscribe a consumer to one or more of the embedded topics. + * @param consumer the consumer. + * @param topicsToConsume the topics. + * @param seekToEnd true to seek to the end instead of the beginning. + * @throws IllegalStateException if you attempt to consume from a topic that is not in + * the list of embedded topics. + * @since 2.8.2 + */ + public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd, String... topicsToConsume) { List notEmbedded = Arrays.stream(topicsToConsume) .filter(topic -> !this.topics.contains(topic)) .collect(Collectors.toList()); if (notEmbedded.size() > 0) { throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); } - final AtomicBoolean assigned = new AtomicBoolean(); + final AtomicReference> assigned = new AtomicReference<>(); consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() { @Override @@ -732,27 +765,30 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { - assigned.set(true); + assigned.set(partitions); logger.debug(() -> "partitions assigned: " + partitions); } }); ConsumerRecords records = null; int n = 0; - while (!assigned.get() && n++ < 600) { // NOSONAR magic # + while (assigned.get() == null && n++ < 600) { // NOSONAR magic # records = consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic # } - if (records != null && records.count() > 0) { + if (assigned.get() != null) { final ConsumerRecords theRecords = records; - logger.debug(() -> "Records received on initial poll for assignment; re-seeking to beginning; " - + theRecords.partitions().stream() - .flatMap(p -> theRecords.records(p).stream()) - // map to same format as send metadata toString() - .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()) - .collect(Collectors.toList())); - consumer.seekToBeginning(records.partitions()); - } - if (!assigned.get()) { + logger.debug(() -> "Partitions assigned " + + assigned.get() + + "; re-seeking to " + + (seekToEnd ? "end; " : "beginning")); + if (seekToEnd) { + consumer.seekToEnd(assigned.get()); + } + else { + consumer.seekToBeginning(assigned.get()); + } + } + else { throw new IllegalStateException("Failed to be assigned partitions from the embedded topics"); } logger.debug("Subscription Initiated");