@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {

// if everything was OK and we consumed the complete snapshot then move to the next snapshot
if (shouldContinueReading) {
Snapshot nextValid = nextValidSnapshot(curSnapshot);
if (nextValid == null) {
// a null nextValid implies all the remaining snapshots should be skipped.
shouldContinueReading = false;
break;
}
// we found the next available snapshot, continue from there.
curSnapshot = nextValid;
startPosOfSnapOffset = -1;
// since we are moving to the next snapshot we should only scan addedFiles
scanAllFiles = false;
}
@@ -406,6 +413,27 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
}

/**
* Get the next snapshot, skipping over rewrite and delete snapshots.
*
* @param curSnapshot the current snapshot
* @return the next valid snapshot (not a rewrite or delete snapshot), or null if all the
* remaining snapshots should be skipped.
*/
private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
// skip over rewrite and delete snapshots
while (!shouldProcess(nextSnapshot)) {
LOG.debug("Skipping snapshot: {} of table {}", nextSnapshot.snapshotId(), table.name());
// if we have reached the table's most recent snapshot, all remaining snapshots are skipped
if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
return null;
}
nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId());
}
return nextSnapshot;
}
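For context, nextValidSnapshot delegates the skip decision to shouldProcess(...), which is outside this hunk. A minimal stand-in for that kind of check, assuming a snapshot is only worth streaming when it appended new data files (the real method presumably also honors the skip-delete/skip-overwrite read options), could look like the sketch below; SnapshotFilter and isStreamable are hypothetical names, not part of SparkMicroBatchStream:

import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

final class SnapshotFilter {
  private SnapshotFilter() {}

  // Hypothetical stand-in for the shouldProcess(...) check used above: APPEND
  // snapshots carry new rows for the stream, while REPLACE snapshots (e.g. from
  // rewrite_data_files compaction) only rearrange existing rows and can be
  // skipped without losing data.
  static boolean isStreamable(Snapshot snapshot) {
    return DataOperations.APPEND.equals(snapshot.operation());
  }
}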

private long addedFilesCount(Snapshot snapshot) {
long addedFilesCount =
PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
@@ -31,11 +31,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -118,7 +121,8 @@ public void setupTable() {
"CREATE TABLE %s "
+ "(id INT, data STRING) "
+ "USING iceberg "
+ "PARTITIONED BY (bucket(3, id))",
+ "PARTITIONED BY (bucket(3, id)) "
+ "TBLPROPERTIES ('commit.manifest.min-count-to-merge'='3', 'commit.manifest-merge.enabled'='true')",
tableName);
this.table = validationCatalog.loadTable(tableIdent);
microBatches.set(0);
@@ -494,6 +498,86 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
.hasMessageStartingWith("Cannot process overwrite snapshot");
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);

makeRewriteDataFiles();

assertThat(
microBatchCount(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
.isEqualTo(6);
}
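microBatchCount(...) is a helper defined earlier in this test class; outside of a test, the same limit is supplied as a read option on an ordinary structured stream. A rough usage sketch, where only the SparkReadOptions key comes from the code above and the class and method names are illustrative:

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RewriteAwareStreamExample {
  // Open a streaming read that caps each micro-batch at one newly added data file,
  // mirroring the limit the test above exercises; rewrite (REPLACE) snapshots are
  // skipped by the source and never contribute files to a batch.
  public static Dataset<Row> openStream(SparkSession spark, String tableName) {
    return spark
        .readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
        .load(tableName);
  }
}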

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows()
throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);

makeRewriteDataFiles();

assertThat(
microBatchCount(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
.isEqualTo(2);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows()
throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);

makeRewriteDataFiles();

assertThat(
microBatchCount(
ImmutableMap.of(
SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4",
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1")))
.isEqualTo(6);
}

@TestTemplate
public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace() throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);

makeRewriteDataFiles();
makeRewriteDataFiles();

assertThat(
microBatchCount(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
.isEqualTo(6);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend()
throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);

makeRewriteDataFiles();

appendDataAsMultipleSnapshots(expected);

assertThat(
microBatchCount(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
.isEqualTo(12);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
// fill table with some data
@@ -574,6 +658,29 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

/**
* We are testing that all the files in a rewrite snapshot are skipped. Creates a rewrite data
* files snapshot using existing files.
*/
public void makeRewriteDataFiles() {
table.refresh();

// we are testing that all the files in a rewrite snapshot are skipped
// create a rewrite data files snapshot using existing files
RewriteFiles rewrite = table.newRewrite();
Iterable<Snapshot> it = table.snapshots();
for (Snapshot snapshot : it) {
if (snapshot.operation().equals(DataOperations.APPEND)) {
Iterable<DataFile> datafiles = snapshot.addedDataFiles(table.io());
for (DataFile datafile : datafiles) {
rewrite.addFile(datafile);
rewrite.deleteFile(datafile);
}
}
}
rewrite.commit();
}
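makeRewriteDataFiles builds the REPLACE snapshot by hand so the test controls exactly which files are rewritten. In practice the same kind of snapshot usually comes from table maintenance; a hedged alternative using the rewrite_data_files procedure, assuming the sql(...) helper, catalogName, and tableIdent provided by the surrounding test base and the Iceberg SQL extensions being enabled, might look like:

// Hypothetical alternative: let the stored procedure produce the rewrite (REPLACE)
// snapshot instead of committing a RewriteFiles operation manually.
private void makeRewriteDataFilesViaProcedure() {
  sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent);
  table.refresh();
}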

/**
* Appends each list as a snapshot on the Iceberg table at the given location. Accepts a list of
* lists, each list representing the data of one snapshot.