Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,9 @@ void runOnce() {
return;
}

initializeAndRestorePhase();
if (!stateUpdaterEnabled) {
initializeAndRestorePhase();
}

// TODO: we should record the restore latency and its relative time spent ratio after
// we figure out how to move this method out of the stream thread
Expand All @@ -792,6 +794,11 @@ void runOnce() {
* 6. Otherwise, increment N.
*/
do {

if (stateUpdaterEnabled) {
checkStateUpdater();
}

log.debug("Processing tasks with {} iterations.", numIterations);
final int processed = taskManager.process(numIterations, time);
final long processLatency = advanceNowAndComputeLatency();
Expand Down Expand Up @@ -880,36 +887,32 @@ void runOnce() {
private void initializeAndRestorePhase() {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = partitions -> resetOffsets(partitions, null);
final State stateSnapshot = state;
if (stateUpdaterEnabled) {
checkStateUpdater();
} else {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (stateSnapshot == State.PARTITIONS_ASSIGNED
|| stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (stateSnapshot == State.PARTITIONS_ASSIGNED
|| stateSnapshot == State.RUNNING && taskManager.needsInitializationOrRestoration()) {

log.debug("State is {}; initializing tasks if necessary", stateSnapshot);
log.debug("State is {}; initializing tasks if necessary", stateSnapshot);

if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
taskManager.allTasks().keySet());
setState(State.RUNNING);
}

if (log.isDebugEnabled()) {
log.debug("Initialization call done. State is {}", state);
}
if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
log.info("Restoration took {} ms for all tasks {}", time.milliseconds() - lastPartitionAssignedMs,
taskManager.allTasks().keySet());
setState(State.RUNNING);
}

if (log.isDebugEnabled()) {
log.debug("Idempotently invoking restoration logic in state {}", state);
log.debug("Initialization call done. State is {}", state);
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
// After KAFKA-13873, we only restore the not paused tasks.
changelogReader.restore(taskManager.notPausedTasks());
log.debug("Idempotent restore call done. Thread state has not changed.");
}

if (log.isDebugEnabled()) {
log.debug("Idempotently invoking restoration logic in state {}", state);
}
// we can always let changelog reader try restoring in order to initialize the changelogs;
// if there's no active restoring or standby updating it would not try to fetch any data
// After KAFKA-13873, we only restore the not paused tasks.
changelogReader.restore(taskManager.notPausedTasks());
log.debug("Idempotent restore call done. Thread state has not changed.");
}

private void checkStateUpdater() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -3005,6 +3006,21 @@ public void shouldCheckStateUpdater() {
Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
}

@Test
public void shouldCheckStateUpdaterInBetweenProcessCalls() {
final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, true);
final StreamThread streamThread = setUpThread(streamsConfigProps);
final TaskManager taskManager = streamThread.taskManager();
streamThread.setState(State.STARTING);
// non-zero return of process will cause a second call to process
when(taskManager.process(Mockito.anyInt(), Mockito.any())).thenReturn(1).thenReturn(0);

streamThread.runOnce();

Mockito.verify(taskManager, times(2)).checkStateUpdater(Mockito.anyLong(), Mockito.any());
}

@Test
public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
Expand Down