From b0aa10af97ba8bb8968da1cfc601753d958a7ed1 Mon Sep 17 00:00:00 2001
From: xianyangliu
Date: Mon, 23 May 2022 20:21:24 +0800
Subject: [PATCH] Support filter based on Schema

---
 .../java/org/apache/iceberg/ContentFile.java  |   5 +
 .../java/org/apache/iceberg/DataFile.java     |   6 +-
 .../java/org/apache/iceberg/ManifestFile.java |  11 +-
 .../iceberg/expressions/SchemaEvaluator.java  | 165 ++++++++
 .../java/org/apache/iceberg/TestHelpers.java  |  10 +
 .../expressions/TestSchemaEvaluator.java      | 371 ++++++++++++++++++
 .../java/org/apache/iceberg/BaseFile.java     |  16 +-
 .../java/org/apache/iceberg/DataFiles.java    |  10 +-
 .../org/apache/iceberg/DataTableScan.java     |   3 +-
 .../org/apache/iceberg/DeleteFileIndex.java   |  45 ++-
 .../java/org/apache/iceberg/FileMetadata.java |  11 +-
 .../java/org/apache/iceberg/FindFiles.java    |   1 +
 .../org/apache/iceberg/GenericDataFile.java   |   4 +-
 .../org/apache/iceberg/GenericDeleteFile.java |   6 +-
 .../apache/iceberg/GenericManifestFile.java   |  19 +-
 .../iceberg/IncrementalDataTableScan.java     |   1 +
 .../apache/iceberg/ManifestFilterManager.java |  20 +-
 .../org/apache/iceberg/ManifestGroup.java     |  35 +-
 .../org/apache/iceberg/ManifestReader.java    |  39 +-
 .../org/apache/iceberg/ManifestWriter.java    |  17 +-
 .../iceberg/MergingSnapshotProducer.java      |  10 +-
 .../java/org/apache/iceberg/ScanSummary.java  |   1 +
 .../org/apache/iceberg/SnapshotProducer.java  |   3 +-
 .../java/org/apache/iceberg/V1Metadata.java   |  15 +-
 .../java/org/apache/iceberg/V2Metadata.java   |  21 +-
 .../java/org/apache/iceberg/avro/Avro.java    |  22 +-
 .../iceberg/deletes/EqualityDeleteWriter.java |   9 +
 .../iceberg/deletes/PositionDeleteWriter.java |   9 +
 .../org/apache/iceberg/io/DataWriter.java     |   8 +
 .../org/apache/iceberg/TableTestBase.java     |   2 +
 .../org/apache/iceberg/TestDataTableScan.java |  41 ++
 .../apache/iceberg/TestDeleteFileIndex.java   |  88 +++++
 .../iceberg/TestManifestListVersions.java     |   9 +-
 .../apache/iceberg/TestManifestReader.java    |  37 +-
 .../apache/iceberg/TestManifestWriter.java    |  33 ++
 .../iceberg/TestManifestWriterVersions.java   |   6 +-
 .../iceberg/data/BaseFileWriterFactory.java   |  11 +
 .../flink/sink/TestIcebergFilesCommitter.java |   2 +-
 .../flink/sink/TestIcebergFilesCommitter.java |   2 +-
 .../flink/sink/TestIcebergFilesCommitter.java |   2 +-
 .../main/java/org/apache/iceberg/orc/ORC.java |  21 +-
 .../org/apache/iceberg/parquet/Parquet.java   |  21 +-
 .../apache/iceberg/spark/SparkDataFile.java   |   7 +
 .../spark/actions/ManifestFileBean.java       |   5 +
 .../apache/iceberg/spark/SparkDataFile.java   |   7 +
 .../spark/actions/ManifestFileBean.java       |   5 +
 .../apache/iceberg/spark/SparkDataFile.java   |   7 +
 .../spark/actions/ManifestFileBean.java       |   5 +
 .../apache/iceberg/spark/SparkDataFile.java   |   7 +
 .../spark/actions/ManifestFileBean.java       |   5 +
 50 files changed, 1152 insertions(+), 64 deletions(-)
 create mode 100644 api/src/main/java/org/apache/iceberg/expressions/SchemaEvaluator.java
 create mode 100644 api/src/test/java/org/apache/iceberg/expressions/TestSchemaEvaluator.java

diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 9c9e6385b0bb..91460e2bb15e 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -34,6 +34,11 @@ public interface ContentFile {
    */
  Long pos();

+  /**
+   * Returns the ID of the schema used to write this file.
+   */
+  int schemaId();
+
  /**
   * Returns id of the partition spec used for partition metadata.
   */

diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 3d75052924bd..30e71951bb19 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -63,11 +63,12 @@ public interface DataFile extends ContentFile {
      "Equality comparison field IDs");
  Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
  Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
+  Types.NestedField SCHEMA_ID = optional(142, "schema_id", IntegerType.get(), "Schema ID");

  int PARTITION_ID = 102;
  String PARTITION_NAME = "partition";
  String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
-  // NEXT ID TO ASSIGN: 142
+  // NEXT ID TO ASSIGN: 143

  static StructType getType(StructType partitionType) {
    // IDs start at 100 to leave room for changes to ManifestEntry
@@ -88,7 +89,8 @@ static StructType getType(StructType partitionType) {
        KEY_METADATA,
        SPLIT_OFFSETS,
        EQUALITY_IDS,
-        SORT_ORDER_ID
+        SORT_ORDER_ID,
+        SCHEMA_ID
    );
  }

diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 7eb89f49b5ed..7a85000bb72c 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -64,14 +64,15 @@ public interface ManifestFile {
      "Summary for each partition");
  Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(), "Encryption key metadata blob");
-  // next ID to assign: 520
+  Types.NestedField SCHEMA_ID = optional(520, "schema_id", Types.IntegerType.get(), "Schema ID");
+  // next ID to assign: 521

  Schema SCHEMA = new Schema(
      PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT, SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
      ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
      ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
-      PARTITION_SUMMARIES, KEY_METADATA);
+      PARTITION_SUMMARIES, KEY_METADATA, SCHEMA_ID);

  static Schema schema() {
    return SCHEMA;
@@ -188,6 +189,12 @@ default ByteBuffer keyMetadata() {
    return null;
  }

+  /**
+   * Returns the ID of the {@link Schema} used to write the manifest file. This is -1 when the
+   * entries in the manifest do not all share the same schema ID.
+   */
+  int schemaId();
+
  /**
   * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
   * this method to make defensive copies.
diff --git a/api/src/main/java/org/apache/iceberg/expressions/SchemaEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/SchemaEvaluator.java
new file mode 100644
index 000000000000..c9fe82568bd2
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/expressions/SchemaEvaluator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.expressions.Expressions.rewriteNot;
+
+/**
+ * Evaluates an {@link Expression} against a {@link Schema} to test whether a file written with
+ * that schema may match the expression.
+ */
+public class SchemaEvaluator implements Serializable {
+  private final Expression expr;
+
+  public SchemaEvaluator(Types.StructType struct, Expression unbound) {
+    this(struct, unbound, true);
+  }
+
+  public SchemaEvaluator(Types.StructType struct, Expression unbound, boolean caseSensitive) {
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
+  }
+
+  public boolean eval(Schema schema) {
+    return new EvalVisitor().eval(schema);
+  }
+
+  private class EvalVisitor extends ExpressionVisitors.BoundVisitor<Boolean> {
+    private Schema schema;
+
+    private boolean eval(Schema evalSchema) {
+      this.schema = evalSchema;
+      return ExpressionVisitors.visitEvaluator(expr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return true;
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return false;
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(Bound<T> valueExpr) {
+      // if the column exists, its value could be null
+      // if the column does not exist, its value is null
+      return true;
+    }
+
+    @Override
+    public <T> Boolean notNull(Bound<T> valueExpr) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean isNaN(Bound<T> valueExpr) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean notNaN(Bound<T> valueExpr) {
+      // if the column exists, its value could be something other than NaN
+      // if the column does not exist, its value is null, and null is not NaN
+      return true;
+    }
+
+    @Override
+    public <T> Boolean lt(Bound<T> valueExpr, Literal<T> lit) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean ltEq(Bound<T> valueExpr, Literal<T> lit) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean gt(Bound<T> valueExpr, Literal<T> lit) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean gtEq(Bound<T> valueExpr, Literal<T> lit) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean eq(Bound<T> valueExpr, Literal<T> lit) {
+      // lit cannot be null, so eq can only match when the column exists
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean notEq(Bound<T> valueExpr, Literal<T> lit) {
+      // if the column exists, its value could be not equal to lit
+      // if the column does not exist, its value is null, which is never equal to lit because lit cannot be null
+      return true;
+    }
+
+    @Override
+    public <T> Boolean in(Bound<T> valueExpr, Set<T> literalSet) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean notIn(Bound<T> valueExpr, Set<T> literalSet) {
+      // if the column exists, its value could be not in literalSet
+      // if the column does not exist, its value is null, and null cannot be in literalSet
+      return true;
+    }
+
+    @Override
+    public <T> Boolean startsWith(Bound<T> valueExpr, Literal<T> lit) {
+      return columnExists(valueExpr);
+    }
+
+    @Override
+    public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
+      // if the column exists, its value might not start with lit
+      // if the column does not exist, its value is null and does not start with lit
+      return true;
+    }
+
+    private <T> boolean columnExists(Bound<T> valueExpr) {
+      return schema.findField(valueExpr.ref().fieldId()) != null;
+    }
+  }
+}
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 401266ba42b4..de4f902aa70f 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -370,6 +370,11 @@ public ByteBuffer keyMetadata() {
      return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
    }

+    @Override
+    public int schemaId() {
+      return -1;
+    }
+
    @Override
    public ManifestFile copy() {
      return this;
@@ -476,6 +481,11 @@ public ByteBuffer keyMetadata() {
      return null;
    }

+    @Override
+    public int schemaId() {
+      return -1;
+    }
+
    @Override
    public DataFile copy() {
      return this;
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestSchemaEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestSchemaEvaluator.java
new file mode 100644
index 000000000000..5217b5a1cc03
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestSchemaEvaluator.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.expressions; + +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.expressions.Expressions.alwaysFalse; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThan; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNaN; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notEqual; +import static org.apache.iceberg.expressions.Expressions.notIn; +import static org.apache.iceberg.expressions.Expressions.notNaN; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.notStartsWith; +import static org.apache.iceberg.expressions.Expressions.or; +import static org.apache.iceberg.expressions.Expressions.startsWith; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSchemaEvaluator { + private static final Schema OLD_SCHEMA = new Schema( + required(1, "int_col", Types.IntegerType.get()), + optional(2, "double_col", Types.DoubleType.get()), + optional(3, "str_col", Types.StringType.get()) + ); + + private static final Schema NEW_SCHEMA = new Schema( + required(1, "int_col", Types.IntegerType.get()), + optional(2, "double_col", Types.DoubleType.get()), + optional(3, "str_col", Types.StringType.get()), + optional(4, "new_int_col", Types.IntegerType.get()), + optional(5, "new_double_col", Types.DoubleType.get()), + optional(6, "new_str_col", Types.StringType.get()), + optional(7, "struct_col", Types.StructType.of( + Types.NestedField.optional(8, "nest_col1", Types.StructType.of( + Types.NestedField.optional(9, "nest_col2", Types.FloatType.get()))))) + ); + + @Test + public void testLessThan() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThan("int_col", 7)); + Assert.assertTrue("existed int_col could be less than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThan("double_col", 7)); + Assert.assertTrue("existed double_col could be less than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThan("new_int_col", 7)); + Assert.assertFalse("not existed new_int_col could not be less than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThan("new_double_col", 7)); + Assert.assertFalse("not existed new_double_col could not be less than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThan("struct_col.nest_col1.nest_col2", 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be less than 7", evaluator.eval(OLD_SCHEMA)); + } + + 
@Test + public void testLessThanOrEqual() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThanOrEqual("int_col", 7)); + Assert.assertTrue("existed int_col could be less than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThanOrEqual("double_col", 7)); + Assert.assertTrue("existed double_col could be less than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThanOrEqual("new_int_col", 7)); + Assert.assertFalse("not existed new_int_col could not be less than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThanOrEqual("new_double_col", 7)); + Assert.assertFalse("not existed new_double_col could not be less than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), lessThanOrEqual("struct_col.nest_col1.nest_col2", 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be less than or equal 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testGreaterThan() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThan("int_col", 7)); + Assert.assertTrue("existed int_col could be greater than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThan("double_col", 7)); + Assert.assertTrue("existed double_col could be greater than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThan("new_int_col", 7)); + Assert.assertFalse("not existed new_int_col could not be greater than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThan("new_double_col", 7)); + Assert.assertFalse("not existed new_double_col could not be greater than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThan("struct_col.nest_col1.nest_col2", 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be greater than 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testGreaterThanOrEqual() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThanOrEqual("int_col", 7)); + Assert.assertTrue("existed int_col could be greater than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThanOrEqual("double_col", 7)); + Assert.assertTrue("existed double_col could be greater than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThanOrEqual("new_int_col", 7)); + Assert.assertFalse("not existed new_int_col could not be greater than or equal 7t", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThanOrEqual("new_double_col", 7)); + Assert.assertFalse("not existed new_double_col could not be greater than or equal 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), greaterThanOrEqual("struct_col.nest_col1.nest_col2", 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be greater than or equal 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testEqual() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("int_col", 7)); + Assert.assertTrue("existed int_col could be equal to 7", evaluator.eval(OLD_SCHEMA)); + + 
evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("double_col", 7)); + Assert.assertTrue("existed double_col could be equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("str_col", "abc")); + Assert.assertTrue("existed str_col could be equal to abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("new_int_col", 7)); + Assert.assertFalse("not existed new_int_col could not be equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("new_double_col", 7)); + Assert.assertFalse("not existed new_double_col could not be equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("new_str_col", "abc")); + Assert.assertFalse("not existed new_str_col could not be equal to abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), equal("struct_col.nest_col1.nest_col2", 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be equal to 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNotEqual() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("int_col", 7)); + Assert.assertTrue("existed int_col could be not equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("double_col", 7)); + Assert.assertTrue("existed double_col could be not equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("str_col", "abc")); + Assert.assertTrue("existed str_col could be not equal to abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("new_int_col", 7)); + Assert.assertTrue("not existed new_int_col should be not equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("new_double_col", 7)); + Assert.assertTrue("not existed new_double_col should be not equal to 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("new_str_col", "abc")); + Assert.assertTrue("not existed new_str_col should be not equal to abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notEqual("struct_col.nest_col1.nest_col2", 7)); + Assert.assertTrue( + "not existed struct_col.nest_col1.nest_col2 should be not equal to 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testStartWith() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), startsWith("str_col", "abc")); + Assert.assertTrue("existed str_col could be start with abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), startsWith("new_str_col", "abc")); + Assert.assertFalse("not existed new_str_col could not be start with abc", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNotStartWith() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notStartsWith("str_col", "abc")); + Assert.assertTrue("existed str_col could be not start with abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notStartsWith("new_str_col", "abc")); + Assert.assertTrue("not existed new_str_col should not start with exists", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testAlwaysTrue() { + SchemaEvaluator evaluator = new 
SchemaEvaluator(NEW_SCHEMA.asStruct(), alwaysTrue()); + Assert.assertTrue("always true", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testAlwaysFalse() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), alwaysFalse()); + Assert.assertFalse("always false", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testIsNull() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNull("double_col")); + Assert.assertTrue("existed double_col could be null ", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNull("str_col")); + Assert.assertTrue("existed str_col could be null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNull("new_double_col")); + Assert.assertTrue("not existed new_double_col col is null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNull("new_str_col")); + Assert.assertTrue("not existed new_str_col col is null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNull("struct_col.nest_col1.nest_col2")); + Assert.assertTrue("not existed struct_col.nest_col1.nest_col2 col is null", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNotNull() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNull("double_col")); + Assert.assertTrue("existed double_col could be not null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNull("str_col")); + Assert.assertTrue("existed str_col could be not null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNull("new_double_col")); + Assert.assertFalse("not existed new_double_col could not be not null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNull("new_str_col")); + Assert.assertFalse("not existed new_str_col could not be not null", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNull("struct_col.nest_col1.nest_col2")); + Assert.assertFalse("not existed struct_col.nest_col1.nest_col2 could not be not null", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testIsNaN() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNaN("double_col")); + Assert.assertTrue("existed double_col could be NaN ", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), isNaN("new_double_col")); + Assert.assertFalse("not existed new_double_col could not be NaN ", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNotNaN() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNaN("double_col")); + Assert.assertTrue("existed double_col could be not NaN ", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notNaN("new_double_col")); + Assert.assertTrue("not existed new_double_col should not be NaN ", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testIn() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), in("int_col", 5, 6, 7)); + Assert.assertTrue("existed int_col could be in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), in("double_col", 5, 6, 7)); + Assert.assertTrue("existed double_col could be in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), in("new_int_col", 5, 
6, 7)); + Assert.assertFalse("not existed new_int_col could not be in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), in("new_double_col", 5, 6, 7)); + Assert.assertFalse("not existed new_double_col could not be in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), in("struct_col.nest_col1.nest_col2", 5, 6, 7)); + Assert.assertFalse( + "not existed struct_col.nest_col1.nest_col2 could not be in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNotIn() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notIn("int_col", 5, 6, 7)); + Assert.assertTrue("existed int_col could be not in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notIn("double_col", 5, 6, 7)); + Assert.assertTrue("existed double_col could be not in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notIn("new_int_col", 5, 6, 7)); + Assert.assertTrue("not existed new_int_col could be not in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notIn("new_double_col", 5, 6, 7)); + Assert.assertTrue("not existed new_double_col could be not in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), notIn("struct_col.nest_col1.nest_col2", 5, 6, 7)); + Assert.assertTrue( + "not existed struct_col.nest_col1.nest_col2 could be not in 5, 6, 7", evaluator.eval(OLD_SCHEMA)); + } + + + @Test + public void testAnd() { + SchemaEvaluator evaluator = new SchemaEvaluator( + NEW_SCHEMA.asStruct(), and(isNull("double_col"), startsWith("str_col", "abc"))); + Assert.assertTrue( + "existed double_col could be null and existed str_col could be start with abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator( + NEW_SCHEMA.asStruct(), and(isNull("new_double_col"), startsWith("new_str_col", "abc"))); + Assert.assertFalse( + "not existed double_col is null and not existed str_col should not be start with abc", + evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testOr() { + SchemaEvaluator evaluator = new SchemaEvaluator( + NEW_SCHEMA.asStruct(), or(isNull("double_col"), startsWith("str_col", "abc"))); + Assert.assertTrue( + "existed double_col could be null or existed str_col could be start with abc", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator( + NEW_SCHEMA.asStruct(), or(isNull("new_double_col"), startsWith("new_str_col", "abc"))); + Assert.assertTrue( + "not existed double_col is null or not existed str_col should not be start with abc", + evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testNot() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), not(lessThan("int_col", 7))); + Assert.assertTrue("existed int_col could be not less than 7", evaluator.eval(OLD_SCHEMA)); + + evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), not(lessThan("new_int_col", 7))); + Assert.assertFalse("not existed new_int_col could not be not less than 7", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testCaseInsensitive() { + SchemaEvaluator evaluator = new SchemaEvaluator(NEW_SCHEMA.asStruct(), startsWith("STR_COL", "abc"), false); + Assert.assertTrue("existed str_col could be start with abc", evaluator.eval(OLD_SCHEMA)); + } + + @Test + public void testCaseSensitive() { + AssertHelpers.assertThrows( + "INT_COL != int_col when case sensitivity is on", + 
ValidationException.class, + "Cannot find field 'INT_COL' in struct", + () -> new SchemaEvaluator(NEW_SCHEMA.asStruct(), not(lessThan("INT_COL", 7)), true) + ); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index b566b77e1331..493db9f1338c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -54,6 +54,7 @@ public PartitionData copy() { private Types.StructType partitionType; private Long fileOrdinal = null; + private int schemaId = -1; private int partitionSpecId = -1; private FileContent content = FileContent.DATA; private String filePath = null; @@ -116,12 +117,13 @@ public PartitionData copy() { this.partitionData = new PartitionData(partitionType); } - BaseFile(int specId, FileContent content, String filePath, FileFormat format, + BaseFile(int schemaId, int specId, FileContent content, String filePath, FileFormat format, PartitionData partition, long fileSizeInBytes, long recordCount, Map columnSizes, Map valueCounts, Map nullValueCounts, Map nanValueCounts, Map lowerBounds, Map upperBounds, List splitOffsets, int[] equalityFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { + this.schemaId = schemaId; this.partitionSpecId = specId; this.content = content; this.filePath = filePath; @@ -159,6 +161,7 @@ public PartitionData copy() { */ BaseFile(BaseFile toCopy, boolean fullCopy) { this.fileOrdinal = toCopy.fileOrdinal; + this.schemaId = toCopy.schemaId; this.partitionSpecId = toCopy.partitionSpecId; this.content = toCopy.content; this.filePath = toCopy.filePath; @@ -277,6 +280,9 @@ public void put(int i, Object value) { this.sortOrderId = (Integer) value; return; case 17: + this.schemaId = (int) value; + return; + case 18: this.fileOrdinal = (long) value; return; default: @@ -332,6 +338,8 @@ public Object get(int i) { case 16: return sortOrderId; case 17: + return schemaId; + case 18: return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); @@ -353,6 +361,11 @@ public Long pos() { return fileOrdinal; } + @Override + public int schemaId() { + return schemaId; + } + @Override public FileContent content() { return content; @@ -461,6 +474,7 @@ public String toString() { .add("split_offsets", splitOffsets == null ? "null" : splitOffsets()) .add("equality_ids", equalityIds == null ? "null" : equalityFieldIds()) .add("sort_order_id", sortOrderId) + .add("schema_id", schemaId) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java index a765dc7fb86a..ae642fa7fa31 100644 --- a/core/src/main/java/org/apache/iceberg/DataFiles.java +++ b/core/src/main/java/org/apache/iceberg/DataFiles.java @@ -122,6 +122,7 @@ public static class Builder { private long recordCount = -1L; private long fileSizeInBytes = -1L; private Integer sortOrderId = SortOrder.unsorted().orderId(); + private int schemaId = -1; // optional fields private Map columnSizes = null; @@ -156,6 +157,7 @@ public void clear() { this.upperBounds = null; this.splitOffsets = null; this.sortOrderId = SortOrder.unsorted().orderId(); + this.schemaId = -1; } public Builder copy(DataFile toCopy) { @@ -177,6 +179,7 @@ public Builder copy(DataFile toCopy) { : ByteBuffers.copy(toCopy.keyMetadata()); this.splitOffsets = toCopy.splitOffsets() == null ? 
null : copyList(toCopy.splitOffsets()); this.sortOrderId = toCopy.sortOrderId(); + this.schemaId = toCopy.schemaId(); return this; } @@ -278,6 +281,11 @@ public Builder withSortOrder(SortOrder newSortOrder) { return this; } + public Builder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public DataFile build() { Preconditions.checkArgument(filePath != null, "File path is required"); if (format == null) { @@ -288,7 +296,7 @@ public DataFile build() { Preconditions.checkArgument(recordCount >= 0, "Record count is required"); return new GenericDataFile( - specId, filePath, format, isPartitioned ? partitionData.copy() : null, + schemaId, specId, filePath, format, isPartitioned ? partitionData.copy() : null, fileSizeInBytes, new Metrics( recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds), keyMetadata, splitOffsets, sortOrderId); diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 0bd3e0fedd10..a227b5eb2c14 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -28,7 +28,7 @@ public class DataTableScan extends BaseTableScan { static final ImmutableList SCAN_COLUMNS = ImmutableList.of( "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes", - "file_size_in_bytes", "record_count", "partition", "key_metadata", "split_offsets" + "file_size_in_bytes", "record_count", "partition", "key_metadata", "split_offsets", "schema_id" ); static final ImmutableList SCAN_WITH_STATS_COLUMNS = ImmutableList.builder() .addAll(SCAN_COLUMNS) @@ -83,6 +83,7 @@ public CloseableIterable doPlanFiles() { .caseSensitive(isCaseSensitive()) .select(colStats() ? 
SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS) .filterData(filter()) + .schemasById(table().schema(), table().schemas()) .specsById(table().specs()) .ignoreDeleted(); diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index edef7c322896..1384707268cf 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -39,6 +39,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.SchemaEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -329,6 +330,8 @@ static class Builder { private final FileIO io; private final Set deleteManifests; private long minSequenceNumber = 0L; + private Schema currentSchema; + private Map schemasById = null; private Map specsById = null; private Expression dataFilter = Expressions.alwaysTrue(); private Expression partitionFilter = Expressions.alwaysTrue(); @@ -346,6 +349,14 @@ Builder afterSequenceNumber(long seq) { return this; } + Builder schemasById(Schema newCurrentSchema, Map newSchemasById) { + Preconditions.checkState(newCurrentSchema != null, "Current schema should not be null"); + Preconditions.checkState(newSchemasById != null, "newSchemaById should not be null"); + this.currentSchema = newCurrentSchema; + this.schemasById = newSchemasById; + return this; + } + Builder specsById(Map newSpecsById) { this.specsById = newSpecsById; return this; @@ -458,6 +469,17 @@ DeleteFileIndex build() { } private Iterable>> deleteManifestReaders() { + SchemaEvaluator schemaEval = + schemasById == null ? null : new SchemaEvaluator(currentSchema.asStruct(), dataFilter, caseSensitive); + Iterable matchingManifests = schemaEval == null ? deleteManifests : + Iterables.filter(deleteManifests, manifest -> { + if (manifest.schemaId() > -1) { + return schemaEval.eval(schemasById.get(manifest.schemaId())); + } + + return true; + }); + LoadingCache evalCache = specsById == null ? null : Caffeine.newBuilder().build(specId -> { PartitionSpec spec = specsById.get(specId); @@ -466,7 +488,7 @@ private Iterable>> deleteManifestRea spec, caseSensitive); }); - Iterable matchingManifests = evalCache == null ? deleteManifests : + matchingManifests = evalCache == null ? 
matchingManifests : Iterables.filter(deleteManifests, manifest -> manifest.content() == ManifestContent.DELETES && (manifest.hasAddedFiles() || manifest.hasExistingFiles()) && @@ -474,13 +496,20 @@ private Iterable>> deleteManifestRea return Iterables.transform( matchingManifests, - manifest -> - ManifestFiles.readDeleteManifest(manifest, io, specsById) - .filterRows(dataFilter) - .filterPartitions(partitionFilter) - .filterPartitions(partitionSet) - .caseSensitive(caseSensitive) - .liveEntries() + manifest -> { + ManifestReader readerBuilder = ManifestFiles.readDeleteManifest(manifest, io, specsById) + .filterRows(dataFilter) + .filterPartitions(partitionFilter) + .filterPartitions(partitionSet) + .caseSensitive(caseSensitive); + + if (schemasById != null && manifest.schemaId() == -1) { + // Manifest schema ID is -1 which means the entries maybe have different schema ID + readerBuilder.evaluateSchema(schemasById); + } + + return readerBuilder.liveEntries(); + } ); } } diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index 84971b40d970..93db63c863f8 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -49,6 +49,7 @@ public static class Builder { private FileFormat format = null; private long recordCount = -1L; private long fileSizeInBytes = -1L; + private int schemaId = -1; // optional fields private Map columnSizes = null; @@ -65,6 +66,7 @@ public static class Builder { this.specId = spec.specId(); this.isPartitioned = spec.fields().size() > 0; this.partitionData = isPartitioned ? DataFiles.newPartitionData(spec) : null; + this.schemaId = spec.schema().schemaId(); } public void clear() { @@ -82,6 +84,7 @@ public void clear() { this.lowerBounds = null; this.upperBounds = null; this.sortOrderId = null; + this.schemaId = -1; } public Builder copy(DeleteFile toCopy) { @@ -103,6 +106,7 @@ public Builder copy(DeleteFile toCopy) { this.keyMetadata = toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata()); this.sortOrderId = toCopy.sortOrderId(); + this.schemaId = toCopy.schemaId(); return this; } @@ -207,6 +211,11 @@ public Builder withSortOrder(SortOrder newSortOrder) { return this; } + public Builder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public DeleteFile build() { Preconditions.checkArgument(filePath != null, "File path is required"); if (format == null) { @@ -232,7 +241,7 @@ public DeleteFile build() { } return new GenericDeleteFile( - specId, content, filePath, format, isPartitioned ? DataFiles.copy(spec, partitionData) : null, + schemaId, specId, content, filePath, format, isPartitioned ? 
DataFiles.copy(spec, partitionData) : null, fileSizeInBytes, new Metrics( recordCount, columnSizes, valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds), equalityFieldIds, sortOrderId, keyMetadata); diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java index 17d0753c36c0..0855a17a68bd 100644 --- a/core/src/main/java/org/apache/iceberg/FindFiles.java +++ b/core/src/main/java/org/apache/iceberg/FindFiles.java @@ -204,6 +204,7 @@ public CloseableIterable collect() { // when snapshot is not null CloseableIterable> entries = new ManifestGroup(ops.io(), snapshot.dataManifests()) + .schemasById(ops.current().schema(), ops.current().schemasById()) .specsById(ops.current().specsById()) .filterData(rowFilter) .filterFiles(fileFilter) diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 40785664d8f5..5f83cf9d91ca 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -34,10 +34,10 @@ class GenericDataFile extends BaseFile implements DataFile { super(avroSchema); } - GenericDataFile(int specId, String filePath, FileFormat format, PartitionData partition, + GenericDataFile(int schemaId, int specId, String filePath, FileFormat format, PartitionData partition, long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata, List splitOffsets, Integer sortOrderId) { - super(specId, FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), + super(schemaId, specId, FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(), metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, null, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 2cd5f33d29ac..34a7a0a8062f 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -34,10 +34,10 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { super(avroSchema); } - GenericDeleteFile(int specId, FileContent content, String filePath, FileFormat format, PartitionData partition, - long fileSizeInBytes, Metrics metrics, int[] equalityFieldIds, + GenericDeleteFile(int schemaId, int specId, FileContent content, String filePath, FileFormat format, + PartitionData partition, long fileSizeInBytes, Metrics metrics, int[] equalityFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { - super(specId, content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), + super(schemaId, specId, content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(), metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(), metrics.nanValueCounts(), metrics.lowerBounds(), metrics.upperBounds(), null, equalityFieldIds, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 7fff183aa046..45f1215bdc09 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -61,6 +61,7 @@ public class GenericManifestFile private Long 
deletedRowsCount = null; private PartitionFieldSummary[] partitions = null; private byte[] keyMetadata = null; + private int schemaId = -1; /** * Used by Avro reflection to instantiate this class when reading manifest files. @@ -105,13 +106,14 @@ public GenericManifestFile(Schema avroSchema) { this.partitions = null; this.fromProjectionPos = null; this.keyMetadata = null; + this.schemaId = -1; } public GenericManifestFile(String path, long length, int specId, ManifestContent content, long sequenceNumber, long minSequenceNumber, Long snapshotId, int addedFilesCount, long addedRowsCount, int existingFilesCount, long existingRowsCount, int deletedFilesCount, long deletedRowsCount, - List partitions, ByteBuffer keyMetadata) { + List partitions, ByteBuffer keyMetadata, int schemaId) { this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; this.length = length; @@ -129,6 +131,7 @@ public GenericManifestFile(String path, long length, int specId, ManifestContent this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); this.fromProjectionPos = null; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); + this.schemaId = schemaId; } /** @@ -160,6 +163,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { } this.fromProjectionPos = toCopy.fromProjectionPos; this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length); + this.schemaId = toCopy.schemaId; } /** @@ -256,6 +260,11 @@ public ByteBuffer keyMetadata() { return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata); } + @Override + public int schemaId() { + return schemaId; + } + @Override public int size() { return ManifestFile.schema().columns().size(); @@ -304,6 +313,8 @@ public Object get(int i) { return partitions(); case 14: return keyMetadata(); + case 15: + return schemaId; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -365,6 +376,9 @@ public void set(int i, T value) { case 14: this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); return; + case 15: + this.schemaId = value != null ? (Integer) value : -1; + return; default: // ignore the object, it must be from a newer version of the format } @@ -419,6 +433,7 @@ public String toString() { .add("key_metadata", keyMetadata == null ? 
"null" : "(redacted)") .add("sequence_number", sequenceNumber) .add("min_sequence_number", minSequenceNumber) + .add("schema_id", schemaId) .toString(); } @@ -438,7 +453,7 @@ private CopyBuilder(ManifestFile toCopy) { toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(), toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), - copyList(toCopy.partitions(), PartitionFieldSummary::copy), toCopy.keyMetadata()); + copyList(toCopy.partitions(), PartitionFieldSummary::copy), toCopy.keyMetadata(), toCopy.schemaId()); } } diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index fda938f81609..e7f24f93c30c 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -88,6 +88,7 @@ public CloseableIterable planFiles() { manifestEntry -> snapshotIds.contains(manifestEntry.snapshotId()) && manifestEntry.status() == ManifestEntry.Status.ADDED) + .schemasById(table().schema(), table().schemas()) .specsById(table().specs()) .ignoreDeleted(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 81a7fbfad9a3..dc93bdfefbca 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -33,6 +33,7 @@ import org.apache.iceberg.expressions.InclusiveMetricsEvaluator; import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.expressions.SchemaEvaluator; import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -72,6 +73,8 @@ public String partition() { private final PartitionSet deleteFilePartitions; private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); + private final Schema currentSchema; + private final Map schemasById; private Expression deleteExpression = Expressions.alwaysFalse(); private long minSequenceNumber = 0; private boolean hasPathOnlyDeletes = false; @@ -89,8 +92,10 @@ public String partition() { private final Supplier workerPoolSupplier; - protected ManifestFilterManager(Map specsById, - Supplier executorSupplier) { + protected ManifestFilterManager(Schema currentSchema, Map schemasById, + Map specsById, Supplier executorSupplier) { + this.currentSchema = currentSchema; + this.schemasById = schemasById; this.specsById = specsById; this.deleteFilePartitions = PartitionSet.create(specsById); this.dropPartitions = PartitionSet.create(specsById); @@ -317,12 +322,19 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { } } + @SuppressWarnings("checkstyle:CyclomaticComplexity") private boolean canContainDeletedFiles(ManifestFile manifest) { - boolean canContainExpressionDeletes; + boolean canContainExpressionDeletes = true; if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) { + if (currentSchema != null && schemasById != null && manifest.schemaId() > -1) { + SchemaEvaluator schemaEvaluator = + new SchemaEvaluator(currentSchema.asStruct(), deleteExpression, 
caseSensitive); + canContainExpressionDeletes = schemaEvaluator.eval(schemasById.get(manifest.schemaId())); + } + ManifestEvaluator manifestEvaluator = ManifestEvaluator.forRowFilter(deleteExpression, specsById.get(manifest.partitionSpecId()), caseSensitive); - canContainExpressionDeletes = manifestEvaluator.eval(manifest); + canContainExpressionDeletes = canContainExpressionDeletes && manifestEvaluator.eval(manifest); } else { canContainExpressionDeletes = false; } diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 1120bdfb36d3..8efff0afc896 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -33,8 +33,10 @@ import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.Projections; import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.expressions.SchemaEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -49,6 +51,8 @@ class ManifestGroup { private final DeleteFileIndex.Builder deleteIndexBuilder; private Predicate manifestPredicate; private Predicate> manifestEntryPredicate; + private Schema currentSchema; + private Map schemasById; private Map specsById; private Expression dataFilter; private Expression fileFilter; @@ -82,6 +86,15 @@ class ManifestGroup { this.manifestEntryPredicate = e -> true; } + ManifestGroup schemasById(Schema newCurrentSchema, Map newSchemasById) { + Preconditions.checkState(newCurrentSchema != null, "Current schema should not be null"); + Preconditions.checkState(newSchemasById != null, "newSchemaById should not be null"); + this.currentSchema = newCurrentSchema; + this.schemasById = newSchemasById; + deleteIndexBuilder.schemasById(newCurrentSchema, newSchemasById); + return this; + } + ManifestGroup specsById(Map newSpecsById) { this.specsById = newSpecsById; deleteIndexBuilder.specsById(newSpecsById); @@ -201,6 +214,7 @@ public CloseableIterable> entries() { return CloseableIterable.concat(entries((manifest, entries) -> entries)); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") private Iterable> entries( BiFunction>, CloseableIterable> entryFn) { LoadingCache evalCache = specsById == null ? @@ -211,6 +225,9 @@ private Iterable> entries( spec, caseSensitive); }); + SchemaEvaluator schemaEval = + schemasById == null ? null : new SchemaEvaluator(currentSchema.asStruct(), dataFilter, caseSensitive); + Evaluator evaluator; if (fileFilter != null && fileFilter != Expressions.alwaysTrue()) { evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive); @@ -218,8 +235,17 @@ private Iterable> entries( evaluator = null; } - Iterable matchingManifests = evalCache == null ? dataManifests : - Iterables.filter(dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); + Iterable matchingManifests = schemaEval == null ? dataManifests : + Iterables.filter(dataManifests, manifest -> { + if (manifest.schemaId() > -1) { + return schemaEval.eval(schemasById.get(manifest.schemaId())); + } + + return true; + }); + + matchingManifests = evalCache == null ? 
matchingManifests : + Iterables.filter(matchingManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); if (ignoreDeleted) { // only scan manifests that have entries other than deletes @@ -248,6 +274,11 @@ private Iterable> entries( .caseSensitive(caseSensitive) .select(columns); + if (schemasById != null && manifest.schemaId() == -1) { + // Manifest schema ID is -1 which means the entries maybe have different schema ID + reader.evaluateSchema(schemasById); + } + CloseableIterable> entries = reader.entries(); if (ignoreDeleted) { entries = reader.liveEntries(); diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 73f4c66b6b05..9d06f1910053 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -32,6 +32,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.InclusiveMetricsEvaluator; import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.SchemaEvaluator; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -80,6 +81,7 @@ private String fileClass() { private final PartitionSpec spec; private final Schema fileSchema; + // updated by configuration methods private PartitionSet partitionSet = null; private Expression partFilter = alwaysTrue(); @@ -87,10 +89,12 @@ private String fileClass() { private Schema fileProjection = null; private Collection columns = null; private boolean caseSensitive = true; + private Map schemasById; // lazily initialized private Evaluator lazyEvaluator = null; private InclusiveMetricsEvaluator lazyMetricsEvaluator = null; + private SchemaEvaluator lazySchemaEvaluator = null; protected ManifestReader(InputFile file, Map specsById, InheritableMetadata inheritableMetadata, FileType content) { @@ -175,12 +179,21 @@ public ManifestReader caseSensitive(boolean isCaseSensitive) { return this; } + public ManifestReader evaluateSchema(Map newSchemasById) { + Preconditions.checkState(newSchemasById != null, + "schemaById should not be null when enable evaluate ManifestEntry for Schema"); + this.schemasById = newSchemasById; + return this; + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") CloseableIterable> entries() { if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) || (partFilter != null && partFilter != Expressions.alwaysTrue()) || (partitionSet != null)) { Evaluator evaluator = evaluator(); InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator(); + SchemaEvaluator schemaEvaluator = schemaEvaluator(); // ensure stats columns are present for metrics evaluation boolean requireStatsProjection = requireStatsProjection(rowFilter, columns); @@ -188,10 +201,17 @@ CloseableIterable> entries() { return CloseableIterable.filter( open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)), - entry -> entry != null && - evaluator.eval(entry.file().partition()) && - metricsEvaluator.eval(entry.file()) && - inPartitionSet(entry.file())); + entry -> { + boolean keep = entry != null; + if (keep && schemaEvaluator != null && entry.file().schemaId() > -1) { + keep = schemaEvaluator.eval(schemasById.get(entry.file().schemaId())); + } + + return keep && + evaluator.eval(entry.file().partition()) && + metricsEvaluator.eval(entry.file()) && + inPartitionSet(entry.file()); + }); } else { return 
open(projection(fileSchema, fileProjection, columns, caseSensitive)); } @@ -288,6 +308,17 @@ private InclusiveMetricsEvaluator metricsEvaluator() { return lazyMetricsEvaluator; } + private SchemaEvaluator schemaEvaluator() { + if (schemasById == null) { + return null; + } + + if (lazySchemaEvaluator == null) { + this.lazySchemaEvaluator = new SchemaEvaluator(spec.schema().asStruct(), rowFilter, caseSensitive); + } + return lazySchemaEvaluator; + } + private static boolean requireStatsProjection(Expression rowFilter, Collection columns) { // Make sure we have all stats columns for metrics evaluator return rowFilter != Expressions.alwaysTrue() && diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 7c81c15e0cc0..88da9fc720bc 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -51,6 +51,7 @@ public abstract class ManifestWriter> implements FileAp private int deletedFiles = 0; private long deletedRows = 0L; private Long minSequenceNumber = null; + private int schemaId = -2; private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; @@ -88,6 +89,15 @@ void addEntry(ManifestEntry entry) { if (entry.sequenceNumber() != null && (minSequenceNumber == null || entry.sequenceNumber() < minSequenceNumber)) { this.minSequenceNumber = entry.sequenceNumber(); } + + int fileSchemaId = entry.file().schemaId(); + if (schemaId == -2) { + // first entry: initialize the manifest schema ID from this file + schemaId = fileSchemaId; + } else if (schemaId >= -1 && schemaId != fileSchemaId) { + // entries were written with different schema IDs, so mark the manifest schema ID as mixed (-1) + schemaId = -1; + } writer.add(prepare(entry)); } @@ -171,9 +181,10 @@ public ManifestFile toManifestFile() { // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it. long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ; - return new GenericManifestFile(file.location(), writer.length(), specId, content(), - UNASSIGNED_SEQ, minSeqNumber, snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null); + int validSchemaId = schemaId == -2 ?
-1 : schemaId; + return new GenericManifestFile(file.location(), writer.length(), specId, content(), UNASSIGNED_SEQ, minSeqNumber, + snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), + null, validSchemaId); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 507b21ba86e4..72dfb6f950b1 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -343,7 +343,7 @@ private CloseableIterable> addedDataFiles(TableMetadata .ignoreExisting(); if (dataFilter != null) { - manifestGroup = manifestGroup.filterData(dataFilter); + manifestGroup = manifestGroup.filterData(dataFilter).schemasById(base.schema(), base.schemasById()); } if (partitionSet != null) { @@ -556,7 +556,7 @@ private CloseableIterable> deletedDataFiles(TableMetadat .ignoreExisting(); if (dataFilter != null) { - manifestGroup = manifestGroup.filterData(dataFilter); + manifestGroup = manifestGroup.filterData(dataFilter).schemasById(base.schema(), base.schemasById()); } if (partitionSet != null) { @@ -870,7 +870,8 @@ private List newDeleteFilesAsManifests() { private class DataFileFilterManager extends ManifestFilterManager { private DataFileFilterManager() { - super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool); + super(ops.current().schema(), ops.current().schemasById(), ops.current().specsById(), + MergingSnapshotProducer.this::workerPool); } @Override @@ -922,7 +923,8 @@ protected ManifestReader newManifestReader(ManifestFile manifest) { private class DeleteFileFilterManager extends ManifestFilterManager { private DeleteFileFilterManager() { - super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool); + super(ops.current().schema(), ops.current().schemasById(), ops.current().specsById(), + MergingSnapshotProducer.this::workerPool); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java index 2bca3b74d775..3f9f2e633610 100644 --- a/core/src/main/java/org/apache/iceberg/ScanSummary.java +++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java @@ -220,6 +220,7 @@ private Map computeTopPartitionMetrics( limit, throwIfLimited, Comparators.charSequences()); try (CloseableIterable> entries = new ManifestGroup(ops.io(), manifests) + .schemasById(ops.current().schema(), ops.current().schemasById()) .specsById(ops.current().specsById()) .filterData(rowFilter) .ignoreDeleted() diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 0f7eed5e491e..db2fa5d516ec 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -462,7 +462,8 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(), ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null); + addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), + null, manifest.schemaId()); } catch (IOException e) { throw new RuntimeIOException(e, 
"Failed to read manifest: %s", manifest.path()); diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index d368679609f3..4cc124f95af3 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -180,6 +180,11 @@ public List partitions() { return wrapped.partitions(); } + @Override + public int schemaId() { + return wrapped.schemaId(); + } + @Override public ManifestFile copy() { return wrapped.copy(); @@ -215,7 +220,8 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) { DataFile.UPPER_BOUNDS, DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, - DataFile.SORT_ORDER_ID + DataFile.SORT_ORDER_ID, + DataFile.SCHEMA_ID ); } @@ -356,6 +362,8 @@ public Object get(int pos) { return wrapped.splitOffsets(); case 14: return wrapped.sortOrderId(); + case 15: + return wrapped.schemaId(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @@ -455,6 +463,11 @@ public Integer sortOrderId() { return wrapped.sortOrderId(); } + @Override + public int schemaId() { + return wrapped.schemaId(); + } + @Override public DataFile copy() { return wrapped.copy(); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 40b8624baa3b..59833a4a3fef 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -47,7 +47,9 @@ private V2Metadata() { ManifestFile.ADDED_ROWS_COUNT.asRequired(), ManifestFile.EXISTING_ROWS_COUNT.asRequired(), ManifestFile.DELETED_ROWS_COUNT.asRequired(), - ManifestFile.PARTITION_SUMMARIES + ManifestFile.PARTITION_SUMMARIES, + ManifestFile.KEY_METADATA, + ManifestFile.SCHEMA_ID ); /** @@ -134,6 +136,8 @@ public Object get(int pos) { return wrapped.partitions(); case 14: return wrapped.keyMetadata(); + case 15: + return wrapped.schemaId(); default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -229,6 +233,11 @@ public ByteBuffer keyMetadata() { return wrapped.keyMetadata(); } + @Override + public int schemaId() { + return wrapped.schemaId(); + } + @Override public ManifestFile copy() { return wrapped.copy(); @@ -263,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, - DataFile.SORT_ORDER_ID + DataFile.SORT_ORDER_ID, + DataFile.SCHEMA_ID ); } @@ -419,6 +429,8 @@ public Object get(int pos) { return wrapped.equalityFieldIds(); case 15: return wrapped.sortOrderId(); + case 16: + return wrapped.schemaId(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @@ -518,6 +530,11 @@ public Integer sortOrderId() { return wrapped.sortOrderId(); } + @Override + public int schemaId() { + return wrapped.schemaId(); + } + @Override public F copy() { throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper"); diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 1aceb18bfc28..0366311615f8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -265,6 +265,7 @@ public static class DataWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private SortOrder sortOrder = null; + private int schemaId = -1; private DataWriteBuilder(OutputFile 
file) { this.appenderBuilder = write(file); @@ -276,6 +277,8 @@ public DataWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); + return this; } @@ -338,13 +341,19 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DataWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public DataWriter build() throws IOException { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); - return new DataWriter<>(fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder); + return new DataWriter<>( + fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder, schemaId); } } @@ -362,6 +371,7 @@ public static class DeleteWriteBuilder { private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; private SortOrder sortOrder; + private int schemaId = -1; private DeleteWriteBuilder(OutputFile file) { this.appenderBuilder = write(file); @@ -373,6 +383,7 @@ public DeleteWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); return this; } @@ -450,6 +461,11 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DeleteWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema"); Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); @@ -470,7 +486,7 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); return new EqualityDeleteWriter<>( - appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder, + appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder, schemaId, equalityFieldIds); } @@ -501,7 +517,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); return new PositionDeleteWriter<>( - appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata); + appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, schemaId); } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java index c914ad224f30..fbd72b0799be 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java @@ -42,11 +42,18 @@ public class EqualityDeleteWriter implements FileWriter private final ByteBuffer keyMetadata; private final int[] equalityFieldIds; private final SortOrder sortOrder; + private final int schemaId; private DeleteFile deleteFile = null; public EqualityDeleteWriter(FileAppender appender, FileFormat format, 
String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, SortOrder sortOrder, int... equalityFieldIds) { + this(appender, format, location, spec, partition, keyMetadata, sortOrder, -1, equalityFieldIds); + } + + public EqualityDeleteWriter(FileAppender appender, FileFormat format, String location, + PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, + SortOrder sortOrder, int schemaId, int... equalityFieldIds) { this.appender = appender; this.format = format; this.location = location; @@ -54,6 +61,7 @@ public EqualityDeleteWriter(FileAppender appender, FileFormat format, String this.partition = partition; this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null; this.sortOrder = sortOrder; + this.schemaId = schemaId; this.equalityFieldIds = equalityFieldIds; } @@ -100,6 +108,7 @@ public void close() throws IOException { .withFileSizeInBytes(appender.length()) .withMetrics(appender.metrics()) .withSortOrder(sortOrder) + .withSchemaId(schemaId) .build(); } } diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java index a7dff07e7105..79f734a3b466 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java @@ -42,10 +42,17 @@ public class PositionDeleteWriter implements FileWriter, De private final ByteBuffer keyMetadata; private final PositionDelete delete; private final CharSequenceSet referencedDataFiles; + private final int schemaId; private DeleteFile deleteFile = null; public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) { + this(appender, format, location, spec, partition, keyMetadata, -1); + } + + public PositionDeleteWriter(FileAppender appender, FileFormat format, String location, + PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, + int schemaId) { this.appender = appender; this.format = format; this.location = location; @@ -54,6 +61,7 @@ public PositionDeleteWriter(FileAppender appender, FileFormat format this.keyMetadata = keyMetadata != null ? 
keyMetadata.buffer() : null; this.delete = PositionDelete.create(); this.referencedDataFiles = CharSequenceSet.empty(); + this.schemaId = schemaId; } @Override @@ -100,6 +108,7 @@ public void close() throws IOException { .withEncryptionKeyMetadata(keyMetadata) .withFileSizeInBytes(appender.length()) .withMetrics(appender.metrics()) + .withSchemaId(schemaId) .build(); } } diff --git a/core/src/main/java/org/apache/iceberg/io/DataWriter.java b/core/src/main/java/org/apache/iceberg/io/DataWriter.java index 090ccebfa80f..7c1c44db2b6b 100644 --- a/core/src/main/java/org/apache/iceberg/io/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/DataWriter.java @@ -38,6 +38,7 @@ public class DataWriter implements FileWriter { private final StructLike partition; private final ByteBuffer keyMetadata; private final SortOrder sortOrder; + private final int schemaId; private DataFile dataFile = null; public DataWriter(FileAppender appender, FileFormat format, String location, @@ -47,6 +48,11 @@ public DataWriter(FileAppender appender, FileFormat format, String location, public DataWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata, SortOrder sortOrder) { + this(appender, format, location, spec, partition, keyMetadata, sortOrder, -1); + } + + public DataWriter(FileAppender appender, FileFormat format, String location, PartitionSpec spec, + StructLike partition, EncryptionKeyMetadata keyMetadata, SortOrder sortOrder, int schemaId) { this.appender = appender; this.format = format; this.location = location; @@ -54,6 +60,7 @@ public DataWriter(FileAppender appender, FileFormat format, String location, this.partition = partition; this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null; this.sortOrder = sortOrder; + this.schemaId = schemaId; } @Override @@ -89,6 +96,7 @@ public void close() throws IOException { .withMetrics(appender.metrics()) .withSplitOffsets(appender.splitOffsets()) .withSortOrder(sortOrder) + .withSchemaId(schemaId) .build(); } } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 5abfb2c0330a..9ea8a96bc10d 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -452,6 +452,7 @@ protected DataFile newDataFile(String partitionPath) { .withFileSizeInBytes(10) .withPartitionPath(partitionPath) .withRecordCount(1) + .withSchemaId(table.schema().schemaId()) .build(); } @@ -463,6 +464,7 @@ protected DeleteFile newDeleteFile(int specId, String partitionPath) { .withFileSizeInBytes(10) .withPartitionPath(partitionPath) .withRecordCount(1) + .withSchemaId(table.schema().schemaId()) .build(); } diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java index 151d48546c8b..b5ac38f24d67 100644 --- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java +++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java @@ -20,11 +20,15 @@ package org.apache.iceberg; import java.io.IOException; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import 
org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Test; @@ -130,4 +134,41 @@ public void testTableScanWithPlanExecutor() { Assert.assertEquals(2, Iterables.size(scan.planFiles())); Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0); } + + @Test + public void testTableScanWithSchemaEvaluation() throws IOException { + DataFile fileA = newDataFile("data_bucket=0"); + DataFile fileB = newDataFile("data_bucket=0"); + table.newFastAppend().appendFile(fileA).appendFile(fileB).commit(); + + table.updateSchema().addColumn("name", Types.StringType.get()).commit(); + table.refresh(); + + DataFile fileC = newDataFile("data_bucket=0"); + DataFile fileD = newDataFile("data_bucket=0"); + table.newFastAppend().appendFile(fileC).appendFile(fileD).commit(); + + TableScan scan = table.newScan().filter(Expressions.startsWith("data", "abc")); + try (CloseableIterable files = scan.planFiles()) { + Assert.assertEquals("Should read 4 files", 4, Iterables.size(files)); + + Set dataFiles = + Streams.stream(files).map(fileScanTask -> fileScanTask.file().path().toString()).collect(Collectors.toSet()); + Set expectedFiles = + Sets.newHashSet(fileA, fileB, fileC, fileD).stream() + .map(file -> file.path().toString()) + .collect(Collectors.toSet()); + Assert.assertEquals("Should read all files", expectedFiles, dataFiles); + } + + scan = table.newScan().filter(Expressions.startsWith("name", "abc")); + try (CloseableIterable files = scan.planFiles()) { + Assert.assertEquals("Should read 2 files", 2, Iterables.size(files)); + + Set dataFiles = + Streams.stream(files).map(fileScanTask -> fileScanTask.file().path().toString()).collect(Collectors.toSet()); + Assert.assertEquals("Should only read files with new schema", + Sets.newHashSet(fileC.path().toString(), fileD.path().toString()), dataFiles); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java index 322eb921d267..69c361fa57df 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java @@ -23,17 +23,23 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeWrapper; import org.junit.Assert; import org.junit.Test; +import static org.apache.iceberg.expressions.Expressions.and; import static org.apache.iceberg.expressions.Expressions.bucket; import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.startsWith; public class TestDeleteFileIndex extends TableTestBase { public TestDeleteFileIndex() { @@ -455,4 +461,86 @@ public void testPartitionedTableWithExistingDeleteFile() { Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path()), Sets.newHashSet(Iterables.transform(task.deletes(), 
ContentFile::path))); } + + @Test + public void testWithSchemaEvaluation() { + // these files are written with schema ID 0 + DataFile dataFileA = newDataFile("data_bucket=0"); + DeleteFile posDeleteFileA = newDeleteFile(table.spec().specId(), "data_bucket=0"); + DeleteFile eqDeleteFileA = newEqualityDeleteFile( + table.spec().specId(), "data_bucket=0", table.schema().asStruct().fields().get(0).fieldId()); + + table.newAppend() + .appendFile(dataFileA) + .commit(); + + table.newRowDelta() + .addDeletes(posDeleteFileA) + .addDeletes(eqDeleteFileA) + .commit(); + + // update table schema + table.updateSchema().addColumn("name", Types.StringType.get()).commit(); + table.refresh(); + + // these files are written with schema ID 1 + DataFile dataFileB = newDataFile("data_bucket=0"); + DeleteFile posDeleteFileB = newDeleteFile(table.spec().specId(), "data_bucket=0"); + DeleteFile eqDeleteFileB = newEqualityDeleteFile( + table.spec().specId(), "data_bucket=0", table.schema().asStruct().fields().get(0).fieldId()); + + table.newAppend() + .appendFile(dataFileB) + .commit(); + + table.newRowDelta() + .addDeletes(posDeleteFileB) + .addDeletes(eqDeleteFileB) + .commit(); + + // scan the table without a filter on the newly added column + List tasks = + Lists.newArrayList(table.newScan().filter(equal(bucket("data", BUCKETS_NUMBER), 0)) + .planFiles().iterator()); + Assert.assertEquals("Should have two tasks", 2, tasks.size()); + + Set dataFiles = tasks.stream().map(file -> file.file().path()).collect(Collectors.toSet()); + Assert.assertEquals("Should have the correct data file path", + Sets.newHashSet(dataFileA.path(), dataFileB.path()), dataFiles); + + FileScanTask taskA; + FileScanTask taskB; + if (tasks.get(0).file().path().equals(dataFileA.path())) { + taskA = tasks.get(0); + taskB = tasks.get(1); + } else { + taskA = tasks.get(1); + taskB = tasks.get(0); + } + + Assert.assertEquals("Should have four associated delete files for data file A", + 4, taskA.deletes().size()); + Assert.assertEquals("Should have expected delete files", + Sets.newHashSet(posDeleteFileA.path(), eqDeleteFileA.path(), posDeleteFileB.path(), eqDeleteFileB.path()), + Sets.newHashSet(Iterables.transform(taskA.deletes(), ContentFile::path))); + + Assert.assertEquals("Should have two associated delete files for data file B", + 2, taskB.deletes().size()); + Assert.assertEquals("Should have expected delete files", + Sets.newHashSet(posDeleteFileB.path(), eqDeleteFileB.path()), + Sets.newHashSet(Iterables.transform(taskB.deletes(), ContentFile::path))); + + // scan the table with a filter on the newly added column + Expression filterExp = and(equal(bucket("data", BUCKETS_NUMBER), 0), startsWith("name", "abc")); + tasks = Lists.newArrayList(table.newScan().filter(filterExp).planFiles().iterator()); + Assert.assertEquals("Should have one task", 1, tasks.size()); + + FileScanTask task = tasks.get(0); + Assert.assertEquals("Should have the correct data file path", dataFileB.path(), task.file().path()); + + Assert.assertEquals("Should have two associated delete files", 2, task.deletes().size()); + Assert.assertEquals("Should have expected delete files", + Sets.newHashSet(posDeleteFileB.path(), eqDeleteFileB.path()), + Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path))); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java index 8cc64f0787a4..029889b00389 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java +++
b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java @@ -56,18 +56,19 @@ public class TestManifestListVersions { private static final long EXISTING_ROWS = 857273L; private static final int DELETED_FILES = 1; private static final long DELETED_ROWS = 22910L; + private static final int SCHEMA_ID = 1; private static final List PARTITION_SUMMARIES = ImmutableList.of(); private static final ByteBuffer KEY_METADATA = null; private static final ManifestFile TEST_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES, KEY_METADATA); + PARTITION_SUMMARIES, KEY_METADATA, SCHEMA_ID); private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES, KEY_METADATA); + PARTITION_SUMMARIES, KEY_METADATA, SCHEMA_ID); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -99,6 +100,7 @@ public void testV1Write() throws IOException { Assert.assertEquals("Added rows count", ADDED_ROWS, (long) manifest.addedRowsCount()); Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) manifest.existingRowsCount()); Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) manifest.deletedRowsCount()); + Assert.assertEquals("Schema id", -1, manifest.schemaId()); } @Test @@ -119,6 +121,7 @@ public void testV2Write() throws IOException { Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) manifest.existingRowsCount()); Assert.assertEquals("Deleted files count", DELETED_FILES, (int) manifest.deletedFilesCount()); Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) manifest.deletedRowsCount()); + Assert.assertEquals("Schema id", SCHEMA_ID, manifest.schemaId()); } @Test @@ -225,7 +228,7 @@ public void testManifestsPartitionSummary() throws IOException { ManifestFile manifest = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - partitionFieldSummaries, KEY_METADATA); + partitionFieldSummaries, KEY_METADATA, -1); InputFile manifestList = writeManifestList(manifest, 2); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 8061e33f5efa..3fa50af66026 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.iceberg.ManifestEntry.Status; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -128,7 +129,7 @@ public void testDataFilePositions() throws IOException { long expectedPos = 0L; for (DataFile file : reader) { Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); - Assert.assertEquals("Position from field index should match", expectedPos, ((BaseFile) file).get(17)); + Assert.assertEquals("Position from field index should match", expectedPos, ((BaseFile) file).get(18)); 
expectedPos += 1; } } } + + @Test + public void testManifestReaderWithSchemaEvaluation() throws IOException { + table.updateSchema().addColumn("name", Types.StringType.get()).commit(); + table.refresh(); + + DataFile dataFile0 = createDataFile("0", -1); + DataFile dataFile1 = createDataFile("1", 0); + DataFile dataFile2 = createDataFile("2", 0); + DataFile dataFile3 = createDataFile("3", 1); + DataFile dataFile4 = createDataFile("4", 1); + + ManifestFile manifest = writeManifest(1000L, dataFile0, dataFile1, dataFile2, dataFile3, dataFile4); + Expression rowFilter = Expressions.and(Expressions.equal("id", 0), Expressions.equal("name", "abc")); + try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO) + .filterRows(rowFilter).evaluateSchema(table.schemas())) { + List files = Streams.stream(reader).map(file -> file.path().toString()).collect(Collectors.toList()); + + Assert.assertEquals("Should read the expected files", + Lists.newArrayList(dataFile0.path(), dataFile3.path(), dataFile4.path()), files); + } + } + + DataFile createDataFile(String fileName, int schemaId) { + return DataFiles.builder(SPEC) + .withPath(String.format("/path/to/data-%s.parquet", fileName)) + .withPartitionPath("data_bucket=0") + .withFileSizeInBytes(10) + .withRecordCount(1) + .withSchemaId(schemaId) + .build(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java index a6866465e6a8..42dd0ffc96a7 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java @@ -108,11 +108,42 @@ public void testWriteManifestWithSequenceNumber() throws IOException { } } + @Test + public void testWriteManifestWithSchemaId() throws IOException { + ManifestFile manifest = writeManifest( + "manifest-without-schema-id.avro", + manifestEntry(Status.ADDED, null, newFile(10, TestHelpers.Row.of(1))), + manifestEntry(Status.EXISTING, null, newFile(15, TestHelpers.Row.of(2))), + manifestEntry(Status.DELETED, null, newFile(2, TestHelpers.Row.of(3)))); + + Assert.assertEquals("Default schema ID should be -1", -1, manifest.schemaId()); + + manifest = writeManifest( + "entries-with-different-schema-id.avro", + manifestEntry(Status.ADDED, null, newFile(10, TestHelpers.Row.of(1), -1)), + manifestEntry(Status.EXISTING, null, newFile(15, TestHelpers.Row.of(2), 0)), + manifestEntry(Status.DELETED, null, newFile(2, TestHelpers.Row.of(3), 1))); + + Assert.assertEquals("The schema ID should be -1 when entries have different schema IDs", -1, manifest.schemaId()); + + manifest = writeManifest( + "entries-with-same-schema-id.avro", + manifestEntry(Status.ADDED, null, newFile(10, TestHelpers.Row.of(1), 1)), + manifestEntry(Status.EXISTING, null, newFile(15, TestHelpers.Row.of(2), 1)), + manifestEntry(Status.DELETED, null, newFile(2, TestHelpers.Row.of(3), 1))); + + Assert.assertEquals("The schema ID should match when all entries have the same schema ID", 1, manifest.schemaId()); + } + private DataFile newFile(long recordCount) { return
newFile(recordCount, null); } private DataFile newFile(long recordCount, StructLike partition) { + return newFile(recordCount, partition, -1); + } + + private DataFile newFile(long recordCount, StructLike partition, int schemaId) { String fileName = UUID.randomUUID().toString(); DataFiles.Builder builder = DataFiles.builder(SPEC) .withPath("data_bucket=0/" + fileName + ".parquet") @@ -121,6 +152,8 @@ private DataFile newFile(long recordCount, StructLike partition) { if (partition != null) { builder.withPartition(partition); } + + builder.withSchemaId(schemaId); return builder.build(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 79aa35db974c..adc3106254da 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -72,13 +72,13 @@ public class TestManifestWriterVersions { private static final Integer SORT_ORDER_ID = 2; private static final DataFile DATA_FILE = new GenericDataFile( - 0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, SORT_ORDER_ID); + -1, 0, PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS, SORT_ORDER_ID); private static final List EQUALITY_IDS = ImmutableList.of(1); private static final int[] EQUALITY_ID_ARR = new int[] { 1 }; - private static final DeleteFile DELETE_FILE = new GenericDeleteFile( - 0, FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, EQUALITY_ID_ARR, SORT_ORDER_ID, null); + private static final DeleteFile DELETE_FILE = new GenericDeleteFile(-1, 0, FileContent.EQUALITY_DELETES, + PATH, FORMAT, PARTITION, 22905L, METRICS, EQUALITY_ID_ARR, SORT_ORDER_ID, null); @Rule public TemporaryFolder temp = new TemporaryFolder(); diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 3791d348a845..95d6ec4d775f 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -53,6 +53,7 @@ public abstract class BaseFileWriterFactory implements FileWriterFactory { private final Schema equalityDeleteRowSchema; private final SortOrder equalityDeleteSortOrder; private final Schema positionDeleteRowSchema; + private final int schemaId; protected BaseFileWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema, SortOrder dataSortOrder, FileFormat deleteFileFormat, @@ -67,6 +68,7 @@ protected BaseFileWriterFactory(Table table, FileFormat dataFileFormat, Schema d this.equalityDeleteRowSchema = equalityDeleteRowSchema; this.equalityDeleteSortOrder = equalityDeleteSortOrder; this.positionDeleteRowSchema = positionDeleteRowSchema; + this.schemaId = table.schema().schemaId(); } protected abstract void configureDataWrite(Avro.DataWriteBuilder builder); @@ -99,6 +101,7 @@ public DataWriter newDataWriter(EncryptedOutputFile file, PartitionSpec spec, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(dataSortOrder) + .withSchemaId(schemaId) .overwrite(); configureDataWrite(avroBuilder); @@ -114,6 +117,7 @@ public DataWriter newDataWriter(EncryptedOutputFile file, PartitionSpec spec, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(dataSortOrder) + .withSchemaId(schemaId) .overwrite(); configureDataWrite(parquetBuilder); @@ -129,6 +133,7 @@ public DataWriter 
newDataWriter(EncryptedOutputFile file, PartitionSpec spec, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(dataSortOrder) + .withSchemaId(schemaId) .overwrite(); configureDataWrite(orcBuilder); @@ -163,6 +168,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter(EncryptedOutputFile file, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(equalityDeleteSortOrder) + .withSchemaId(schemaId) .overwrite(); configureEqualityDelete(avroBuilder); @@ -179,6 +185,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter(EncryptedOutputFile file, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(equalityDeleteSortOrder) + .withSchemaId(schemaId) .overwrite(); configureEqualityDelete(parquetBuilder); @@ -195,6 +202,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter(EncryptedOutputFile file, .withPartition(partition) .withKeyMetadata(keyMetadata) .withSortOrder(equalityDeleteSortOrder) + .withSchemaId(schemaId) .overwrite(); configureEqualityDelete(orcBuilder); @@ -227,6 +235,7 @@ public PositionDeleteWriter newPositionDeleteWriter(EncryptedOutputFile file, .withSpec(spec) .withPartition(partition) .withKeyMetadata(keyMetadata) + .withSchemaId(schemaId) .overwrite(); configurePositionDelete(avroBuilder); @@ -241,6 +250,7 @@ public PositionDeleteWriter newPositionDeleteWriter(EncryptedOutputFile file, .withSpec(spec) .withPartition(partition) .withKeyMetadata(keyMetadata) + .withSchemaId(schemaId) .overwrite(); configurePositionDelete(parquetBuilder); @@ -255,6 +265,7 @@ public PositionDeleteWriter newPositionDeleteWriter(EncryptedOutputFile file, .withSpec(spec) .withPartition(partition) .withKeyMetadata(keyMetadata) + .withSchemaId(schemaId) .overwrite(); configurePositionDelete(orcBuilder); diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 135fa84ee94a..28a56a0753c5 100644 --- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -755,7 +755,7 @@ private FileAppenderFactory createDeletableAppenderFactory() { private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, - ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null); + ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null, -1); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 78ffe72c2dcd..8c531d8d7293 100644 --- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -765,7 +765,7 @@ private FileAppenderFactory createDeletableAppenderFactory() { private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, - ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null); + ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null, -1); } private List 
assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index d1fb23720bb3..2d5acf055c3a 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -761,7 +761,7 @@ private FileAppenderFactory createDeletableAppenderFactory() { private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, - ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null); + ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null, -1); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 6621502af92f..75d1b6c68d8c 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -324,6 +324,7 @@ public static class DataWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private SortOrder sortOrder = null; + private int schemaId = -1; private DataWriteBuilder(OutputFile file) { this.appenderBuilder = write(file); @@ -335,6 +336,7 @@ public DataWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); return this; } @@ -397,13 +399,19 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DataWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public DataWriter build() { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); - return new DataWriter<>(fileAppender, FileFormat.ORC, location, spec, partition, keyMetadata, sortOrder); + return new DataWriter<>( + fileAppender, FileFormat.ORC, location, spec, partition, keyMetadata, sortOrder, schemaId); } } @@ -421,6 +429,7 @@ public static class DeleteWriteBuilder { private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; private SortOrder sortOrder; + private int schemaId = -1; private Function pathTransformFunc = Function.identity(); private DeleteWriteBuilder(OutputFile file) { @@ -433,6 +442,7 @@ public DeleteWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); return this; } @@ -510,6 +520,11 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DeleteWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public EqualityDeleteWriter buildEqualityWriter() { Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema"); Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); @@ -531,7 +546,7 @@ 
public EqualityDeleteWriter buildEqualityWriter() { return new EqualityDeleteWriter<>( appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata, - sortOrder, equalityFieldIds); + sortOrder, schemaId, equalityFieldIds); } public PositionDeleteWriter buildPositionWriter() { @@ -561,7 +576,7 @@ public PositionDeleteWriter buildPositionWriter() { appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); return new PositionDeleteWriter<>( - appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata); + appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata, schemaId); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 2db28311a5c9..7ca98883a87a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -438,6 +438,7 @@ public static class DataWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private SortOrder sortOrder = null; + private int schemaId = -1; private DataWriteBuilder(OutputFile file) { this.appenderBuilder = write(file); @@ -449,6 +450,7 @@ public DataWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); return this; } @@ -511,13 +513,19 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DataWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public DataWriter build() throws IOException { Preconditions.checkArgument(spec != null, "Cannot create data writer without spec"); Preconditions.checkArgument(spec.isUnpartitioned() || partition != null, "Partition must not be null when creating data writer for partitioned spec"); FileAppender fileAppender = appenderBuilder.build(); - return new DataWriter<>(fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder); + return new DataWriter<>( + fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder, schemaId); } } @@ -535,6 +543,7 @@ public static class DeleteWriteBuilder { private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; private SortOrder sortOrder; + private int schemaId = -1; private Function pathTransformFunc = Function.identity(); private DeleteWriteBuilder(OutputFile file) { @@ -547,6 +556,7 @@ public DeleteWriteBuilder forTable(Table table) { withSpec(table.spec()); setAll(table.properties()); metricsConfig(MetricsConfig.forTable(table)); + withSchemaId(table.schema().schemaId()); return this; } @@ -624,6 +634,11 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public DeleteWriteBuilder withSchemaId(int newSchemaId) { + this.schemaId = newSchemaId; + return this; + } + public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema"); Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids"); @@ -646,7 +661,7 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { return new EqualityDeleteWriter<>( appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, - sortOrder, 
equalityFieldIds); + sortOrder, schemaId, equalityFieldIds); } public PositionDeleteWriter buildPositionWriter() throws IOException { @@ -684,7 +699,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); return new PositionDeleteWriter<>( - appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata); + appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, schemaId); } } diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index a6390d39c575..464690bf1526 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -48,6 +48,7 @@ public class SparkDataFile implements DataFile { private final int keyMetadataPosition; private final int splitOffsetsPosition; private final int sortOrderIdPosition; + private final int schemaIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; @@ -81,6 +82,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); sortOrderIdPosition = positions.get("sort_order_id"); + schemaIdPosition = positions.get("schema_id"); } public SparkDataFile wrap(Row row) { @@ -184,6 +186,11 @@ public Integer sortOrderId() { return wrapped.getAs(sortOrderIdPosition); } + @Override + public int schemaId() { + return wrapped.getAs(schemaIdPosition); + } + private int fieldPosition(String name, StructType sparkType) { try { return sparkType.fieldIndex(name); diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java index 0837fb7d39e4..799465a7ddbc 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java @@ -137,6 +137,11 @@ public ByteBuffer keyMetadata() { return null; } + @Override + public int schemaId() { + return -1; + } + @Override public ManifestFile copy() { throw new UnsupportedOperationException("Cannot copy"); diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index a6390d39c575..464690bf1526 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -48,6 +48,7 @@ public class SparkDataFile implements DataFile { private final int keyMetadataPosition; private final int splitOffsetsPosition; private final int sortOrderIdPosition; + private final int schemaIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; @@ -81,6 +82,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); sortOrderIdPosition = positions.get("sort_order_id"); + schemaIdPosition = positions.get("schema_id"); } public SparkDataFile wrap(Row row) { @@ -184,6 +186,11 @@ public Integer 
sortOrderId() { return wrapped.getAs(sortOrderIdPosition); } + @Override + public int schemaId() { + return wrapped.getAs(schemaIdPosition); + } + private int fieldPosition(String name, StructType sparkType) { try { return sparkType.fieldIndex(name); diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java index 0837fb7d39e4..799465a7ddbc 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java @@ -137,6 +137,11 @@ public ByteBuffer keyMetadata() { return null; } + @Override + public int schemaId() { + return -1; + } + @Override public ManifestFile copy() { throw new UnsupportedOperationException("Cannot copy"); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index a6390d39c575..464690bf1526 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -48,6 +48,7 @@ public class SparkDataFile implements DataFile { private final int keyMetadataPosition; private final int splitOffsetsPosition; private final int sortOrderIdPosition; + private final int schemaIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; @@ -81,6 +82,7 @@ public SparkDataFile(Types.StructType type, StructType sparkType) { keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); sortOrderIdPosition = positions.get("sort_order_id"); + schemaIdPosition = positions.get("schema_id"); } public SparkDataFile wrap(Row row) { @@ -184,6 +186,11 @@ public Integer sortOrderId() { return wrapped.getAs(sortOrderIdPosition); } + @Override + public int schemaId() { + return wrapped.getAs(schemaIdPosition); + } + private int fieldPosition(String name, StructType sparkType) { try { return sparkType.fieldIndex(name); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java index 0837fb7d39e4..799465a7ddbc 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java @@ -137,6 +137,11 @@ public ByteBuffer keyMetadata() { return null; } + @Override + public int schemaId() { + return -1; + } + @Override public ManifestFile copy() { throw new UnsupportedOperationException("Cannot copy"); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index a6390d39c575..464690bf1526 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -48,6 +48,7 @@ public class SparkDataFile implements DataFile { private final int keyMetadataPosition; private final int splitOffsetsPosition; private final int sortOrderIdPosition; + private final int schemaIdPosition; private final Type lowerBoundsType; private final Type upperBoundsType; private final Type keyMetadataType; @@ -81,6 +82,7 @@ 
public SparkDataFile(Types.StructType type, StructType sparkType) { keyMetadataPosition = positions.get("key_metadata"); splitOffsetsPosition = positions.get("split_offsets"); sortOrderIdPosition = positions.get("sort_order_id"); + schemaIdPosition = positions.get("schema_id"); } public SparkDataFile wrap(Row row) { @@ -184,6 +186,11 @@ public Integer sortOrderId() { return wrapped.getAs(sortOrderIdPosition); } + @Override + public int schemaId() { + return wrapped.getAs(schemaIdPosition); + } + private int fieldPosition(String name, StructType sparkType) { try { return sparkType.fieldIndex(name); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java index 269130496dc9..9a640f66ce78 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java @@ -146,6 +146,11 @@ public ByteBuffer keyMetadata() { return null; } + @Override + public int schemaId() { + return -1; + } + @Override public ManifestFile copy() { throw new UnsupportedOperationException("Cannot copy");
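
A minimal usage sketch (not part of the patch) may help tie the pieces of this change together: writers stamp each data and delete file with the schema ID that was current when it was written, ManifestWriter collapses those per-file IDs into a single manifest-level schema ID (or -1 when they differ), and planning code (ManifestGroup, ManifestReader, DeleteFileIndex) uses the new SchemaEvaluator to skip manifests or entries whose write-time schema cannot satisfy the row filter. The sketch below isolates that manifest-pruning decision; it assumes only the SchemaEvaluator(StructType, Expression, boolean) constructor and boolean eval(Schema) method that appear in this diff, and the class and method names are illustrative, not part of the change.

import java.util.Map;

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.SchemaEvaluator;

public class SchemaPruningSketch {
  /**
   * Decides whether a manifest can contain rows matching the filter, based on the schema
   * it was written with. A schema ID of -1 means the manifest mixes entries written under
   * different schemas (or the ID is unknown), so it cannot be pruned here and is instead
   * filtered entry by entry in ManifestReader.
   */
  static boolean mayContainMatches(ManifestFile manifest, Schema currentSchema,
                                   Map<Integer, Schema> schemasById, Expression rowFilter) {
    // bind the filter against the current schema, as ManifestGroup does during planning
    SchemaEvaluator schemaEvaluator = new SchemaEvaluator(currentSchema.asStruct(), rowFilter, true);

    if (manifest.schemaId() > -1) {
      // evaluate the filter against the schema that was current when the manifest was written
      return schemaEvaluator.eval(schemasById.get(manifest.schemaId()));
    }

    // unknown or mixed schema IDs: keep the manifest
    return true;
  }
}

In a real scan the inputs come from table metadata: the current schema from table.schema(), the historical schemas from table.schemas(), and a row filter such as Expressions.startsWith("name", "abc") on a column added by a later schema, which is the case the new TestDataTableScan and TestDeleteFileIndex tests exercise.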