diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 5346b5267c1d..1ed3b64f0297 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -200,6 +200,16 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
         currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
       }
 
+      // fail fast when the snapshot referenced by the current offset was expired or removed
+      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+      if (snapshot == null) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                currentOffset.snapshotId()));
+      }
+
-      if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
+      if (!shouldProcess(snapshot)) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb83..dd456f22371e 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -323,6 +325,47 @@ public void testResumingStreamReadFromCheckpoint() throws Exception {
     }
   }
 
+  @Test
+  public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.newFolder();
+
+    DataStreamWriter<Row> querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .queryName("checkpoint_test")
+            .option("path", output.getPath());
+
+    List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+    List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+    StreamingQuery startQuery = querySource.start();
+
+    appendData(firstSnapshotRecordList);
+    table.refresh();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    startQuery.processAllAvailable();
+    startQuery.stop();
+
+    appendData(secondSnapshotRecordList);
+
+    // expire the snapshot that the restarted query's checkpointed offset still references
+    table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();
+
+    StreamingQuery restartedQuery = querySource.start();
+    assertThatThrownBy(restartedQuery::processAllAvailable)
+        .hasCauseInstanceOf(IllegalStateException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                firstSnapshotId));
+  }
+
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 972988b6b2c2..816e3d2bf8e5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -208,6 +208,16 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
         currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
       }
 
+      // fail fast when the snapshot referenced by the current offset was expired or removed
+      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+      if (snapshot == null) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                currentOffset.snapshotId()));
+      }
+
-      if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
+      if (!shouldProcess(snapshot)) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb83..dd456f22371e 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -323,6 +325,47 @@ public void testResumingStreamReadFromCheckpoint() throws Exception {
     }
   }
 
+  @Test
+  public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.newFolder();
+
+    DataStreamWriter<Row> querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .queryName("checkpoint_test")
+            .option("path", output.getPath());
+
+    List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+    List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+    StreamingQuery startQuery = querySource.start();
+
+    appendData(firstSnapshotRecordList);
+    table.refresh();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    startQuery.processAllAvailable();
+    startQuery.stop();
+
+    appendData(secondSnapshotRecordList);
+
+    // expire the snapshot that the restarted query's checkpointed offset still references
+    table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();
+
+    StreamingQuery restartedQuery = querySource.start();
+    assertThatThrownBy(restartedQuery::processAllAvailable)
+        .hasCauseInstanceOf(IllegalStateException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                firstSnapshotId));
+  }
+
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =