Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10198: guard against recycling dirty state #8924

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
final ProcessorStateManager stateManager = standbyTask.stateMgr;
final LogContext logContext = getLogContext(standbyTask.id);

standbyTask.closeAndRecycleState();
standbyTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.ACTIVE, logContext);

return createActiveTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void closeDirty() {
}

@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
stateMgr.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,
final InternalProcessorContext context = streamTask.processorContext();
final ProcessorStateManager stateManager = streamTask.stateMgr;

streamTask.closeAndRecycleState();
streamTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id()));

return createStandbyTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ private Map<TopicPartition, Long> extractPartitionTimes() {

@Override
public void closeClean() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(true);
log.info("Closed clean");
Expand All @@ -482,7 +483,8 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
}

@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
switch (state()) {
case SUSPENDED:
Expand Down Expand Up @@ -515,17 +517,20 @@ private void writeCheckpoint() {
stateMgr.checkpoint(checkpointableOffsets());
}

/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
if (clean && commitNeeded) {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
/**
 * Guard invoked at the start of the clean-close paths ({@code closeClean} and
 * {@code closeCleanAndRecycleState}): rejects the close if a commit is still pending.
 *
 * @throws TaskMigratedException if {@code commitNeeded} is set, i.e. there is uncommitted data
 */
private void validateClean() {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
if (commitNeeded) {
log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
+ " commit and should close as dirty instead");
throw new TaskMigratedException("Tried to close dirty task as clean");
}
}

/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This diff turned out a bit awkward; basically, I just factored this check out into a separate method that we should call at the beginning of both flavors of clean close.

switch (state()) {
case SUSPENDED:
// first close state manager (which is idempotent) then close the record collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ enum TaskType {
/**
* Attempt a clean close but do not close the underlying state
*/
void closeAndRecycleState();
void closeCleanAndRecycleState();

/**
* Revive a closed task to a created one; should never throw an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,13 +502,13 @@ public void shouldRecycleTask() {
EasyMock.replay(stateManager);

task = createStandbyTask();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED

task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING

task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED

// Currently, there are no metrics registered for standby tasks.
// This is a regression test so that, if we add some, we will be sure to deregister them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -1752,7 +1753,7 @@ public void shouldUnregisterMetricsInCloseDirty() {
}

@Test
public void shouldUnregisterMetricsInCloseAndRecycle() {
public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
Expand All @@ -1761,7 +1762,7 @@ public void shouldUnregisterMetricsInCloseAndRecycle() {

task.suspend();
assertThat(getTaskMetrics(), not(empty()));
task.closeAndRecycleState();
task.closeCleanAndRecycleState();
assertThat(getTaskMetrics(), empty());
}

Expand Down Expand Up @@ -1798,23 +1799,49 @@ public void shouldUpdatePartitions() {
assertThat(task.inputPartitions(), equalTo(newPartitions));
}

// Verifies that closeClean() refuses to run while the task still needs a commit.
@Test
public void shouldThrowIfCleanClosingDirtyTask() {
// Create a stateless task, then initialize it and complete restoration.
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();

// Process a single record without committing so that the task reports a pending commit.
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
task.process(0L);
assertTrue(task.commitNeeded());

// A clean close with a pending commit must be rejected with TaskMigratedException.
assertThrows(TaskMigratedException.class, () -> task.closeClean());
}

// Verifies that closeCleanAndRecycleState() refuses to run while the task still needs a commit.
@Test
public void shouldThrowIfRecyclingDirtyTask() {
// Create a stateless task, then initialize it and complete restoration.
task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
task.initializeIfNeeded();
task.completeRestoration();

// Process a single record without committing so that the task reports a pending commit.
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
task.process(0L);
assertTrue(task.commitNeeded());

// Recycling with a pending commit must be rejected with TaskMigratedException.
assertThrows(TaskMigratedException.class, () -> task.closeCleanAndRecycleState());
}

@Test
public void shouldOnlyRecycleSuspendedTasks() {
stateManager.recycle();
recordCollector.closeClean();
EasyMock.replay(stateManager, recordCollector);

task = createStatefulTask(createConfig(false, "100"), true);
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED

task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING

task.completeRestoration();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING

task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED

EasyMock.verify(stateManager, recordCollector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2723,7 +2723,7 @@ public void closeDirty() {
}

@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
transitionTo(State.CLOSED);
}

Expand Down