diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index a60e0f5d9316..21439163848d 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
@@ -42,10 +44,13 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -488,6 +493,76 @@ public void testMetadataLogEntries() throws Exception {
         metadataLogWithProjection);
   }
 
+  @Test
+  public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+    // Verifies that the files metadata table can be read with VERSION AS OF after the base
+    // table's schema has evolved.
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    // Evolve the schema, then append rows that populate the new column
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords =
+        Lists.newArrayList(RowFactory.create(3, "b", "c"), RowFactory.create(4, "b", "c"));
+
+    StructType newSparkSchema =
+        SparkSchemaUtil.convert(
+            new Schema(
+                optional(1, "id", Types.IntegerType.get()),
+                optional(2, "data", Types.StringType.get()),
+                optional(3, "category", Types.StringType.get())));
+
+    spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+    // The append above went through Spark; refresh so currentSnapshot() is not read from stale
+    // metadata
+    table.refresh();
+    long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM "
+                + tableName
+                + ".files VERSION AS OF "
+                + currentSnapshotId
+                + " ORDER BY content");
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
+    List<Record> expectedFiles =
+        expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
+
+    Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+    Assert.assertEquals(
+        "expectedFiles and actualFiles size should be the same",
+        expectedFiles.size(),
+        actualFiles.size());
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
+  }
+
   @Test
   public void testSnapshotReferencesMetatable() throws Exception {
     // Create table and insert data
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 004be8b95951..c5e367de03bb 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -159,7 +160,13 @@ public SparkTable copyWithSnapshotId(long newSnapshotId) {
   }
 
   private Schema snapshotSchema() {
-    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    // A metadata table reports its own schema(); only other tables have their schema resolved
+    // for the selected snapshot id.
+    if (icebergTable instanceof BaseMetadataTable) {
+      return icebergTable.schema();
+    } else {
+      return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    }
   }
 
   @Override