@@ -300,7 +300,7 @@ private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
     }

     return builder
-        .specsById(table().specs())
+        .specsById(specs())
         .filterData(filter())
         .caseSensitive(isCaseSensitive())
         .scanMetrics(scanMetrics())
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
@@ -53,7 +53,7 @@ protected ManifestGroup newManifestGroup(
         .caseSensitive(isCaseSensitive())
         .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
         .filterData(filter())
-        .specsById(table().specs())
+        .specsById(specs())
         .scanMetrics(scanMetrics())
         .ignoreDeleted()
         .columnsToKeepStats(columnsToKeepStats());
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -74,7 +74,7 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
         .caseSensitive(isCaseSensitive())
         .select(scanColumns())
         .filterData(filter())
-        .specsById(table().specs())
+        .specsById(specs())
         .scanMetrics(scanMetrics())
         .ignoreDeleted()
         .columnsToKeepStats(columnsToKeepStats());
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -33,6 +33,7 @@
 import org.apache.iceberg.metrics.Timer;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.TypeUtil;
@@ -79,6 +80,27 @@ protected ScanMetrics scanMetrics() {
     return scanMetrics;
   }

+  protected Map<Integer, PartitionSpec> specs() {
+    Map<Integer, PartitionSpec> specs = table().specs();
+    // these cases resolve against the latest schema, so the latest specs apply as-is
+    if (!useSnapshotSchema()
+        || snapshotId() == null
+        || table().currentSnapshot() == null
+        || snapshotId().equals(table().currentSnapshot().snapshotId())) {
+      return specs;
+    }
+
+    // this is a time travel request
+    Schema snapshotSchema = tableSchema();
+    ImmutableMap.Builder<Integer, PartitionSpec> newSpecs =
+        ImmutableMap.builderWithExpectedSize(specs.size());
+    for (Map.Entry<Integer, PartitionSpec> entry : specs.entrySet()) {
+      newSpecs.put(entry.getKey(), entry.getValue().toUnbound().bind(snapshotSchema));
+    }
+
+    return newSpecs.build();
+  }
+
   public ThisT useSnapshot(long scanSnapshotId) {
     Preconditions.checkArgument(
         snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
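The rebinding loop above is the core of the change: instead of returning the table's specs bound to the current schema, a time-travel scan rebinds each spec to the schema of the snapshot being scanned, so filters resolve against that snapshot's column names. A minimal standalone sketch of the same rebinding using only public Table APIs; the table and snapshot wiring here is illustrative and not part of this PR:

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class RebindSpecsSketch {
  // Rebind every partition spec of the table to the schema that was current
  // at the given snapshot, so filters can use that snapshot's column names.
  static Map<Integer, PartitionSpec> specsAsOf(Table table, long snapshotId) {
    Snapshot snapshot = table.snapshot(snapshotId);
    // schemaId() can be null for metadata written before schema ids existed;
    // fall back to the current schema in that case
    Schema snapshotSchema =
        snapshot.schemaId() != null
            ? table.schemas().get(snapshot.schemaId())
            : table.schema();

    Map<Integer, PartitionSpec> rebound = new HashMap<>();
    for (Map.Entry<Integer, PartitionSpec> entry : table.specs().entrySet()) {
      // unbind from the current schema, then re-bind to the snapshot schema
      rebound.put(entry.getKey(), entry.getValue().toUnbound().bind(snapshotSchema));
    }
    return rebound;
  }
}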
142 changes: 138 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.RandomAvroData;
@@ -65,21 +66,26 @@ protected static List<Integer> formatVersions() {
   @TempDir private File temp;

   private DataFile createDataFile(String partValue) throws IOException {
-    List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);
+    return createDataFile(partValue, SCHEMA, SPEC);
+  }
+
+  private DataFile createDataFile(String partValue, Schema schema, PartitionSpec spec)
+      throws IOException {
+    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 0L);

     OutputFile dataFile =
         new InMemoryOutputFile(FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
     try (FileAppender<GenericData.Record> writer =
-        Avro.write(dataFile).schema(SCHEMA).named("test").build()) {
+        Avro.write(dataFile).schema(schema).named("test").build()) {
       for (GenericData.Record rec : expected) {
         rec.put("part", partValue); // create just one partition
         writer.add(rec);
       }
     }

-    PartitionData partition = new PartitionData(SPEC.partitionType());
+    PartitionData partition = new PartitionData(spec.partitionType());
     partition.set(0, partValue);
-    return DataFiles.builder(SPEC)
+    return DataFiles.builder(spec)
         .withInputFile(dataFile.toInputFile())
         .withPartition(partition)
         .withRecordCount(100)
@@ -99,6 +105,7 @@ public void testPartitionSourceRename() throws IOException {
DataFile fileTwo = createDataFile("two");

table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
long firstSnapshotId = table.currentSnapshot().snapshotId();

List<FileScanTask> tasks =
Lists.newArrayList(table.newScan().filter(Expressions.equal("part", "one")).planFiles());
Expand All @@ -111,6 +118,133 @@ public void testPartitionSourceRename() throws IOException {
     tasks = Lists.newArrayList(table.newScan().filter(Expressions.equal("p", "one")).planFiles());

     assertThat(tasks).hasSize(1);
+
+    // create a new commit
+    table.newAppend().appendFile(createDataFile("three")).commit();
+
+    // use a filter with the previous partition name
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("part", "one"))
+                .planFiles());
+
+    assertThat(tasks).hasSize(1);
   }
+
+  @TestTemplate
+  public void testPartitionSourceDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSpec().addField("id").commit();
+
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table.newScan().filter(Expressions.not(Expressions.isNull("id"))).planFiles());
+
+    assertThat(tasks).hasSize(2);
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // remove the field from the spec and drop the column
+    table.updateSpec().removeField("id").commit();
+    table.updateSchema().deleteColumn("id").commit();
+
+    List<FileScanTask> tasksAtFirstSnapshotId =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.not(Expressions.isNull("id")))
+                .planFiles());
+
+    assertThat(
+            tasksAtFirstSnapshotId.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()))
+        .isEqualTo(
+            tasks.stream()
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList()));
+  }
+
+  @TestTemplate
+  public void testColumnRename() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().renameColumn("data", "renamed_data").commit();
+
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+    long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+    // generate a new commit
+    DataFile fileFour = createDataFile("four", table.schema(), table.spec());
+    table.newAppend().appendFile(fileFour).commit();
+
+    // a filter on the previous column name plans successfully
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
+
+    // a filter on the renamed column name plans successfully
+    tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(secondSnapshotId)
+                .filter(Expressions.equal("renamed_data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(3);
+  }
+
+  @TestTemplate
+  public void testColumnDrop() throws IOException {
+    Table table = TestTables.create(temp, "test", SCHEMA, SPEC, formatVersion);
+
+    DataFile fileOne = createDataFile("one");
+    DataFile fileTwo = createDataFile("two");
+
+    table.newAppend().appendFile(fileOne).appendFile(fileTwo).commit();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    // make sure a new commit can be generated after dropping the column
+    DataFile fileThree = createDataFile("three", table.schema(), table.spec());
+    table.newAppend().appendFile(fileThree).commit();
+
+    // a filter on the dropped column name plans successfully at the old snapshot
+    List<FileScanTask> tasks =
+        Lists.newArrayList(
+            table
+                .newScan()
+                .useSnapshot(firstSnapshotId)
+                .filter(Expressions.equal("data", "xyz"))
+                .planFiles());
+    assertThat(tasks).hasSize(2);
+  }

   @TestTemplate
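Taken together, the new tests pin down the user-visible behavior: a time-travel scan can filter on column names as they existed at the scanned snapshot. A condensed sketch of that scenario, assuming a table partitioned by an identity transform on "part"; the table setup and data-file helpers are illustrative, not part of this PR:

    // append data, remember the snapshot, then rename the partition source column
    table.newAppend().appendFile(fileOne).commit();
    long oldSnapshotId = table.currentSnapshot().snapshotId();
    table.updateSchema().renameColumn("part", "p").commit();

    // the filter uses the column name as of oldSnapshotId ("part", not "p");
    // with the snapshot-aware specs(), planning resolves it against that schema
    List<FileScanTask> tasks =
        Lists.newArrayList(
            table
                .newScan()
                .useSnapshot(oldSnapshotId)
                .filter(Expressions.equal("part", "one"))
                .planFiles());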