diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 75d44936913a..b0ec879bda8d 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -409,49 +409,51 @@ public static Schema schemaFor(Table table, Long snapshotId, Long timestampMilli
   }
 
   /**
-   * Return the schema of the snapshot at a given branch.
+   * Return the schema of the snapshot at a given ref.
    *
-   * <p>If branch does not exist, the table schema is returned because it will be the schema when
-   * the new branch is created.
+   * <p>If the ref does not exist or the ref is a branch, the table schema is returned because it
+   * will be the schema when the new branch is created. If the ref is a tag, then the snapshot
+   * schema is returned.
    *
    * @param table a {@link Table}
-   * @param branch branch name of the table (nullable)
-   * @return schema of the specific snapshot at the given branch
+   * @param ref ref name of the table (nullable)
+   * @return schema of the specific snapshot at the given ref
    */
-  public static Schema schemaFor(Table table, String branch) {
-    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+  public static Schema schemaFor(Table table, String ref) {
+    if (ref == null || ref.equals(SnapshotRef.MAIN_BRANCH)) {
       return table.schema();
     }
 
-    Snapshot ref = table.snapshot(branch);
-    if (ref == null) {
+    SnapshotRef snapshotRef = table.refs().get(ref);
+    if (null == snapshotRef || snapshotRef.isBranch()) {
       return table.schema();
     }
 
-    return schemaFor(table, ref.snapshotId());
+    return schemaFor(table, snapshotRef.snapshotId());
   }
 
   /**
-   * Return the schema of the snapshot at a given branch.
+   * Return the schema of the snapshot at a given ref.
    *
-   * <p>If branch does not exist, the table schema is returned because it will be the schema when
-   * the new branch is created.
+   * <p>If the ref does not exist or the ref is a branch, the table schema is returned because it
+   * will be the schema when the new branch is created. If the ref is a tag, then the snapshot
+   * schema is returned.
    *
    * @param metadata a {@link TableMetadata}
-   * @param branch branch name of the table (nullable)
-   * @return schema of the specific snapshot at the given branch
+   * @param ref ref name of the table (nullable)
+   * @return schema of the specific snapshot at the given ref
    */
-  public static Schema schemaFor(TableMetadata metadata, String branch) {
-    if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+  public static Schema schemaFor(TableMetadata metadata, String ref) {
+    if (ref == null || ref.equals(SnapshotRef.MAIN_BRANCH)) {
       return metadata.schema();
     }
 
-    SnapshotRef ref = metadata.ref(branch);
-    if (ref == null) {
+    SnapshotRef snapshotRef = metadata.ref(ref);
+    if (snapshotRef == null || snapshotRef.isBranch()) {
       return metadata.schema();
     }
 
-    Snapshot snapshot = metadata.snapshot(ref.snapshotId());
+    Snapshot snapshot = metadata.snapshot(snapshotRef.snapshotId());
     return metadata.schemas().get(snapshot.schemaId());
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
index 576df47e7b2f..db6be5bcfe48 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.util;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -187,4 +188,68 @@ private void expectedSnapshots(long[] snapshotIdExpected, Iterable<Snapshot> sna
             .toArray();
     assertThat(actualSnapshots).isEqualTo(snapshotIdExpected);
   }
+
+  @Test
+  public void schemaForRef() {
+    Schema initialSchema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+    assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+    assertThat(SnapshotUtil.schemaFor(table, null).asStruct()).isEqualTo(initialSchema.asStruct());
+    assertThat(SnapshotUtil.schemaFor(table, "non-existing-ref").asStruct())
+        .isEqualTo(initialSchema.asStruct());
+    assertThat(SnapshotUtil.schemaFor(table, SnapshotRef.MAIN_BRANCH).asStruct())
+        .isEqualTo(initialSchema.asStruct());
+  }
+
+  @Test
+  public void schemaForBranch() {
+    Schema initialSchema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+    assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+    String branch = "branch";
+    table.manageSnapshots().createBranch(branch).commit();
+
+    assertThat(SnapshotUtil.schemaFor(table, branch).asStruct())
+        .isEqualTo(initialSchema.asStruct());
+
+    table.updateSchema().addColumn("zip", Types.IntegerType.get()).commit();
+    Schema expected =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()),
+            optional(3, "zip", Types.IntegerType.get()));
+
+    assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
+    assertThat(SnapshotUtil.schemaFor(table, branch).asStruct()).isEqualTo(expected.asStruct());
+  }
+
+  @Test
+  public void schemaForTag() {
+    Schema initialSchema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+    assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+    String tag = "tag";
+    table.manageSnapshots().createTag(tag, table.currentSnapshot().snapshotId()).commit();
+
+    assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct());
+
+    table.updateSchema().addColumn("zip", Types.IntegerType.get()).commit();
+    Schema expected =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()),
+            optional(3, "zip", Types.IntegerType.get()));
+
+    assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
+    assertThat(SnapshotUtil.schemaFor(table, tag).asStruct()).isEqualTo(initialSchema.asStruct());
+  }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 276fbcd592ae..a161224275ca 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -31,10 +31,12 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -402,16 +404,104 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
     // Deleting a column to indicate schema change
     table.updateSchema().deleteColumn("data").commit();
 
-    // The data should have the deleted column as it was captured in an earlier snapshot.
-    Dataset<Row> deletedColumnBranchSnapshotResult =
+    // The data should not have the deleted column
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(RowFactory.create(1), RowFactory.create(2), RowFactory.create(3));
+
+    // re-introducing the column should not let the data re-appear
+    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .containsExactly(
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+  }
+
+  @Test
+  public void testWritingToBranchAfterSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> branchSnapshotResult =
         spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
-    List<SimpleRecord> deletedColumnBranchSnapshotRecords =
-        deletedColumnBranchSnapshotResult
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+    List<SimpleRecord> branchSnapshotRecords =
+        branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
     Assert.assertEquals(
-        "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);
+        "Current snapshot rows should match", expectedRecords, branchSnapshotRecords);
+
+    // Delete a column and add a new column to indicate schema change
+    table.updateSchema().deleteColumn("data").addColumn("zip", Types.IntegerType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null));
+
+    // writing new records into the branch should work with the new column
+    List<Row> records =
+        Lists.newArrayList(
+            RowFactory.create(4, 12345), RowFactory.create(5, 54321), RowFactory.create(6, 67890));
+
+    Dataset<Row> dataFrame =
+        spark.createDataFrame(
+            records,
+            SparkSchemaUtil.convert(
+                new Schema(
+                    optional(1, "id", Types.IntegerType.get()),
+                    optional(2, "zip", Types.IntegerType.get()))));
+    dataFrame
+        .select("id", "zip")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .collectAsList())
+        .hasSize(6)
+        .contains(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null))
+        .containsAll(records);
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index f1374c050d5c..645afd4542e4 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -37,10 +37,12 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -425,16 +427,104 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
     // Deleting a column to indicate schema change
     table.updateSchema().deleteColumn("data").commit();
 
-    // The data should have the deleted column as it was captured in an earlier snapshot.
-    Dataset<Row> deletedColumnBranchSnapshotResult =
+    // The data should not have the deleted column
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(RowFactory.create(1), RowFactory.create(2), RowFactory.create(3));
+
+    // re-introducing the column should not let the data re-appear
+    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .containsExactly(
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+  }
+
+  @Test
+  public void testWritingToBranchAfterSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> branchSnapshotResult =
         spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
-    List<SimpleRecord> deletedColumnBranchSnapshotRecords =
-        deletedColumnBranchSnapshotResult
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+    List<SimpleRecord> branchSnapshotRecords =
+        branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
     Assert.assertEquals(
-        "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);
+        "Current snapshot rows should match", expectedRecords, branchSnapshotRecords);
+
+    // Delete a column and add a new column to indicate schema change
+    table.updateSchema().deleteColumn("data").addColumn("zip", Types.IntegerType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null));
+
+    // writing new records into the branch should work with the new column
+    List<Row> records =
+        Lists.newArrayList(
+            RowFactory.create(4, 12345), RowFactory.create(5, 54321), RowFactory.create(6, 67890));
+
+    Dataset<Row> dataFrame =
+        spark.createDataFrame(
+            records,
+            SparkSchemaUtil.convert(
+                new Schema(
+                    optional(1, "id", Types.IntegerType.get()),
+                    optional(2, "zip", Types.IntegerType.get()))));
+    dataFrame
+        .select("id", "zip")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .collectAsList())
+        .hasSize(6)
+        .contains(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null))
+        .containsAll(records);
   }
 
   @Test
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index f1374c050d5c..645afd4542e4 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -37,10 +37,12 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -425,16 +427,104 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
     // Deleting a column to indicate schema change
     table.updateSchema().deleteColumn("data").commit();
 
-    // The data should have the deleted column as it was captured in an earlier snapshot.
-    Dataset<Row> deletedColumnBranchSnapshotResult =
+    // The data should not have the deleted column
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(RowFactory.create(1), RowFactory.create(2), RowFactory.create(3));
+
+    // re-introducing the column should not let the data re-appear
+    table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .containsExactly(
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+  }
+
+  @Test
+  public void testWritingToBranchAfterSchemaChange() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    Dataset<Row> branchSnapshotResult =
         spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
-    List<SimpleRecord> deletedColumnBranchSnapshotRecords =
-        deletedColumnBranchSnapshotResult
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+    List<SimpleRecord> branchSnapshotRecords =
+        branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
     Assert.assertEquals(
-        "Current snapshot rows should match", expectedRecords, deletedColumnBranchSnapshotRecords);
+        "Current snapshot rows should match", expectedRecords, branchSnapshotRecords);
+
+    // Delete a column and add a new column to indicate schema change
+    table.updateSchema().deleteColumn("data").addColumn("zip", Types.IntegerType.get()).commit();
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .orderBy("id")
+                .collectAsList())
+        .containsExactly(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null));
+
+    // writing new records into the branch should work with the new column
+    List<Row> records =
+        Lists.newArrayList(
+            RowFactory.create(4, 12345), RowFactory.create(5, 54321), RowFactory.create(6, 67890));
+
+    Dataset<Row> dataFrame =
+        spark.createDataFrame(
+            records,
+            SparkSchemaUtil.convert(
+                new Schema(
+                    optional(1, "id", Types.IntegerType.get()),
+                    optional(2, "zip", Types.IntegerType.get()))));
+    dataFrame
+        .select("id", "zip")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            spark
+                .read()
+                .format("iceberg")
+                .option("branch", "branch")
+                .load(tableLocation)
+                .collectAsList())
+        .hasSize(6)
+        .contains(
+            RowFactory.create(1, null), RowFactory.create(2, null), RowFactory.create(3, null))
+        .containsAll(records);
   }
 
   @Test
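
Reviewer note (not part of the patch): the sketch below summarizes the ref-resolution rules that the new SnapshotUtil.schemaFor(Table, String) implements and that the schemaForBranch/schemaForTag tests above exercise. The table handle, class name, and ref names are hypothetical; only schemaFor itself and the snapshot-management calls come from the Iceberg API.

// Hypothetical sketch; assumes `table` is an existing Iceberg table with at least one snapshot.
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;

class SchemaForRefSketch {
  static void demo(Table table) {
    long snapshotId = table.currentSnapshot().snapshotId();
    table.manageSnapshots().createTag("v1-tag", snapshotId).commit();
    table.manageSnapshots().createBranch("dev", snapshotId).commit();

    // Evolve the schema after the refs were created.
    table.updateSchema().addColumn("zip", Types.IntegerType.get()).commit();

    // A tag is immutable, so it resolves to the schema of the tagged snapshot.
    Schema tagSchema = SnapshotUtil.schemaFor(table, "v1-tag"); // pre-evolution schema

    // A branch, or a ref that does not exist, resolves to the current table
    // schema, because that is the schema a new write to the branch would use.
    Schema branchSchema = SnapshotUtil.schemaFor(table, "dev"); // current schema
    Schema missingRefSchema = SnapshotUtil.schemaFor(table, "no-such-ref"); // current schema
  }
}

The asymmetry is deliberate: a tag pins the data as written, so readers get the snapshot's schema, while a branch remains writable, so both readers and writers see the current table schema.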