diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
index 89c7f0b6067e..c69f71f2fdec 100644
--- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -300,7 +300,7 @@ private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
     }
 
     return builder
-        .specsById(table().specs())
+        .specsById(specs())
         .filterData(filter())
         .caseSensitive(isCaseSensitive())
         .scanMetrics(scanMetrics())
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java b/core/src/main/java/org/apache/iceberg/DataScan.java
index 1c48042f52f0..1acbbbf6826a 100644
--- a/core/src/main/java/org/apache/iceberg/DataScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -53,7 +53,7 @@ protected ManifestGroup newManifestGroup(
         .caseSensitive(isCaseSensitive())
         .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
         .filterData(filter())
-        .specsById(table().specs())
+        .specsById(specs())
         .scanMetrics(scanMetrics())
         .ignoreDeleted()
         .columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8463112b7a51..4d23dd525e07 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,7 +74,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
         .caseSensitive(isCaseSensitive())
         .select(scanColumns())
         .filterData(filter())
-        .specsById(table().specs())
+        .specsById(specs())
         .scanMetrics(scanMetrics())
         .ignoreDeleted()
         .columnsToKeepStats(columnsToKeepStats());
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index a98a8c9f13b1..8a836b634e70 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.metrics.Timer;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
@@ -79,6 +80,27 @@ protected ScanMetrics scanMetrics() {
     return scanMetrics;
   }
 
+  protected Map<Integer, PartitionSpec> specs() {
+    Map<Integer, PartitionSpec> specs = table().specs();
+    // the table's specs are already bound to the latest schema
+    if (!useSnapshotSchema()
+        || snapshotId() == null
+        || table().currentSnapshot() == null
+        || snapshotId().equals(table().currentSnapshot().snapshotId())) {
+      return specs;
+    }
+
+    // this is a time travel request; rebind the specs to the snapshot schema
+    Schema snapshotSchema = tableSchema();
+    ImmutableMap.Builder<Integer, PartitionSpec> newSpecs =
+        ImmutableMap.builderWithExpectedSize(specs.size());
+    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+      newSpecs.put(entry.getKey(), entry.getValue().toUnbound().bind(snapshotSchema));
+    }
+
+    return newSpecs.build();
+  }
+
   public ThisT useSnapshot(long scanSnapshotId) {
     Preconditions.checkArgument(
         snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 790f7c715207..3df370fe6ff4 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.RandomAvroData;
@@ -65,21 +66,26 @@ protected static List formatVersions() {
 
   @TempDir private File temp;
 
   private DataFile createDataFile(String partValue) throws IOException {
-    List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
+    return createDataFile(partValue, SCHEMA, SPEC);
+  }
+
+  private DataFile createDataFile(String partValue, Schema schema, PartitionSpec spec)
+      throws IOException {
+    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 0L);
     OutputFile dataFile =
         new InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
     try (FileAppender<GenericData.Record> writer =
-        Avro.write(dataFile).schema(SCHEMA).named("test").build()) {
+        Avro.write(dataFile).schema(schema).named("test").build()) {
       for (GenericData.Record rec : expected) {
         rec.put("part", partValue); // create just one partition
         writer.add(rec);
       }
     }
 
-    PartitionData partition = new PartitionData(SPEC.partitionType());
+    PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, partValue);
-    return DataFiles.builder(SPEC)
+    return DataFiles.builder(spec)
         .withInputFile(dataFile.toInputFile())
         .withPartition(partition)
         .withRecordCount(100)
@@ -99,6 +105,7 @@ public void testPartitionSourceRename() throws IOException {
     DataFile fileTwo = createDataFile("two");
 
     table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
 
     List<FileScanTask> tasks =
         Lists.newArrayList(table.newScan().filter(Expressions.equal("part", "one")).planFiles());
@@ -111,6 +118,133 @@ public void testPartitionSourceRename() throws IOException {
     tasks = Lists.newArrayList(table.newScan().filter(Expressions.equal("p", "one")).planFiles());
 
     assertThat(tasks).hasSize(1);
+
+    // create a new commit
+    table.newAppend().appendFile(createDataFile("three")).commit();
+
+    // use a filter with the previous partition name
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("part", "one"))
+                .planFiles());
+
+    assertThat(tasks).hasSize(1);
+  }
+
+  @TestTemplate
+  public void testPartitionSourceDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSpec().addField("id").commit();
+
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table.newScan().filter(Expressions.not(Expressions.isNull("id"))).planFiles());
+
+    assertThat(tasks).hasSize(2);
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // remove one field from the spec and drop the column
+    table.updateSpec().removeField("id").commit();
+    table.updateSchema().deleteColumn("id").commit();
+
+    List<FileScanTask> tasksAtFirstSnapshotId =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.not(Expressions.isNull("id")))
+                .planFiles());
+
+    assertThat(
+            tasksAtFirstSnapshotId.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()))
+        .isEqualTo(
+            tasks.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()));
+  }
+
+  @TestTemplate
+  public void testColumnRename() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().renameColumn("data", "renamed_data").commit();
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // generate a new commit
+    DataFile fileFour = createDataFile("four", table.schema(), table.spec());
+    table.newAppend().appendFile(fileFour).commit();
+
+    // planning succeeds with a filter on the previous column name
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
+
+    // planning succeeds with a filter on the renamed column name
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(secondSnapshotId)
+                .filter(Expressions.equal("renamed_data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(3);
+  }
+
+  @TestTemplate
+  public void testColumnDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    // make sure a new commit can be created after the column is dropped
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // planning succeeds with a filter on the dropped column name
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
   }
 
   @TestTemplate
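Note on the mechanics of the fix: `table().specs()` always returns partition specs bound to the table's *current* schema, while a time-travel scan binds its row filter against the snapshot schema, so partition pruning can fail to resolve a source column that was renamed or dropped after the scanned snapshot. The new `SnapshotScan#specs()` addresses this by round-tripping each spec through `toUnbound().bind(snapshotSchema)`, which re-resolves partition fields by source field id against the snapshot schema. Below is a minimal, self-contained sketch of that rebinding step; the schemas, field ids, and names are illustrative only, not taken from the patch:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.types.Types;

public class SpecRebindSketch {
  public static void main(String[] args) {
    // illustrative schemas (not from the patch): field id 2 is named "part"
    // at the old snapshot and was later renamed to "p"
    Schema snapshotSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "part", Types.StringType.get()));
    Schema currentSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "p", Types.StringType.get()));

    // the table's spec is bound to the current schema
    PartitionSpec currentSpec = PartitionSpec.builderFor(currentSchema).identity("p").build();

    // a time-travel filter written against the snapshot schema
    Expression filter = Expressions.equal("part", "one");

    // before the fix: projecting this filter through the current-schema spec
    // fails because "part" does not resolve against currentSpec.schema()
    // Projections.inclusive(currentSpec).project(filter); // ValidationException

    // the fix: rebind the spec to the snapshot schema, then project; the
    // partition field still points at source field id 2, so the old name binds
    PartitionSpec rebound = currentSpec.toUnbound().bind(snapshotSchema);
    Expression partFilter = Projections.inclusive(rebound).project(filter);
    System.out.println(partFilter); // predicate on the partition tuple
  }
}
```

`Projections.inclusive(spec).project(filter)` is roughly what manifest pruning does internally (via `ManifestEvaluator`), and it binds the filter against `spec.schema()`; that is why the spec's bound schema, and not just its field ids, matters here.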
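At the API level, the behavior the new tests pin down is that a time-travel scan may filter on names as they existed at the scanned snapshot. A sketched usage fragment; `table` and `oldSnapshotId` are placeholders for an already-loaded `Table` and a pre-rename snapshot id, not values from the patch:

```java
// filter on "part" as it was named at oldSnapshotId, even if the column has
// since been renamed or dropped in the current table schema
try (CloseableIterable<FileScanTask> tasks =
    table
        .newScan()
        .useSnapshot(oldSnapshotId)
        .filter(Expressions.equal("part", "one"))
        .planFiles()) {
  tasks.forEach(task -> System.out.println(task.file().location()));
} // close() throws IOException, so the enclosing method must handle it
```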