diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1ea75d17dc09f..2d433eb3c8274 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -770,7 +770,9 @@ void runOnce() { return; } - initializeAndRestorePhase(); + if (!stateUpdaterEnabled) { + initializeAndRestorePhase(); + } // TODO: we should record the restore latency and its relative time spent ratio after // we figure out how to move this method out of the stream thread @@ -792,6 +794,11 @@ void runOnce() { * 6. Otherwise, increment N. */ do { + + if (stateUpdaterEnabled) { + checkStateUpdater(); + } + log.debug("Processing tasks with {} iterations.", numIterations); final int processed = taskManager.process(numIterations, time); final long processLatency = advanceNowAndComputeLatency(); @@ -880,36 +887,32 @@ void runOnce() { private void initializeAndRestorePhase() { final java.util.function.Consumer> offsetResetter = partitions -> resetOffsets(partitions, null); final State stateSnapshot = state; - if (stateUpdaterEnabled) { - checkStateUpdater(); - } else { - // only try to initialize the assigned tasks - // if the state is still in PARTITION_ASSIGNED after the poll call - if (stateSnapshot == State.PARTITIONS_ASSIGNED - || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { + // only try to initialize the assigned tasks + // if the state is still in PARTITION_ASSIGNED after the poll call + if (stateSnapshot == State.PARTITIONS_ASSIGNED + || stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) { - log.debug("State is {}; initializing tasks if necessary", stateSnapshot); + log.debug("State is {}; initializing tasks if necessary", stateSnapshot); - if (taskManager.tryToCompleteRestoration(now, offsetResetter)) { - log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, - taskManager.allTasks().keySet()); - setState(State.RUNNING); - } - - if (log.isDebugEnabled()) { - log.debug("Initialization call done. State is {}", state); - } + if (taskManager.tryToCompleteRestoration(now, offsetResetter)) { + log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs, + taskManager.allTasks().keySet()); + setState(State.RUNNING); } if (log.isDebugEnabled()) { - log.debug("Idempotently invoking restoration logic in state {}", state); + log.debug("Initialization call done. State is {}", state); } - // we can always let changelog reader try restoring in order to initialize the changelogs; - // if there's no active restoring or standby updating it would not try to fetch any data - // After KAFKA-13873, we only restore the not paused tasks. - changelogReader.restore(taskManager.notPausedTasks()); - log.debug("Idempotent restore call done. Thread state has not changed."); } + + if (log.isDebugEnabled()) { + log.debug("Idempotently invoking restoration logic in state {}", state); + } + // we can always let changelog reader try restoring in order to initialize the changelogs; + // if there's no active restoring or standby updating it would not try to fetch any data + // After KAFKA-13873, we only restore the not paused tasks. + changelogReader.restore(taskManager.notPausedTasks()); + log.debug("Idempotent restore call done. Thread state has not changed."); } private void checkStateUpdater() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7ecac4d7ecf87..76574cccb7057 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -149,6 +149,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -3005,6 +3006,21 @@ public void shouldCheckStateUpdater() { Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any()); } + @Test + public void shouldCheckStateUpdaterInBetweenProcessCalls() { + final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); + streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true); + final StreamThread streamThread = setUpThread(streamsConfigProps); + final TaskManager taskManager = streamThread.taskManager(); + streamThread.setState(State.STARTING); + // non-zero return of process will cause a second call to process + when(taskManager.process(Mockito.anyInt(), Mockito.any())).thenReturn(1).thenReturn(0); + + streamThread.runOnce(); + + Mockito.verify(taskManager, times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any()); + } + @Test public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() { final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();