diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 4019fedcbbfa..3ffd9904bbf3 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -392,8 +392,15 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
 
       // if everything was OK and we consumed complete snapshot then move to next snapshot
       if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // a null nextValid means all the remaining snapshots should be skipped
+          shouldContinueReading = false;
+          break;
+        }
+        // we found the next available snapshot, continue from there
+        curSnapshot = nextValid;
         startPosOfSnapOffset = -1;
-        curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
         // if anyhow we are moving to next snapshot we should only scan addedFiles
         scanAllFiles = false;
       }
@@ -406,6 +413,27 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
   }
 
+  /**
+   * Get the next snapshot, skipping over rewrite and delete snapshots.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), or null if all remaining
+   *     snapshots should be skipped
+   */
+  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {
+      LOG.debug("Skipping snapshot: {} of table {}", nextSnapshot.snapshotId(), table.name());
+      // stop once the skipped snapshot is already the table's most recent snapshot
+      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId());
+    }
+    return nextSnapshot;
+  }
+
   private long addedFilesCount(Snapshot snapshot) {
     long addedFilesCount =
         PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
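The hunk above changes how latestOffset() advances between snapshots: instead of always taking the snapshot immediately after the current one, it walks forward until it finds a snapshot the stream should process, and reports null once only skippable snapshots remain. The sketch below is an illustrative, self-contained model of that walk over an in-memory snapshot list; the class and method names are hypothetical, and the real code relies on SnapshotUtil.snapshotAfter() plus the existing shouldProcess() check, which also enforces the streaming-skip-delete-snapshots and streaming-skip-overwrite-snapshots read options.

import java.util.List;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

// Hypothetical illustration only, not part of the patch: models the skip loop over
// snapshots held in commit order.
final class NextValidSnapshotSketch {
  /** Returns the next append snapshot after current, or null if only skippable snapshots remain. */
  static Snapshot nextAppendAfter(List<Snapshot> snapshotsInCommitOrder, Snapshot current) {
    int start = snapshotsInCommitOrder.indexOf(current) + 1;
    for (int i = start; i < snapshotsInCommitOrder.size(); i++) {
      Snapshot candidate = snapshotsInCommitOrder.get(i);
      if (DataOperations.APPEND.equals(candidate.operation())) {
        return candidate;
      }
      // otherwise the candidate is a rewrite (REPLACE), delete, or other skippable snapshot
    }
    return null; // nothing left to read; the stream waits for new commits
  }
}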
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 7fe5951ef1be..22e7df0f4e17 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,11 +31,14 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -118,7 +121,8 @@ public void setupTable() {
         "CREATE TABLE %s "
             + "(id INT, data STRING) "
             + "USING iceberg "
-            + "PARTITIONED BY (bucket(3, id))",
+            + "PARTITIONED BY (bucket(3, id)) "
+            + "TBLPROPERTIES ('commit.manifest.min-count-to-merge'='3', 'commit.manifest-merge.enabled'='true')",
         tableName);
     this.table = validationCatalog.loadTable(tableIdent);
     microBatches.set(0);
@@ -494,6 +498,86 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
         .hasMessageStartingWith("Cannot process overwrite snapshot");
   }
 
+  @TestTemplate
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplace() throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxRows()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
+        .isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceMaxFilesAndRows()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(
+                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+                    "4",
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+                    "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void testReadStreamWithSnapshotType2RewriteDataFilesIgnoresReplace() throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+    makeRewriteDataFiles();
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(6);
+  }
+
+  @TestTemplate
+  public void testReadStreamWithSnapshotTypeRewriteDataFilesIgnoresReplaceFollowedByAppend()
+      throws Exception {
+    // fill table with some data
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    makeRewriteDataFiles();
+
+    appendDataAsMultipleSnapshots(expected);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
+        .isEqualTo(12);
+  }
+
   @TestTemplate
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
     // fill table with some data
@@ -574,6 +658,29 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
+  /**
+   * Creates a rewrite data files (REPLACE) snapshot from the table's existing files, so tests can
+   * verify that all files in a rewrite snapshot are skipped by the streaming read.
+   */
+  public void makeRewriteDataFiles() {
+    table.refresh();
+
+    // rewrite each data file added by an append snapshot into itself, producing a
+    // REPLACE snapshot that carries no new rows for the stream to read
+    RewriteFiles rewrite = table.newRewrite();
+    Iterable<Snapshot> it = table.snapshots();
+    for (Snapshot snapshot : it) {
+      if (snapshot.operation().equals(DataOperations.APPEND)) {
+        Iterable<DataFile> datafiles = snapshot.addedDataFiles(table.io());
+        for (DataFile datafile : datafiles) {
+          rewrite.addFile(datafile);
+          rewrite.deleteFile(datafile);
+        }
+      }
+    }
+    rewrite.commit();
+  }
+
   /**
    * appends each list as a Snapshot on the iceberg table at the given location. accepts a list of
    * lists - each list representing data per snapshot.
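For reference, the new tests drive the stream through the existing microBatchCount() helper. The snippet below is a hypothetical sketch of how such a micro-batch read might be opened against the table with the same read option the tests pass (the helper's actual wiring may differ); with this change, snapshots committed by RewriteFiles (operation "replace") contribute no micro-batches to the stream.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical usage sketch, not part of the patch.
final class StreamingReadSketch {
  static Dataset<Row> openStream(SparkSession spark, String tableName) {
    return spark
        .readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
        .load(tableName);
  }
}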