Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

Expand All @@ -50,6 +49,7 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
private Map<TopicPartition, Long> updatedEndOffsets = new HashMap<>();

public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final StateRestoreListener userStateRestoreListener,
Expand All @@ -66,24 +66,32 @@ public void register(final StateRestorer restorer) {
needsInitializing.put(restorer.partition(), restorer);
}

/**
* @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
*/
public Collection<TopicPartition> restore(final RestoringTasks active) {
if (!needsInitializing.isEmpty()) {
initialize();
final Set<TopicPartition> remainingPartitions = new HashSet<>(needsRestoring.keySet());
remainingPartitions.removeAll(updatedEndOffsets.keySet());
updatedEndOffsets.putAll(restoreConsumer.endOffsets(remainingPartitions));
}

if (needsRestoring.isEmpty()) {
restoreConsumer.unsubscribe();
return completed();
}

final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
try {
final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
for (final TopicPartition partition : restoringPartitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
while (iterator.hasNext()) {
final TopicPartition partition = iterator.next();
final StateRestorer restorer = stateRestorers.get(partition);
final long pos = processNext(records.records(partition), restorer, updatedEndOffsets.get(partition));
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, updatedEndOffsets.get(partition))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also remove the entry from `updatedEndOffsets` when this is true.

restorer.restoreDone();
updatedEndOffsets.remove(partition);
iterator.remove();
}
}
} catch (final InvalidOffsetException recoverableException) {
log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", recoverableException);
Expand Down Expand Up @@ -240,41 +248,6 @@ public void reset() {
needsInitializing.clear();
}

/**
 * Feeds the polled records for {@code topicPartition} into its registered restorer and,
 * once the restorer reports completion, validates that no concurrent writer advanced the
 * changelog past the expected end offset before marking the restore as done.
 *
 * @param allRecords     all records returned by the last restore-consumer poll
 * @param topicPartition the changelog partition being restored
 * @param task           the task owning this partition, used to build the migration exception
 * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
 */
private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
final TopicPartition topicPartition,
final Task task) {
final StateRestorer restorer = stateRestorers.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, endOffset)) {
// Restored past the end offset captured at initialization: another writer
// must have appended to the changelog, so this task has been migrated.
if (pos > endOffset) {
throw new TaskMigratedException(task, topicPartition, endOffset, pos);
}

// need to check for changelog topic
// (offsetLimit() == Long.MAX_VALUE distinguishes a non-source changelog; for those we
// re-fetch the end offset to detect appends that happened while restoring)
if (restorer.offsetLimit() == Long.MAX_VALUE) {
final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition);
if (!restorer.hasCompleted(pos, updatedEndOffset)) {
throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos);
}
}


log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
topicPartition,
restorer.restoredNumRecords(),
restorer.startingOffset(),
restorer.restoredOffset());

// Completion is only recorded after both end-offset checks above have passed.
restorer.restoreDone();
needsRestoring.remove(topicPartition);
}
}

private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
final StateRestorer restorer,
final Long endOffset) {
Expand Down Expand Up @@ -326,7 +299,6 @@ private boolean hasPartition(final TopicPartition topicPartition) {
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) {
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
* @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
*/
boolean updateNewAndRestoringTasks() {
active.initializeNewTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
Expand Down Expand Up @@ -377,46 +376,6 @@ public void shouldRestorePartitionsRegisteredPostInitialization() {
assertThat(callbackTwo.restored.size(), equalTo(3));
}

@Test
public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic() {
    // Prepare 10 changelog records but advertise an end offset of only 5, so
    // restoring consumes past the reported end of the changelog topic.
    final int numberOfMessages = 10;
    setupConsumer(numberOfMessages, topicPartition);
    consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 5L));
    final StateRestorer restorer =
        new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName");
    changelogReader.register(restorer);

    expect(active.restoringTaskFor(topicPartition)).andReturn(task);
    replay(active);

    // Restoring must detect the overshoot and report a migrated task.
    boolean migrated = false;
    try {
        changelogReader.restore(active);
    } catch (final TaskMigratedException expected) {
        migrated = true;
    }
    if (!migrated) {
        fail("Should have thrown TaskMigratedException");
    }
}


@Test
public void shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() {
    // The first endOffsets() call returns the expected value, but a second thread
    // has appended to the changelog topic, so a subsequent endOffsets() call
    // reports a value (15) exceeding the records restored so far (10).
    final int numberOfMessages = 10;
    setupConsumer(numberOfMessages, topicPartition);
    consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
    changelogReader.register(
        new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));

    expect(active.restoringTaskFor(topicPartition)).andReturn(task);
    replay(active);

    TaskMigratedException thrown = null;
    try {
        changelogReader.restore(active);
    } catch (final TaskMigratedException expected) {
        thrown = expected;
    }
    assertTrue("Should have thrown TaskMigratedException", thrown != null);
    // Verifies the second check threw with the *updated* end offset in its message.
    assertTrue(thrown.getMessage().contains("end offset 15, current offset 10"));
}


@Test
public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
Expand All @@ -434,30 +393,6 @@ public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestore
}


@Test
public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() {
    final int totalMessages = 10;
    assignPartition(totalMessages, topicPartition);

    // Records 0..4 precede the EOS commit marker at offset 5; the remaining
    // records 6..10 follow it.
    addRecords(5, topicPartition, 0);
    addRecords(5, topicPartition, 6);
    consumer.assign(Collections.<TopicPartition>emptyList());

    // The advertised end offset (6) starts just after the commit marker, so the
    // restored position will overshoot it.
    consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 6L));
    changelogReader.register(
        new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));

    expect(active.restoringTaskFor(topicPartition)).andReturn(task);
    replay(active);

    boolean migrated = false;
    try {
        changelogReader.restore(active);
    } catch (final TaskMigratedException expected) {
        migrated = true;
    }
    if (!migrated) {
        fail("Should have thrown task migrated exception");
    }
}

@Test
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
final int totalMessages = 10;
Expand Down