diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 000000000000..2cdefa731afe
--- /dev/null
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+  public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords =
+        Lists.newArrayList(RowFactory.create(3, "b", "c"), RowFactory.create(4, "b", "c"));
+
+    StructType newSparkSchema =
+        SparkSchemaUtil.convert(
+            new Schema(
+                optional(1, "id", Types.IntegerType.get()),
+                optional(2, "data", Types.StringType.get()),
+                optional(3, "category", Types.StringType.get())));
+
+    spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    Dataset<Row> actualFilesDs =
+        spark
+            .read()
+            .format("iceberg")
+            .option("snapshot-id", currentSnapshotId)
+            .load(tableName + ".files")
+            .orderBy("content");
+
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
+    List<Record> expectedFiles =
+        expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
+
+    Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
+
+    Assert.assertEquals(
+        "expectedFiles and actualFiles size should be the same",
+        actualFiles.size(),
+        expectedFiles.size());
+  }
+
+  /**
+   * Find matching manifest entries of an Iceberg table
+   *
+   * @param table iceberg table
+   * @param expectedContent file content to populate on entries
+   * @param entriesTableSchema schema of Manifest entries
+   * @param manifestsToExplore manifests to explore of the table
+   * @param partValue partition value that manifest entries must match, or null to skip filtering
+   */
+  private List<Record> expectedEntries(
+      Table table,
+      FileContent expectedContent,
+      Schema entriesTableSchema,
+      List<ManifestFile> manifestsToExplore,
+      String partValue)
+      throws IOException {
+    List<Record> expected = Lists.newArrayList();
+    for (ManifestFile manifest : manifestsToExplore) {
+      InputFile in = table.io().newInputFile(manifest.path());
+      try (CloseableIterable<Record> rows = Avro.read(in).project(entriesTableSchema).build()) {
+        for (Record record : rows) {
+          if ((Integer) record.get("status") < 2 /* added or existing */) {
+            Record file = (Record) record.get("data_file");
+            if (partitionMatch(file, partValue)) {
+              TestHelpers.asMetadataRecord(file, expectedContent);
+              expected.add(file);
+            }
+          }
+        }
+      }
+    }
+    return expected;
+  }
+
+  private boolean partitionMatch(Record file, String partValue) {
+    if (partValue == null) {
+      return true;
+    }
+    Record partition = (Record) file.get(4);
+    return partValue.equals(partition.get(0).toString());
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 3f7be9d00658..eebbce94138d 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.RowLevelOperationMode;
@@ -124,7 +125,11 @@ public String name() {
   }
 
   private Schema snapshotSchema() {
-    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    if (icebergTable instanceof BaseMetadataTable) {
+      return icebergTable.schema();
+    } else {
+      return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    }
   }
 
   @Override
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 42f4c1a1ab42..65539edbe6b1 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -36,15 +36,22 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -109,7 +116,7 @@ public static void assertEqualsBatch(
     }
   }
 
-  private static void assertEqualsSafe(Types.ListType list, Collection<?> expected, List actual) {
+  public static void assertEqualsSafe(Types.ListType list, Collection<?> expected, List actual) {
     Type elementType = list.elementType();
     List expectedElements = Lists.newArrayList(expected);
     for (int i = 0; i < expectedElements.size(); i += 1) {
@@ -144,7 +151,7 @@ private static void assertEqualsSafe(Types.MapType map, Map<?, ?> expected, Map<
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
   @SuppressWarnings("unchecked")
-  private static void assertEqualsSafe(Type type, Object expected, Object actual) {
+  public static void assertEqualsSafe(Type type, Object expected, Object actual) {
     if (expected == null && actual == null) {
       return;
     }
@@ -767,4 +774,26 @@ private static void assertEquals(String context, MapType map, MapData expected,
           actualValues.isNullAt(i) ? null : actualValues.get(i, valueType));
     }
   }
+
+  public static void asMetadataRecord(GenericData.Record file, FileContent content) {
+    file.put(0, content.id());
+    file.put(3, 0); // specId
+  }
+
+  public static List<ManifestFile> dataManifests(Table table) {
+    return table.currentSnapshot().dataManifests(table.io());
+  }
+
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    StructField[] fields = metadataTable.schema().fields();
+    return metadataTable.select(
+        Stream.of(fields)
+            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }