diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index e8ec5e9fe5f7a..4fd7a591eb660 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -38,6 +38,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob private InternalProcessorContext processorContext; private final int retries; private final long retryBackoffMs; + private final Duration pollTime; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, @@ -76,6 +78,7 @@ public GlobalStateManagerImpl(final LogContext logContext, this.stateRestoreListener = stateRestoreListener; this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG); this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG); + this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); } @Override @@ -262,7 +265,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, while (offset < highWatermark) { try { - final ConsumerRecords records = globalConsumer.poll(100); + final ConsumerRecords records = globalConsumer.poll(pollTime); final List> restoreRecords = new ArrayList<>(); for (ConsumerRecord record : records) { if (record.key() != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 112011f47b8eb..9d529c5455c46 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -200,7 +201,7 @@ static class StateConsumer { private final Consumer globalConsumer; private final GlobalStateMaintainer stateMaintainer; private final Time time; - private final long pollMs; + private final Duration pollTime; private final long flushInterval; private final Logger log; @@ -210,13 +211,13 @@ static class StateConsumer { final Consumer globalConsumer, final GlobalStateMaintainer stateMaintainer, final Time time, - final long pollMs, + final Duration pollTime, final long flushInterval) { this.log = logContext.logger(getClass()); this.globalConsumer = globalConsumer; this.stateMaintainer = stateMaintainer; this.time = time; - this.pollMs = pollMs; + this.pollTime = pollTime; this.flushInterval = flushInterval; } @@ -235,7 +236,7 @@ void initialize() { void pollAndUpdate() { try { - final ConsumerRecords received = globalConsumer.poll(pollMs); + final ConsumerRecords received = globalConsumer.poll(pollTime); for (final ConsumerRecord record : received) { stateMaintainer.update(record); } @@ -338,8 +339,9 @@ private StateConsumer initialize() { logContext ), time, - config.getLong(StreamsConfig.POLL_MS_CONFIG), - config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), + config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + ); stateConsumer.initialize(); return stateConsumer; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index bb0ed069670a9..07af8019aefad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader { private final Map stateRestorers = new HashMap<>(); private final Map needsRestoring = new HashMap<>(); private final Map needsInitializing = new HashMap<>(); + private final Duration pollTime; public StoreChangelogReader(final Consumer restoreConsumer, + final Duration pollTime, final StateRestoreListener userStateRestoreListener, final LogContext logContext) { this.restoreConsumer = restoreConsumer; + this.pollTime = pollTime; this.log = logContext.logger(getClass()); this.userStateRestoreListener = userStateRestoreListener; } @@ -76,7 +80,7 @@ public Collection restore(final RestoringTasks active) { } try { - final ConsumerRecords records = restoreConsumer.poll(10); + final ConsumerRecords records = restoreConsumer.poll(pollTime); final Iterator iterator = needsRestoring.keySet().iterator(); while (iterator.hasNext()) { final TopicPartition partition = iterator.next(); @@ -295,6 +299,7 @@ private boolean hasPartition(final TopicPartition topicPartition) { return true; } } + return false; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e72c4a5de9405..a159e7b6c7a76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -50,6 +50,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -212,7 +213,7 @@ State setState(final State newState) { if (newState == State.RUNNING) { updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks()); } else { - updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); + updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); } } @@ -555,7 +556,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl { } private final Time time; - private final long pollTimeMs; + private final Duration pollTime; private final long commitTimeMs; private final Object stateLock; private final Logger log; @@ -602,7 +603,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, log.info("Creating restore consumer client"); final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); - final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext); + final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); + final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext); Producer threadProducer = null; final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); @@ -710,10 +712,10 @@ public StreamThread(final Time time, this.originalReset = originalReset; this.versionProbingFlag = versionProbingFlag; - this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); + this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); - updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); + updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap()); } /** @@ -801,14 +803,14 @@ long runOnce(final long recordsProcessedBeforeCommit) { if (state == State.PARTITIONS_ASSIGNED) { // try to fetch some records with zero poll millis // to unblock the restoration as soon as possible - records = pollRequests(0L); + records = pollRequests(Duration.ZERO); if (taskManager.updateNewAndRestoringTasks()) { setState(State.RUNNING); } } else { // try to fetch some records if necessary - records = pollRequests(pollTimeMs); + records = pollRequests(pollTime); // if state changed after the poll call, // try to initialize the assigned tasks again @@ -843,15 +845,15 @@ long runOnce(final long recordsProcessedBeforeCommit) { /** * Get the next batch of records by polling. * - * @param pollTimeMs poll time millis parameter for the consumer poll + * @param pollTime how long to block in Consumer#poll * @return Next batch of records or null if no records available. * @throws TaskMigratedException if the task producer got fenced (EOS only) */ - private ConsumerRecords pollRequests(final long pollTimeMs) { + private ConsumerRecords pollRequests(final Duration pollTime) { ConsumerRecords records = null; try { - records = consumer.poll(pollTimeMs); + records = consumer.poll(pollTime); } catch (final InvalidOffsetException e) { resetInvalidOffsets(e); } @@ -1078,7 +1080,11 @@ private void maybeUpdateStandbyTasks(final long now) { } try { - final ConsumerRecords records = restoreConsumer.poll(0); + // poll(0): Since this is during the normal processing, not during restoration. + // We can afford to have slower restore (because we don't wait inside poll for results). + // Instead, we want to proceed to the next iteration to call the main consumer#poll() + // as soon as possible so as to not be kicked out of the group. + final ConsumerRecords records = restoreConsumer.poll(Duration.ZERO); if (!records.isEmpty()) { for (final TopicPartition partition : records.partitions()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 297b2434c06cb..8635b94544ea5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -25,6 +27,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; @@ -42,7 +45,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -234,9 +236,8 @@ public boolean conditionMet() { assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); } - @Ignore // this test cannot pass as long as GST blocks KS.start() @Test - public void testGlobalThreadCloseWithoutConnectingToBroker() { + public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); @@ -244,16 +245,26 @@ public void testGlobalThreadCloseWithoutConnectingToBroker() { props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); + // We want to configure request.timeout.ms, but it must be larger than a + // few other configs. + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200); + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201); + props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202); + final StreamsBuilder builder = new StreamsBuilder(); // make sure we have the global state thread running too builder.globalTable("anyTopic"); final KafkaStreams streams = new KafkaStreams(builder.build(), props); - streams.start(); - streams.close(); + try { + streams.start(); + fail("expected start() to time out and throw an exception."); + } catch (final StreamsException expected) { + // This is a result of not being able to connect to the broker. + } // There's nothing to assert... We're testing that this operation actually completes. } - @Ignore // this test cannot pass until we implement KIP-266 @Test public void testLocalThreadCloseWithoutConnectingToBroker() { final Properties props = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index fe897c7ac30e6..86cb331956c07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -44,6 +44,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -464,7 +465,7 @@ private static List> readRecords(final String topic, while (totalPollTimeMs < waitTime && continueConsuming(consumerRecords.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; - final ConsumerRecords records = consumer.poll(pollIntervalMs); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(pollIntervalMs)); for (final ConsumerRecord record : records) { consumerRecords.add(record); diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 8187467aaa61a..7179293200ed7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -54,6 +54,7 @@ import org.apache.kafka.streams.state.WindowStore; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -334,7 +335,7 @@ private void consumeAndProduce(final String topic) { consumer.seekToBeginning(partitions); while (true) { - final ConsumerRecords records = consumer.poll(POLL_MS); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_MS)); if (records.isEmpty()) { if (processedRecords == numRecords) { break; @@ -372,7 +373,7 @@ private void consume(final String topic) { consumer.seekToBeginning(partitions); while (true) { - final ConsumerRecords records = consumer.poll(POLL_MS); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(POLL_MS)); if (records.isEmpty()) { if (processedRecords == numRecords) { break; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 347e9c4fd75c0..4ed44be47f2d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -42,6 +42,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -233,7 +234,7 @@ private AbstractTask createTask(final Consumer consumer, storeTopicPartitions, ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics), consumer, - new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")), + new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")), false, stateDirectory, config) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 93d6a0d931bb0..05d0e3d04eede 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -50,6 +50,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -122,7 +123,12 @@ private StreamsConfig createConfig(final File baseDir) throws IOException { private final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test ")); + private final StoreChangelogReader changelogReader = new StoreChangelogReader( + restoreStateConsumer, + Duration.ZERO, + stateRestoreListener, + new LogContext("standby-task-test ") + ); private final byte[] recordValue = intSerializer.serialize(null, 10); private final byte[] recordKey = intSerializer.serialize(null, 1); @@ -188,7 +194,7 @@ public void testUpdate() throws IOException { } restoreStateConsumer.seekToBeginning(partition); - task.update(partition2, restoreStateConsumer.poll(100).records(partition2)); + task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2)); StandbyContextImpl context = (StandbyContextImpl) task.context(); MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1); @@ -245,7 +251,7 @@ public void testUpdateKTable() throws IOException { } // The commit offset is at 0L. Records should not be processed - List> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition)); + List> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition)); assertEquals(5, remaining.size()); committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java index 725211dd268dc..140f705619956 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.io.IOException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -52,7 +53,7 @@ public void setUp() { partitionOffsets.put(topicOne, 20L); partitionOffsets.put(topicTwo, 30L); stateMaintainer = new StateMaintainerStub(partitionOffsets); - stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL); + stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL); } @Test @@ -109,7 +110,7 @@ public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() { @Test public void shouldNotFlushWhenFlushIntervalIsZero() { - stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, 10L, -1); + stateConsumer = new GlobalStreamThread.StateConsumer(logContext, consumer, stateMaintainer, time, Duration.ofMillis(10L), -1); stateConsumer.initialize(); time.sleep(100); stateConsumer.pollAndUpdate(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index aabe7ff631362..90abf32477f82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.junit.runner.RunWith; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -71,7 +72,7 @@ public class StoreChangelogReaderTest { private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private final TopicPartition topicPartition = new TopicPartition("topic", 0); private final LogContext logContext = new LogContext("test-reader "); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext); + private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext); @Before public void setUp() { @@ -89,7 +90,7 @@ public Map> listTopics() { } }; - final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener, logContext); + final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext); changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); changelogReader.restore(active); assertTrue(functionCalled.get()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 3a0fc4eb1cdd9..5537335b2212c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -56,6 +56,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Map; @@ -116,7 +117,7 @@ public void close() { private final MockProducer producer = new MockProducer<>(false, bytesSerializer, bytesSerializer); private final MockConsumer restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener(); - private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("stream-task-test ")) { + private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Duration.ZERO, stateRestoreListener, new LogContext("stream-task-test ")) { @Override public Map restoredOffsets() { return Collections.singletonMap(changelogPartition, offset); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index c24122abd13d1..66ea3c42779fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -49,6 +49,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -177,7 +178,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, Collections.singletonList(new TopicPartition(topicName, taskId.partition)), topology, clientSupplier.consumer, - new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")), + new StoreChangelogReader(clientSupplier.restoreConsumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task ")), streamsConfig, new MockStreamsMetrics(metrics), stateDirectory, diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index e897088beca05..3c8446ca46652 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.ValueMapper; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Locale; import java.util.Properties; @@ -153,7 +154,7 @@ private static void loopUntilRecordReceived(final String kafka, final boolean eo consumer.subscribe(Collections.singletonList(SINK_TOPIC)); while (true) { - final ConsumerRecords records = consumer.poll(100); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (final ConsumerRecord record : records) { if (record.key().equals("key") && record.value().equals("1")) { return; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 752cdd696eddd..0b18864d4ab2f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -16,16 +16,18 @@ */ package org.apache.kafka.streams.tests; -import kafka.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -40,17 +42,18 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Random; -import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; public class EosTestDriver extends SmokeTestUtil { @@ -59,22 +62,19 @@ public class EosTestDriver extends SmokeTestUtil { private static boolean isRunning = true; - static int numRecordsProduced = 0; + private static int numRecordsProduced = 0; - static synchronized void updateNumRecordsProduces(final int delta) { + private static synchronized void updateNumRecordsProduces(final int delta) { numRecordsProduced += delta; } static void generate(final String kafka) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("Terminating"); - System.out.flush(); - isRunning = false; - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Terminating"); + System.out.flush(); + isRunning = false; + })); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); @@ -93,19 +93,16 @@ public void run() { final ProducerRecord record = new ProducerRecord<>("data", key, value); - producer.send(record, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - updateNumRecordsProduces(-expired); - } catch (Exception ignore) { } - } + producer.send(record, (metadata, exception) -> { + if (exception != null) { + exception.printStackTrace(System.err); + System.err.flush(); + if (exception instanceof TimeoutException) { + try { + // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time + final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); + updateNumRecordsProduces(-expired); + } catch (final Exception ignore) { } } } }); @@ -141,10 +138,6 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio } public static void verify(final String kafka, final boolean withRepartitioning) { - ensureStreamsApplicationDown(kafka); - - final Map committedOffsets = getCommittedOffsets(kafka, withRepartitioning); - final Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); @@ -152,6 +145,13 @@ public static void verify(final String kafka, final boolean withRepartitioning) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + final Map committedOffsets; + try (final AdminClient adminClient = KafkaAdminClient.create(props)) { + ensureStreamsApplicationDown(adminClient); + + committedOffsets = getCommittedOffsets(adminClient, withRepartitioning); + } + final String[] allInputTopics; final String[] allOutputTopics; if (withRepartitioning) { @@ -218,54 +218,42 @@ public static void verify(final String kafka, final boolean withRepartitioning) System.out.flush(); } - private static void ensureStreamsApplicationDown(final String kafka) { - AdminClient adminClient = null; - try { - adminClient = AdminClient.createSimplePlaintext(kafka); + private static void ensureStreamsApplicationDown(final AdminClient adminClient) { - final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) { - if (System.currentTimeMillis() > maxWaitTime) { - throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds."); - } - sleep(1000); - } - } finally { - if (adminClient != null) { - adminClient.close(); + final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; + ConsumerGroupDescription description; + do { + description = getConsumerGroupDescription(adminClient); + + if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { + throw new RuntimeException( + "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. " + + "Group: " + description + ); } - } + sleep(1000); + } while (!description.members().isEmpty()); } - private static Map getCommittedOffsets(final String kafka, + + private static Map getCommittedOffsets(final AdminClient adminClient, final boolean withRepartitioning) { - final Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + final Map topicPartitionOffsetAndMetadataMap; - final Map committedOffsets = new HashMap<>(); - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final Set topics = new HashSet<>(); - topics.add("data"); - if (withRepartitioning) { - topics.add("repartition"); - } - consumer.subscribe(topics); - consumer.poll(0); + try { + final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID); + topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } - final Set partitions = new HashSet<>(); - for (final String topic : topics) { - for (final PartitionInfo partition : consumer.partitionsFor(topic)) { - partitions.add(new TopicPartition(partition.topic(), partition.partition())); - } - } + final Map committedOffsets = new HashMap<>(); - for (final TopicPartition tp : partitions) { - final long offset = consumer.position(tp); - committedOffsets.put(tp, offset); + for (final Map.Entry entry : topicPartitionOffsetAndMetadataMap.entrySet()) { + final String topic = entry.getKey().topic(); + if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) { + committedOffsets.put(entry.getKey(), entry.getValue().offset()); } } @@ -284,7 +272,7 @@ private static Map receivedRecords = consumer.poll(100); + final ConsumerRecords receivedRecords = consumer.poll(Duration.ofMillis(100)); for (final ConsumerRecord record : receivedRecords) { maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; @@ -327,19 +315,12 @@ private static void addRecord(final ConsumerRecord record, final TopicPartition partition = new TopicPartition(topic, record.partition()); if (verifyTopic(topic, withRepartitioning)) { - Map>> topicRecordsPerPartition - = recordPerTopicPerPartition.get(topic); + final Map>> topicRecordsPerPartition = + recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>()); - if (topicRecordsPerPartition == null) { - topicRecordsPerPartition = new HashMap<>(); - recordPerTopicPerPartition.put(topic, topicRecordsPerPartition); - } + final List> records = + topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>()); - List> records = topicRecordsPerPartition.get(partition); - if (records == null) { - records = new ArrayList<>(); - topicRecordsPerPartition.put(partition, records); - } records.add(record); } else { throw new RuntimeException("FAIL: received data from unexpected topic: " + record); @@ -397,7 +378,7 @@ private static void verifyMin(final Map> inputRecords = partitionInput.iterator(); @@ -439,7 +420,7 @@ private static void verifySum(final Map> inputRecords = partitionInput.iterator(); @@ -480,7 +461,7 @@ private static void verifyMax(final Map> inputRecords = partitionInput.iterator(); @@ -501,7 +482,7 @@ private static void verifyMax(final Map but was <" + receivedKey + "," + receivedValue + ">"); } } @@ -521,7 +502,7 @@ private static void verifyCnt(final Map> inputRecords = partitionInput.iterator(); @@ -539,7 +520,7 @@ private static void verifyCnt(final Map but was <" + receivedKey + "," + receivedValue + ">"); } } @@ -574,14 +555,11 @@ private static void verifyAllTransactionFinished(final KafkaConsumer record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value"); - producer.send(record, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - Exit.exit(1); - } + producer.send(record, (metadata, exception) -> { + if (exception != null) { + exception.printStackTrace(System.err); + System.err.flush(); + Exit.exit(1); } }); } @@ -591,7 +569,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords records = consumer.poll(100); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) { System.out.println("No data received."); for (final TopicPartition tp : partitions) { @@ -638,4 +616,18 @@ private static List getAllPartitions(final KafkaConsumer c return partitions; } + + private static ConsumerGroupDescription getConsumerGroupDescription(final AdminClient adminClient) { + final ConsumerGroupDescription description; + try { + description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID)) + .describedGroups() + .get(EosTestClient.APP_ID) + .get(10, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { + e.printStackTrace(); + throw new RuntimeException("Unexpected Exception getting group description", e); + } + return description; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 50330a08e612c..7533fdd085883 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -36,6 +36,7 @@ import org.apache.kafka.test.TestUtils; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -289,7 +290,7 @@ public static void verify(String kafka, Map> allData, int m int retry = 0; final long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - ConsumerRecords records = consumer.poll(500); + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { if (verifyMin(min, allData, false) && verifyMax(max, allData, false) diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index ad19f32fd1d74..33cf1fa34bcf1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -32,6 +32,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -74,7 +75,7 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(3, records.count()); } @@ -90,7 +91,7 @@ public void testResetToSpecificOffsetWhenBeforeBeginningOffset() { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -106,7 +107,7 @@ public void testResetToSpecificOffsetWhenAfterEndOffset() { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -122,7 +123,7 @@ public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -138,7 +139,7 @@ public void testShiftOffsetByWhenBeforeBeginningOffset() { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(5, records.count()); } @@ -154,7 +155,7 @@ public void testShiftOffsetByWhenAfterEndOffset() { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -172,7 +173,7 @@ public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() { topicPartitionsAndOffset.put(topicPartition, 3L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -190,7 +191,7 @@ public void testResetUsingPlanWhenBeforeBeginningOffset() { topicPartitionsAndOffset.put(topicPartition, 1L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -208,7 +209,7 @@ public void testResetUsingPlanWhenAfterEndOffset() { topicPartitionsAndOffset.put(topicPartition, 5L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } @@ -226,7 +227,7 @@ public void shouldSeekToEndOffset() { intermediateTopicPartitions.add(topicPartition); streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions); - final ConsumerRecords records = consumer.poll(500); + final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); assertEquals(2, records.count()); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java index 3070e36482f21..00788fd2f98ce 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -85,9 +86,8 @@ public synchronized void assign(Collection partitions) { super.assign(partitions); } - @Deprecated @Override - public ConsumerRecords poll(long timeout) { + public ConsumerRecords poll(final Duration timeout) { // add buffered records to MockConsumer for (ConsumerRecord record : recordBuffer) { super.addRecord(record); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 773cbb4c32373..7f752652da40f 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -66,6 +66,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -327,6 +328,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store consumer, new StoreChangelogReader( createRestoreConsumer(processorTopology.storeToChangelogTopic()), + Duration.ZERO, stateRestoreListener, new LogContext("topology-test-driver ")), streamsConfig,