Skip to content

Commit

Permalink
KAFKA-10198: guard against recycling dirty state (#8924)
Browse files Browse the repository at this point in the history
We just needed to add the check in StreamTask#closeClean to closeAndRecycleState as well. I also renamed closeAndRecycleState to closeCleanAndRecycleState to drive this point home: it needs to be clean.

This should be cherry-picked back to the 2.6 branch.

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
ableegoldman authored and guozhangwang committed Jun 25, 2020
1 parent d609aef commit 2c06320
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
final ProcessorStateManager stateManager = standbyTask.stateMgr;
final LogContext logContext = getLogContext(standbyTask.id);

standbyTask.closeAndRecycleState();
standbyTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.ACTIVE, logContext);

return createActiveTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void closeDirty() {
}

@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
if (state() == State.SUSPENDED) {
stateMgr.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,
final InternalProcessorContext context = streamTask.processorContext();
final ProcessorStateManager stateManager = streamTask.stateMgr;

streamTask.closeAndRecycleState();
streamTask.closeCleanAndRecycleState();
stateManager.transitionTaskType(TaskType.STANDBY, getLogContext(streamTask.id()));

return createStandbyTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ private Map<TopicPartition, Long> extractPartitionTimes() {

@Override
public void closeClean() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
close(true);
log.info("Closed clean");
Expand All @@ -482,7 +483,8 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
}

@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
validateClean();
streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());
switch (state()) {
case SUSPENDED:
Expand Down Expand Up @@ -515,17 +517,20 @@ private void writeCheckpoint() {
stateMgr.checkpoint(checkpointableOffsets());
}

/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
if (clean && commitNeeded) {
// It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
// closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
/**
 * Guards the clean-close paths: refuses to proceed while the task still has uncommitted work.
 *
 * @throws TaskMigratedException if {@code commitNeeded} is still set
 */
private void validateClean() {
    if (!commitNeeded) {
        return;
    }

    // A pending commit at this point can mean that a commit failed during handleRevocation, the failure was
    // "forgotten", and a closeClean was then attempted in handleAssignment. Throwing here forces the
    // TaskManager to closeDirty instead
    log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
        + " commit and should close as dirty instead");
    throw new TaskMigratedException("Tried to close dirty task as clean");
}

/**
* You must commit a task and checkpoint the state manager before closing as this will release the state dir lock
*/
private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
// first close state manager (which is idempotent) then close the record collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ enum TaskType {
/**
 * Attempt a clean close but do not close the underlying state, which is kept so that it can be recycled.
 * <p>
 * The task must be clean when this is called: the StreamTask implementation throws a
 * TaskMigratedException if a commit is still needed. The tests for both StreamTask and StandbyTask
 * also expect an IllegalStateException when the task is not in the SUSPENDED state.
 */
void closeAndRecycleState();
void closeCleanAndRecycleState();

/**
* Revive a closed task to a created one; should never throw an exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,13 +502,13 @@ public void shouldRecycleTask() {
EasyMock.replay(stateManager);

task = createStandbyTask();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED

task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING

task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED

// Currently, there are no metrics registered for standby tasks.
// This is a regression test so that, if we add some, we will be sure to deregister them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -1752,7 +1753,7 @@ public void shouldUnregisterMetricsInCloseDirty() {
}

@Test
public void shouldUnregisterMetricsInCloseAndRecycle() {
public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
Expand All @@ -1761,7 +1762,7 @@ public void shouldUnregisterMetricsInCloseAndRecycle() {

task.suspend();
assertThat(getTaskMetrics(), not(empty()));
task.closeAndRecycleState();
task.closeCleanAndRecycleState();
assertThat(getTaskMetrics(), empty());
}

Expand Down Expand Up @@ -1798,23 +1799,49 @@ public void shouldUpdatePartitions() {
assertThat(task.inputPartitions(), equalTo(newPartitions));
}

/**
 * A task that has processed a record without committing must reject {@code closeClean()}.
 */
@Test
public void shouldThrowIfCleanClosingDirtyTask() {
    task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
    task.initializeIfNeeded();
    task.completeRestoration();

    // Process one record and do not commit, so the task reports that a commit is needed
    task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
    task.process(0L);
    assertTrue(task.commitNeeded());

    assertThrows(TaskMigratedException.class, task::closeClean);
}

/**
 * A task that has processed a record without committing must reject
 * {@code closeCleanAndRecycleState()}.
 */
@Test
public void shouldThrowIfRecyclingDirtyTask() {
    task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
    task.initializeIfNeeded();
    task.completeRestoration();

    // Process one record and do not commit, so the task reports that a commit is needed
    task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0)));
    task.process(0L);
    assertTrue(task.commitNeeded());

    // The task was never suspended, yet the dirty-task exception is the one expected here
    assertThrows(TaskMigratedException.class, task::closeCleanAndRecycleState);
}

/**
 * Recycling must be rejected with an IllegalStateException in every state except SUSPENDED.
 * The task is walked through CREATED, RESTORING and RUNNING (each attempt must throw) and is
 * then suspended, after which the recycle call is expected to go through.
 */
@Test
public void shouldOnlyRecycleSuspendedTasks() {
// Record the calls expected on the mocks before switching them to replay mode
stateManager.recycle();
recordCollector.closeClean();
EasyMock.replay(stateManager, recordCollector);

task = createStatefulTask(createConfig(false, "100"), true);
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // CREATED
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // CREATED

task.initializeIfNeeded();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RESTORING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RESTORING

task.completeRestoration();
assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING
assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState()); // RUNNING

task.suspend();
task.closeAndRecycleState(); // SUSPENDED
task.closeCleanAndRecycleState(); // SUSPENDED

// Confirms the expectations recorded above on the state manager and record collector were met
EasyMock.verify(stateManager, recordCollector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2723,7 +2723,7 @@ public void closeDirty() {
}

// Only moves this task to the CLOSED state; nothing else is done here.
@Override
public void closeAndRecycleState() {
public void closeCleanAndRecycleState() {
transitionTo(State.CLOSED);
}

Expand Down

0 comments on commit 2c06320

Please sign in to comment.