diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index 6a8be60eb078..b12d3e5030b7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.spark;
 
 import java.util.List;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
@@ -37,6 +38,7 @@
 import org.apache.spark.sql.types.LongType$;
 import org.apache.spark.sql.types.MapType$;
 import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType$;
@@ -46,6 +48,8 @@ class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
   TypeToSparkType() {
   }
 
+  public static final String METADATA_COL_ATTR_KEY = "__metadata_col";
+
   @Override
   public DataType schema(Schema schema, DataType structType) {
     return structType;
@@ -59,8 +63,8 @@ public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
     for (int i = 0; i < fields.size(); i += 1) {
       Types.NestedField field = fields.get(i);
       DataType type = fieldResults.get(i);
-      StructField sparkField = StructField.apply(
-          field.name(), type, field.isOptional(), Metadata.empty());
+      Metadata metadata = fieldMetadata(field.fieldId());
+      StructField sparkField = StructField.apply(field.name(), type, field.isOptional(), metadata);
       if (field.doc() != null) {
         sparkField = sparkField.withComment(field.doc());
       }
@@ -122,4 +126,14 @@ public DataType primitive(Type.PrimitiveType primitive) {
             "Cannot convert unknown type to Spark: " + primitive);
     }
   }
+
+  private Metadata fieldMetadata(int fieldId) {
+    if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
+      return new MetadataBuilder()
+          .putBoolean(METADATA_COL_ATTR_KEY, true)
+          .build();
+    }
+
+    return Metadata.empty();
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 8bb32c969842..40c77cbecbad 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -20,8 +20,12 @@
 package org.apache.iceberg.spark;
 
 import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,8 +37,15 @@ public class TestSparkSchemaUtil {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema TEST_SCHEMA_WITH_METADATA_COLS = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      MetadataColumns.FILE_PATH,
+      MetadataColumns.ROW_POSITION
+  );
+
   @Test
-  public void testEstiamteSizeMaxValue() throws IOException {
+  public void testEstimateSizeMaxValue() throws IOException {
     Assert.assertEquals("estimateSize returns Long max value", Long.MAX_VALUE,
         SparkSchemaUtil.estimateSize(
             null,
@@ -42,14 +53,30 @@ public void testEstiamteSizeMaxValue() throws IOException {
   }
 
   @Test
-  public void testEstiamteSizeWithOverflow() throws IOException {
+  public void testEstimateSizeWithOverflow() throws IOException {
     long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), Long.MAX_VALUE - 1);
     Assert.assertEquals("estimateSize handles overflow", Long.MAX_VALUE, tableSize);
   }
 
   @Test
-  public void testEstiamteSize() throws IOException {
+  public void testEstimateSize() throws IOException {
     long tableSize = SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(TEST_SCHEMA), 1);
     Assert.assertEquals("estimateSize matches with expected approximation", 24, tableSize);
   }
+
+  @Test
+  public void testSchemaConversionWithMetaDataColumnSchema() {
+    StructType structType = SparkSchemaUtil.convert(TEST_SCHEMA_WITH_METADATA_COLS);
+    List<AttributeReference> attrRefs = scala.collection.JavaConverters.seqAsJavaList(structType.toAttributes());
+    for (AttributeReference attrRef : attrRefs) {
+      if (MetadataColumns.isMetadataColumn(attrRef.name())) {
+        Assert.assertTrue("metadata columns should have __metadata_col in attribute metadata",
+            attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY) &&
+                attrRef.metadata().getBoolean(TypeToSparkType.METADATA_COL_ATTR_KEY));
+      } else {
+        Assert.assertFalse("non metadata columns should not have __metadata_col in attribute metadata",
+            attrRef.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY));
+      }
+    }
+  }
 }
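
Note on the marker: "__metadata_col" is the same attribute-metadata key that Spark 3.2's MetadataAttribute extractor checks, so fields flagged by TypeToSparkType are recognized by Spark's metadata-column handling. For illustration only, a minimal sketch of how a caller in the same package could use the marker to strip metadata columns back out of a converted schema; "icebergSchema" is a hypothetical variable, not part of this patch:

    // Sketch (assumes an Iceberg Schema "icebergSchema" in scope):
    // drop every field flagged with __metadata_col from the converted StructType.
    StructType converted = SparkSchemaUtil.convert(icebergSchema);
    StructField[] dataFields = java.util.Arrays.stream(converted.fields())
        .filter(field -> !(field.metadata().contains(TypeToSparkType.METADATA_COL_ATTR_KEY) &&
            field.metadata().getBoolean(TypeToSparkType.METADATA_COL_ATTR_KEY)))
        .toArray(StructField[]::new);
    StructType dataOnlySchema = new StructType(dataFields);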