diff --git a/build.gradle b/build.gradle index b086640f72..e3e1a37492 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,10 @@ subprojects { subproject -> hamcrestVersion = '1.3' jacksonVersion = '2.9.4' jaywayJsonPathVersion = '2.4.0' - junitVersion = '4.12' + junit4Version = '4.12' + junitJupiterVersion = '5.1.0' + junitPlatformVersion = '1.1.0' + junitVintageVersion = '5.1.0' kafkaVersion = '1.0.0' mockitoVersion = '2.15.0' scalaVersion = '2.11' @@ -93,6 +96,19 @@ subprojects { subproject -> toolVersion = '0.7.9' } + // dependencies that are common across all java projects + dependencies { + testCompile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion" + testRuntime "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion" + testRuntime "org.junit.platform:junit-platform-launcher:$junitPlatformVersion" + + // To support JUnit 4 tests + testRuntime "org.junit.vintage:junit-vintage-engine:$junitVintageVersion" + + // To avoid compiler warnings about @API annotations in JUnit code + testCompileOnly 'org.apiguardian:apiguardian-api:1.0.0' + } + // enable all compiler warnings; individual projects may customize further [compileJava, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options'] @@ -104,6 +120,7 @@ subprojects { subproject -> append = false destinationFile = file("$buildDir/jacoco.exec") } + useJUnitPlatform() } checkstyle { @@ -202,7 +219,7 @@ project ('spring-kafka-test') { exclude group: 'org.slf4j', module: 'slf4j-log4j12' } - compile ("junit:junit:$junitVersion") { + compile ("junit:junit:$junit4Version") { exclude group: 'org.hamcrest', module: 'hamcrest-core' } compile ("org.mockito:mockito-core:$mockitoVersion") { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 27768f1bba..f6b961fd5a 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 92165eede8..bf3de21830 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.3-bin.zip diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index cfe87249c2..0100865f65 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -420,7 +421,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume this.definedPartitions = new HashMap<>(topicPartitions.size()); for (TopicPartitionInitialOffset topicPartition : topicPartitions) { this.definedPartitions.put(topicPartition.topicPartition(), - new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent())); + new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(), + topicPartition.getPosition())); } consumer.assign(new ArrayList<>(this.definedPartitions.keySet())); } @@ -647,7 +649,12 @@ public void run() { this.count = 0; this.last = System.currentTimeMillis(); if (isRunning() && this.definedPartitions != null) { - initPartitionsIfNeeded(); + try { + initPartitionsIfNeeded(); + } + catch (Exception e) { + this.logger.error("Failed to set initial offsets", e); + } } long lastReceive = System.currentTimeMillis(); long lastAlertAt = lastReceive; @@ -1186,9 +1193,27 @@ private void initPartitionsIfNeeded() { /* * Note: initial position setting is only supported with explicit topic assignment. * When using auto assignment (subscribe), the ConsumerRebalanceListener is not - * called until we poll() the consumer. + * called until we poll() the consumer. Users can use a ConsumerAwareRebalanceListener + * or a ConsumerSeekAware listener in that case. */ - for (Entry entry : this.definedPartitions.entrySet()) { + Map partitions = new HashMap<>(this.definedPartitions); + Set beginnings = partitions.entrySet().stream() + .filter(e -> SeekPosition.BEGINNING.equals(e.getValue().seekPosition)) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + beginnings.forEach(k -> partitions.remove(k)); + Set ends = partitions.entrySet().stream() + .filter(e -> SeekPosition.END.equals(e.getValue().seekPosition)) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + ends.forEach(k -> partitions.remove(k)); + if (beginnings.size() > 0) { + this.consumer.seekToBeginning(beginnings); + } + if (ends.size() > 0) { + this.consumer.seekToEnd(ends); + } + for (Entry entry : partitions.entrySet()) { TopicPartition topicPartition = entry.getKey(); OffsetMetadata metadata = entry.getValue(); Long offset = metadata.offset; @@ -1378,9 +1403,12 @@ private static final class OffsetMetadata { private final boolean relativeToCurrent; - OffsetMetadata(Long offset, boolean relativeToCurrent) { + private final SeekPosition seekPosition; + + OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { this.offset = offset; this.relativeToCurrent = relativeToCurrent; + this.seekPosition = seekPosition; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/StreamsBuilderFactoryBeanTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/StreamsBuilderFactoryBeanTests.java index 95950a411b..2c3887a84f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/StreamsBuilderFactoryBeanTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/StreamsBuilderFactoryBeanTests.java @@ -26,9 +26,8 @@ import java.util.Map; import org.apache.kafka.streams.StreamsConfig; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -40,13 +39,14 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Pawel Szymczyk - * @author Artme Bilan + * @author Artem Bilan + * @author Gary Russell */ -@RunWith(SpringRunner.class) +@SpringJUnitConfig @DirtiesContext @EmbeddedKafka public class StreamsBuilderFactoryBeanTests { @@ -55,7 +55,7 @@ public class StreamsBuilderFactoryBeanTests { private static Path stateStoreDir; - @BeforeClass + @BeforeAll public static void setup() throws IOException { stateStoreDir = Files.createTempDirectory("test-state-dir"); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index d5bb484d3c..2a905191dc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -35,6 +35,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -78,6 +79,7 @@ import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.TopicPartitionInitialOffset; +import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.rule.KafkaEmbedded; @@ -1675,6 +1677,47 @@ public void testPauseResume() throws Exception { container.stop(); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testInitialSeek() throws Exception { + ConsumerFactory cf = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer); + ConsumerRecords emptyRecords = new ConsumerRecords<>(Collections.emptyMap()); + final CountDownLatch latch = new CountDownLatch(1); + given(consumer.poll(anyLong())).willAnswer(i -> { + latch.countDown(); + Thread.sleep(50); + return emptyRecords; + }); + TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] { + new TopicPartitionInitialOffset("foo", 0, SeekPosition.BEGINNING), + new TopicPartitionInitialOffset("foo", 1, SeekPosition.END), + new TopicPartitionInitialOffset("foo", 2, 0L), + new TopicPartitionInitialOffset("foo", 3, Long.MAX_VALUE), + new TopicPartitionInitialOffset("foo", 4, SeekPosition.BEGINNING), + new TopicPartitionInitialOffset("foo", 5, SeekPosition.END), + }; + ContainerProperties containerProps = new ContainerProperties(topicPartition); + containerProps.setAckMode(AckMode.RECORD); + containerProps.setClientId("clientId"); + containerProps.setMessageListener((MessageListener) r -> { }); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(consumer).seekToBeginning(captor.capture()); + assertThat(captor.getValue() + .equals(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))))); + verify(consumer).seekToEnd(captor.capture()); + assertThat(captor.getValue() + .equals(new HashSet<>(Arrays.asList(new TopicPartition("foo", 1), new TopicPartition("foo", 5))))); + verify(consumer).seek(new TopicPartition("foo", 2), 0L); + verify(consumer).seek(new TopicPartition("foo", 3), Long.MAX_VALUE); + container.stop(); + } + private Consumer spyOnConsumer(KafkaMessageListenerContainer container) { Consumer consumer = spy( KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));