Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,14 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {

// if everything was OK and we consumed complete snapshot then move to next snapshot
if (shouldContinueReading) {
Snapshot nextValid = nextValidSnapshot(curSnapshot);
if (nextValid == null) {
// a null nextValid implies all the remaining snapshots should be skipped.
break;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original, there was a

          shouldContinueReading = false;

before the break, but that was removed as unnecessary by afda8be.

}
// we found the next available snapshot, continue from there.
curSnapshot = nextValid;
startPosOfSnapOffset = -1;
curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
// if anyhow we are moving to next snapshot we should only scan addedFiles
scanAllFiles = false;
}
Expand All @@ -448,6 +454,27 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
}

/**
 * Finds the next snapshot that should be processed, skipping over any snapshot that {@link
 * #shouldProcess(Snapshot)} rejects (e.g. rewrite and delete snapshots).
 *
 * @param curSnapshot the snapshot the stream is currently positioned at
 * @return the next processable snapshot, or {@code null} when every remaining snapshot up to the
 *     table's current snapshot should be skipped
 */
private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
  Snapshot candidate = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
  while (true) {
    if (shouldProcess(candidate)) {
      return candidate;
    }
    LOG.debug("Skipping snapshot: {} of table {}", candidate.snapshotId(), table.name());
    // nothing left to scan once the skipped snapshot is the table's most recent one
    if (candidate.snapshotId() == table.currentSnapshot().snapshotId()) {
      return null;
    }
    candidate = SnapshotUtil.snapshotAfter(table, candidate.snapshotId());
  }
}

private long addedFilesCount(Snapshot snapshot) {
long addedFilesCount =
PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -121,7 +123,8 @@ public void setupTable() {
"CREATE TABLE %s "
+ "(id INT, data STRING) "
+ "USING iceberg "
+ "PARTITIONED BY (bucket(3, id))",
+ "PARTITIONED BY (bucket(3, id)) "
+ "TBLPROPERTIES ('commit.manifest.min-count-to-merge'='3', 'commit.manifest-merge.enabled'='true')",
tableName);
this.table = validationCatalog.loadTable(tableIdent);
microBatches.set(0);
Expand Down Expand Up @@ -531,6 +534,86 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
.hasMessageStartingWith("Cannot process overwrite snapshot");
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
  // seed the table with several append snapshots
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // commit a rewrite snapshot; the streaming reader must skip its files
  makeRewriteDataFiles();

  // one file per micro-batch: only the appended files yield batches (6 total)
  ImmutableMap<String, String> options =
      ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
  assertThat(microBatchCount(options)).isEqualTo(6);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows()
    throws Exception {
  // seed the table with several append snapshots
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // commit a rewrite snapshot; the streaming reader must skip its rows
  makeRewriteDataFiles();

  // capping rows per micro-batch at 4 splits the appended data into 2 batches
  ImmutableMap<String, String> options =
      ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4");
  assertThat(microBatchCount(options)).isEqualTo(2);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows()
    throws Exception {
  // seed the table with several append snapshots
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // commit a rewrite snapshot; the streaming reader must skip it entirely
  makeRewriteDataFiles();

  // with both limits set, the 1-file-per-batch cap dominates: 6 batches
  ImmutableMap<String, String> options =
      ImmutableMap.of(
          SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
          "4",
          SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
          "1");
  assertThat(microBatchCount(options)).isEqualTo(6);
}

@TestTemplate
public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace() throws Exception {
  // seed the table with several append snapshots
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // two consecutive rewrite snapshots; both must be skipped by the stream
  makeRewriteDataFiles();
  makeRewriteDataFiles();

  // one file per micro-batch: still only the 6 appended files produce batches
  ImmutableMap<String, String> options =
      ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
  assertThat(microBatchCount(options)).isEqualTo(6);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend()
    throws Exception {
  // seed the table with several append snapshots
  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // commit a rewrite snapshot in between two rounds of appends
  makeRewriteDataFiles();

  appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);

  // the stream must see both rounds of appended files (12) but not the rewrite
  ImmutableMap<String, String> options =
      ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
  assertThat(microBatchCount(options)).isEqualTo(12);
}

@TestTemplate
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
// fill table with some data
Expand Down Expand Up @@ -623,6 +706,29 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

/**
 * Commits a rewrite snapshot built entirely from files that already exist in the table.
 *
 * <p>Every data file added by an append snapshot is both deleted and re-added, so the commit is a
 * net no-op on table content but produces a snapshot with the rewrite operation type — exactly
 * what the streaming-skip tests need.
 */
public void makeRewriteDataFiles() {
  table.refresh();

  RewriteFiles rewrite = table.newRewrite();
  for (Snapshot snapshot : table.snapshots()) {
    // only append snapshots contribute data files to the rewrite
    if (!snapshot.operation().equals(DataOperations.APPEND)) {
      continue;
    }
    for (DataFile dataFile : snapshot.addedDataFiles(table.io())) {
      rewrite.addFile(dataFile);
      rewrite.deleteFile(dataFile);
    }
  }
  rewrite.commit();
}

/**
* appends each list as a Snapshot on the iceberg table at the given location. accepts a list of
* lists - each list representing data per snapshot.
Expand Down