diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e8906819ab61..b9ccbe3e9b71 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,6 @@ package org.apache.iceberg;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -37,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +44,6 @@ abstract class BaseTableScan implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);
 
-  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
   private final TableOperations ops;
   private final Table table;
   private final Schema schema;
@@ -132,19 +127,7 @@ public TableScan asOfTime(long timestampMillis) {
     Preconditions.checkArgument(context.snapshotId() == null,
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
 
-    Long lastSnapshotId = null;
-    for (HistoryEntry logEntry : ops.current().snapshotLog()) {
-      if (logEntry.timestampMillis() <= timestampMillis) {
-        lastSnapshotId = logEntry.snapshotId();
-      }
-    }
-
-    // the snapshot ID could be null if no entries were older than the requested time. in that case,
-    // there is no valid snapshot to read.
-    Preconditions.checkArgument(lastSnapshotId != null,
-        "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
-
-    return useSnapshot(lastSnapshotId);
+    return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
   }
 
   @Override
@@ -199,7 +182,7 @@ public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshot();
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
-          snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
+          snapshot.snapshotId(), SnapshotUtil.formatTimestampMillis(snapshot.timestampMillis()),
           context.rowFilter());
 
       Listeners.notifyAll(
@@ -304,8 +287,4 @@ private Schema lazyColumnProjection() {
 
     return schema;
   }
-
-  private static String formatTimestampMillis(long millis) {
-    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fdcd5ed35645..58a7c9bbdbd6 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 
 public class DataTableScan extends BaseTableScan {
@@ -62,6 +63,15 @@ public TableScan appendsAfter(long fromSnapshotId) {
     return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
   }
 
+  @Override
+  public TableScan useSnapshot(long scanSnapshotId) {
+    // call the superclass method only for the side effect of argument validation;
+    // its return value is not used
+    super.useSnapshot(scanSnapshotId);
+    Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
+    return newRefinedScan(tableOps(), table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
+  }
+
   @Override
   protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     return new DataTableScan(ops, table, schema, context);
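[Illustrative example; not part of the patch] A minimal sketch of the behavior the two core changes above enable. The catalog handle, identifier, and snapshot values are hypothetical:

    // Scans pinned to a snapshot now report that snapshot's schema instead of the table's current schema.
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));   // hypothetical table
    long oldSnapshotId = table.history().get(0).snapshotId();

    TableScan pinned = table.newScan().useSnapshot(oldSnapshotId);
    Schema schemaAtSnapshot = pinned.schema();   // schema of the selected snapshot, not table.schema()

    // asOfTime() now resolves the snapshot through SnapshotUtil.snapshotIdAsOfTime()
    TableScan timeTravel = table.newScan().asOfTime(System.currentTimeMillis() - 3600_000L);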
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index beafcc29c90c..00c73bfec922 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -19,9 +19,15 @@ package org.apache.iceberg.util;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -33,6 +39,8 @@ public class SnapshotUtil {
   private SnapshotUtil() {
   }
 
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
   /**
    * Returns whether ancestorSnapshotId is an ancestor of snapshotId.
    */
@@ -144,4 +152,80 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
     throw new IllegalStateException(
         String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
   }
+
+  /**
+   * Returns the ID of the most recent snapshot for the table as of the timestamp.
+   *
+   * @param table a {@link Table}
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the snapshot ID
+   * @throws IllegalArgumentException when the table has no snapshot older than the timestamp
+   */
+  public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+    Long snapshotId = null;
+    for (HistoryEntry logEntry : table.history()) {
+      if (logEntry.timestampMillis() <= timestampMillis) {
+        snapshotId = logEntry.snapshotId();
+      }
+    }
+
+    Preconditions.checkArgument(snapshotId != null,
+        "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
+    return snapshotId;
+  }
+
+  /**
+   * Returns the schema of the table for the specified snapshot.
+   *
+   * @param table a {@link Table}
+   * @param snapshotId the ID of the snapshot
+   * @return the schema
+   */
+  public static Schema schemaFor(Table table, long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s", snapshotId);
+    Integer schemaId = snapshot.schemaId();
+
+    // the schema ID could be null if the snapshot was created before Iceberg started tracking schema IDs in snapshots
+    if (schemaId != null) {
+      Schema schema = table.schemas().get(schemaId);
+      Preconditions.checkState(schema != null,
+          "Cannot find schema with schema id %s", schemaId);
+      return schema;
+    }
+
+    // TODO: recover the schema by reading previous metadata files
+    return table.schema();
+  }
+
+  /**
+   * Convenience method for returning the schema of the table for a snapshot,
+   * identified by either a snapshot ID or a timestamp. At most one of them may be
+   * specified (non-null); otherwise an IllegalArgumentException is thrown.
+   *
+   * @param table a {@link Table}
+   * @param snapshotId the ID of the snapshot
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the schema
+   * @throws IllegalArgumentException if both snapshotId and timestampMillis are non-null
+   */
+  public static Schema schemaFor(Table table, Long snapshotId, Long timestampMillis) {
+    Preconditions.checkArgument(snapshotId == null || timestampMillis == null,
+        "Cannot use both snapshot id and timestamp to find a schema");
+
+    if (snapshotId != null) {
+      return schemaFor(table, snapshotId);
+    }
+
+    if (timestampMillis != null) {
+      return schemaFor(table, snapshotIdAsOfTime(table, timestampMillis));
+    }
+
+    return table.schema();
+  }
+
+  public static String formatTimestampMillis(long millis) {
+    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 8a94beea12d4..e16940fc4875 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -49,7 +48,6 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.TableTestBase;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestHelpers;
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index f0bc4ee521d4..8976d666f168 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -68,8 +68,8 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op
     Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
     if (readSchema != null) {
-      // convert() will fail if readSchema contains fields not in table.schema()
-      SparkSchemaUtil.convert(table.schema(), readSchema);
+      // convert() will fail if readSchema contains fields not in reader.snapshotSchema()
+      SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
       reader.pruneColumns(readSchema);
     }
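[Illustrative example; not part of the patch] The new SnapshotUtil helpers above are intended to be used along these lines (the table handle and timestamp are hypothetical):

    Table table = catalog.loadTable(identifier);                      // hypothetical table handle
    long asOfTimestamp = System.currentTimeMillis() - 3600_000L;      // one hour ago

    // snapshot that was current at the timestamp; throws IllegalArgumentException if none is old enough
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);

    // schema recorded for that snapshot; falls back to table.schema() for old snapshots without a schema ID
    Schema schemaAtSnapshot = SnapshotUtil.schemaFor(table, snapshotId);

    // convenience overload: at most one of snapshotId / timestampMillis may be non-null
    Schema sameSchema = SnapshotUtil.schemaFor(table, null, asOfTimestamp);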
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a8a2b2f0e51f..2ac570dc8530 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -86,14 +86,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
-  private final Long snapshotId;
-  private final Long startSnapshotId;
-  private final Long endSnapshotId;
-  private final Long asOfTimestamp;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
-  private final boolean caseSensitive;
+  private final TableScan baseScan;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -111,31 +104,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options.asMap());
-    this.snapshotId = readConf.snapshotId();
-    this.asOfTimestamp = readConf.asOfTimestamp();
-    if (snapshotId != null && asOfTimestamp != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
-    }
-
-    this.startSnapshotId = readConf.startSnapshotId();
-    this.endSnapshotId = readConf.endSnapshotId();
-    if (snapshotId != null || asOfTimestamp != null) {
-      if (startSnapshotId != null || endSnapshotId != null) {
-        throw new IllegalArgumentException(
-            "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
-                "as-of-timestamp is specified");
-      }
-    } else {
-      if (startSnapshotId == null && endSnapshotId != null) {
-        throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
-      }
-    }
-
-    // look for split behavior overrides in options
-    this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
-    this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
-    this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+    this.baseScan = configureBaseScan(caseSensitive, options);
+    this.schema = baseScan.schema();
 
     if (table.io() instanceof HadoopFileIO) {
       String fsscheme = "no_exist";
@@ -157,18 +128,84 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
       this.localityPreferred = false;
     }
 
-    this.schema = table.schema();
-    this.caseSensitive = caseSensitive;
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
   }
 
+  private void validateOptions(
+      Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
+    if (snapshotId != null && asOfTimestamp != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+    }
+
+    if ((snapshotId != null || asOfTimestamp != null) &&
+        (startSnapshotId != null || endSnapshotId != null)) {
+      throw new IllegalArgumentException(
+          "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+              "as-of-timestamp is specified");
+    }
+
+    if (startSnapshotId == null && endSnapshotId != null) {
+      throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+    }
+  }
+
+  private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions options) {
+    Long snapshotId = readConf.snapshotId();
+    Long asOfTimestamp = readConf.asOfTimestamp();
+    Long startSnapshotId = readConf.startSnapshotId();
+    Long endSnapshotId = readConf.endSnapshotId();
+    validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId);
+
+    TableScan scan = table.newScan().caseSensitive(caseSensitive);
+
+    if (snapshotId != null) {
+      scan = scan.useSnapshot(snapshotId);
+    }
+
+    if (asOfTimestamp != null) {
+      scan = scan.asOfTime(asOfTimestamp);
+    }
+
+    if (startSnapshotId != null) {
+      if (endSnapshotId != null) {
+        scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+      } else {
+        scan = scan.appendsAfter(startSnapshotId);
+      }
+    }
+
+    // look for split behavior overrides in options
+    Long splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
+    if (splitSize != null) {
+      scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+    }
+
+    Integer splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
+    if (splitLookback != null) {
+      scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+    }
+
+    Long splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+    if (splitOpenFileCost != null) {
+      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+    }
+
+    return scan;
+  }
+
+  protected Schema snapshotSchema() {
+    return baseScan.schema();
+  }
+
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedSchema != null) {
         // the projection should include all columns that will be returned, including those only used in filters
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
+        this.schema = SparkSchemaUtil.prune(
+            baseScan.schema(), requestedSchema, filterExpression(), baseScan.isCaseSensitive());
       } else {
-        this.schema = table.schema();
+        this.schema = baseScan.schema();
       }
     }
     return schema;
@@ -211,6 +248,7 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<CombinedScanTask> scanTasks = tasks();
+    boolean caseSensitive = baseScan.isCaseSensitive();
     InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];
 
     Tasks.range(readTasks.length)
@@ -235,6 +273,7 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<CombinedScanTask> scanTasks = tasks();
+    boolean caseSensitive = baseScan.isCaseSensitive();
     InputPartition<InternalRow>[] readTasks = new InputPartition[scanTasks.size()];
 
     Tasks.range(readTasks.length)
@@ -378,38 +417,7 @@ private static void mergeIcebergHadoopConfs(
 
   private List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      TableScan scan = table
-          .newScan()
-          .caseSensitive(caseSensitive)
-          .project(lazySchema());
-
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
-
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
-
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
-      }
-
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
-
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
-      }
-
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
-      }
+      TableScan scan = baseScan.project(lazySchema());
 
       if (filterExpressions != null) {
         for (Expression filter : filterExpressions) {
@@ -430,8 +438,8 @@ private List<CombinedScanTask> tasks() {
   @Override
   public String toString() {
     return String.format(
-        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
-        table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
+        "IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
+        table, lazySchema().asStruct(), filterExpressions, enableBatchRead());
   }
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f3030c..93a3bf13189f 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -52,7 +53,9 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -68,6 +71,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema SCHEMA2 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
+  private static final Schema SCHEMA3 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
   private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("id").build(); @Rule @@ -1111,7 +1125,7 @@ public void testPartitionsTable() { // check time travel List actualAfterFirstCommit = spark.read() .format("iceberg") - .option("snapshot-id", String.valueOf(firstCommitId)) + .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId)) .load(loadLocation(tableIdentifier, "partitions")) .orderBy("partition.id") .collectAsList(); @@ -1139,6 +1153,223 @@ public void testPartitionsTable() { } } + @Test + public synchronized void testSnapshotReadAfterAddColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x"), + RowFactory.create(2, "y"), + RowFactory.create(3, "z")); + + StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf.orderBy("id").collectAsList()); + + Snapshot snapshot1 = table.currentSnapshot(); + + table.updateSchema().addColumn("category", Types.StringType.get()).commit(); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf2 = spark.createDataFrame(newRecords, newSparkSchema); + inputDf2.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "x", null), + RowFactory.create(2, "y", null), + RowFactory.create(3, "z", null), + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf3.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema()); + } + + @Test + public synchronized void testSnapshotReadAfterDropColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x", "A"), + RowFactory.create(2, "y", "A"), + RowFactory.create(3, "z", "B")); + + StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + 
resultDf.orderBy("id").collectAsList()); + + long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis()); + table.updateSchema().deleteColumn("data").commit(); + long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis()); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3); + Dataset inputDf2 = spark.createDataFrame(newRecords, newSparkSchema); + inputDf2.select("id", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "A"), + RowFactory.create(2, "A"), + RowFactory.create(3, "B"), + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf3.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema()); + + // At tsAfterDropColumn, there has been a schema change, but no new snapshot, + // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn. + Dataset resultDf4 = spark.read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf4.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema()); + } + + @Test + public synchronized void testSnapshotReadAfterAddAndDropColumn() { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); + Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned()); + + List originalRecords = Lists.newArrayList( + RowFactory.create(1, "x"), + RowFactory.create(2, "y"), + RowFactory.create(3, "z")); + + StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA); + Dataset inputDf = spark.createDataFrame(originalRecords, originalSparkSchema); + inputDf.select("id", "data").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + Dataset resultDf = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf.orderBy("id").collectAsList()); + + Snapshot snapshot1 = table.currentSnapshot(); + + table.updateSchema().addColumn("category", Types.StringType.get()).commit(); + + List newRecords = Lists.newArrayList( + RowFactory.create(4, "xy", "B"), + RowFactory.create(5, "xyz", "C")); + + StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2); + Dataset inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn); + inputDf2.select("id", "data", "category").write() + .format("iceberg") + .mode(SaveMode.Append) + .save(loadLocation(tableIdentifier)); + + table.refresh(); + + List updatedRecords = Lists.newArrayList( + RowFactory.create(1, "x", null), + RowFactory.create(2, "y", null), + RowFactory.create(3, "z", null), + RowFactory.create(4, "xy", "B"), + 
RowFactory.create(5, "xyz", "C")); + + Dataset resultDf2 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", updatedRecords, + resultDf2.orderBy("id").collectAsList()); + + table.updateSchema().deleteColumn("data").commit(); + + List recordsAfterDropColumn = Lists.newArrayList( + RowFactory.create(1, null), + RowFactory.create(2, null), + RowFactory.create(3, null), + RowFactory.create(4, "B"), + RowFactory.create(5, "C")); + + Dataset resultDf3 = spark.read() + .format("iceberg") + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", recordsAfterDropColumn, + resultDf3.orderBy("id").collectAsList()); + + Dataset resultDf4 = spark.read() + .format("iceberg") + .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId()) + .load(loadLocation(tableIdentifier)); + Assert.assertEquals("Records should match", originalRecords, + resultDf4.orderBy("id").collectAsList()); + Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema()); + } + @Test public void testRemoveOrphanFilesActionSupport() throws InterruptedException { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 8da4c427d115..8102be5e196a 100644 --- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -19,14 +19,18 @@ package org.apache.iceberg.spark; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -39,6 +43,7 @@ import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -46,6 +51,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.source.SparkTable; import org.apache.iceberg.spark.source.StagedSparkTable; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; @@ -80,6 +87,9 @@ */ public class SparkCatalog extends BaseCatalog { private static final Set DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); + private static final Splitter COMMA = Splitter.on(","); + private static final Pattern AT_TIME = Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)"); + private static final Pattern SNAPSHOT_ID = Pattern.compile("s(?:nap(?:shot)?)?(?:_id)?_?(\\d+)"); private 
String catalogName = null;
   private Catalog icebergCatalog = null;
 
@@ -118,8 +128,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
   @Override
   public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      Table icebergTable = load(ident);
-      return new SparkTable(icebergTable, !cacheEnabled);
+      Pair<Table, Long> icebergTable = load(ident);
+      return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -220,7 +230,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
     }
 
     try {
-      Table table = load(ident);
+      Table table = load(ident).first();
       commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
@@ -256,7 +266,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
   @Override
   public void invalidateTable(Identifier ident) {
     try {
-      load(ident).refresh();
+      load(ident).first().refresh();
     } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
       // ignore if the table doesn't exist, it is not cached
     }
@@ -456,10 +466,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
-  private Table load(Identifier ident) {
-    return isPathIdentifier(ident) ?
-        tables.load(((PathIdentifier) ident).location()) :
-        icebergCatalog.loadTable(buildIdentifier(ident));
+  private Pair<Table, Long> load(Identifier ident) {
+    if (isPathIdentifier(ident)) {
+      return loadFromPathIdentifier((PathIdentifier) ident);
+    }
+
+    try {
+      return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
+
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      // if the original load didn't work, the identifier may be extended and include a snapshot selector
+      TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
+      Table table;
+      try {
+        table = icebergCatalog.loadTable(namespaceAsIdent);
+      } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
+        // the namespace does not identify a table, so it cannot be a table with a snapshot selector;
+        // throw the original exception
+        throw e;
+      }
+
+      // loading the namespace as a table worked, check the name to see if it is a valid selector
+      Matcher at = AT_TIME.matcher(ident.name());
+      if (at.matches()) {
+        long asOfTimestamp = Long.parseLong(at.group(1));
+        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      }
+
+      Matcher id = SNAPSHOT_ID.matcher(ident.name());
+      if (id.matches()) {
+        long snapshotId = Long.parseLong(id.group(1));
+        return Pair.of(table, snapshotId);
+      }
+
+      // the name wasn't a valid snapshot selector; throw the original exception
+      throw e;
+    }
+  }
+
+  private Pair<String, List<String>> parseLocationString(String location) {
+    int hashIndex = location.lastIndexOf('#');
+    if (hashIndex != -1 && !location.endsWith("#")) {
+      String baseLocation = location.substring(0, hashIndex);
+      List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
+      return Pair.of(baseLocation, metadata);
+    } else {
+      return Pair.of(location, ImmutableList.of());
+    }
+  }
+
+  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+    Pair<String, List<String>> parsed = parseLocationString(ident.location());
+
+    String metadataTableName = null;
+    Long asOfTimestamp = null;
+    Long snapshotId = null;
+    for (String meta : parsed.second()) {
+      if (MetadataTableType.from(meta) != null) {
+        metadataTableName = meta;
+        continue;
+      }
+
+      Matcher at = AT_TIME.matcher(meta);
+      if (at.matches()) {
+        asOfTimestamp = Long.parseLong(at.group(1));
+        continue;
+      }
+
+      Matcher id = SNAPSHOT_ID.matcher(meta);
+      if (id.matches()) {
+        snapshotId = Long.parseLong(id.group(1));
+      }
+    }
+
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify as-of-timestamp and snapshot-id: %s", ident.location());
+
+    Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
+
+    if (snapshotId != null) {
+      return Pair.of(table, snapshotId);
+    } else if (asOfTimestamp != null) {
+      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+    } else {
+      return Pair.of(table, null);
+    }
+  }
+
+  private Identifier namespaceToIdentifier(String[] namespace) {
+    String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
+    String name = namespace[ns.length];
+    return Identifier.of(ns, name);
   }
 
   private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
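[Illustrative example; not part of the patch] With the SparkCatalog changes above, a Spark 3 table name (or a path-based load) can carry a snapshot selector. The catalog name, identifiers, paths, and IDs below are hypothetical:

    // SQL: the trailing name part is matched against the SNAPSHOT_ID / AT_TIME patterns
    spark.sql("SELECT * FROM iceberg_catalog.db.events.snapshot_id_10963874102873");
    spark.sql("SELECT * FROM iceberg_catalog.db.events.at_timestamp_1620517200000");

    // path-based tables: selectors follow a '#' and are parsed by loadFromPathIdentifier()
    Dataset<Row> df = spark.read().format("iceberg")
        .load("hdfs://nn:8020/path/to/table#snapshot_id_10963874102873");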
diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 98fcc0e85cf4..f8d80df17c5f 100644
--- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -19,11 +19,13 @@ package org.apache.iceberg.spark.source;
+import java.util.Arrays;
 import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,6 +58,8 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
+  private static final String AT_TIMESTAMP = "at_timestamp_";
+  private static final String SNAPSHOT_ID = "snapshot_id_";
 
   @Override
   public String shortName() {
@@ -101,12 +105,26 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
+    Long asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+    String selector = "";
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+    if (asOfTimestamp != null) {
+      selector = AT_TIMESTAMP + asOfTimestamp;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
+
     if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
+      String newPath = selector.equals("") ? path : path + "#" + selector;
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(path));
+          new PathIdentifier(newPath));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
@@ -115,10 +133,28 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
+      Identifier ident = catalogAndIdentifier.identifier();
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
-    } else {
+          newIdentifier(ident, selector));
+    } else if (snapshotId == null && asOfTimestamp == null) {
       return catalogAndIdentifier;
+    } else {
+      CatalogPlugin catalog = catalogAndIdentifier.catalog();
+      Identifier ident = catalogAndIdentifier.identifier();
+      return new Spark3Util.CatalogAndIdentifier(catalog,
+          newIdentifier(ident, selector));
+    }
+  }
+
+  private Identifier newIdentifier(Identifier ident, String newName) {
+    if (newName.equals("")) {
+      return ident;
+    } else {
+      String[] namespace = ident.namespace();
+      String name = ident.name();
+      String[] ns = Arrays.copyOf(namespace, namespace.length + 1);
+      ns[namespace.length] = name;
+      return Identifier.of(ns, newName);
     }
   }
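[Illustrative example; not part of the patch] On the DataFrame read path, IcebergSource now folds the read options into the identifier before the catalog resolves it (the table name and snapshot ID are hypothetical):

    // snapshot-id and as-of-timestamp are mutually exclusive; setting both throws IllegalArgumentException
    Dataset<Row> df = spark.read().format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, 10963874102873L)
        .load("db.events");
    // internally the identifier db.events becomes db.events.snapshot_id_10963874102873,
    // which SparkCatalog.load() resolves back to the table plus the selected snapshot ID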
diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 82fbdea8d6ea..45c9452d3ad0 100644
--- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -54,6 +55,8 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
   private final SparkReadConf readConf;
   private final CaseInsensitiveStringMap options;
   private final List<String> metaColumns = Lists.newArrayList();
+  private final Long snapshotId;
+  private final Long asOfTimestamp;
 
   private Schema schema = null;
   private StructType requestedProjection;
@@ -67,16 +70,22 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
     this.options = options;
+    this.snapshotId = readConf.snapshotId();
+    this.asOfTimestamp = readConf.asOfTimestamp();
     this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
   }
 
+  private Schema snapshotSchema() {
+    return SnapshotUtil.schemaFor(table, snapshotId, asOfTimestamp);
+  }
+
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedProjection != null) {
         // the projection should include all columns that will be returned, including those only used in filters
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive);
+        this.schema = SparkSchemaUtil.prune(snapshotSchema(), requestedProjection, filterExpression(), caseSensitive);
       } else {
-        this.schema = table.schema();
+        this.schema = snapshotSchema();
       }
     }
     return schema;
@@ -108,7 +117,7 @@ public Filter[] pushFilters(Filter[] filters) {
       Expression expr = SparkFilters.convert(filter);
       if (expr != null) {
         try {
-          Binder.bind(table.schema().asStruct(), expr, caseSensitive);
+          Binder.bind(snapshotSchema().asStruct(), expr, caseSensitive);
           expressions.add(expr);
           pushed.add(filter);
         } catch (ValidationException e) {
diff --git a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index c535a3534954..53b736234476 100644
--- a/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,8 +27,10 @@ import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
@@ -36,6 +38,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
@@ -76,7 +79,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
       TableCapability.OVERWRITE_DYNAMIC);
 
   private final Table icebergTable;
-  private final StructType requestedSchema;
+  private final Long snapshotId;
   private final boolean refreshEagerly;
   private StructType lazyTableSchema = null;
   private SparkSession lazySpark = null;
@@ -85,15 +88,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) {
     this(icebergTable, null, refreshEagerly);
   }
 
-  public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) {
+  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
     this.icebergTable = icebergTable;
-    this.requestedSchema = requestedSchema;
+    this.snapshotId = snapshotId;
    this.refreshEagerly = refreshEagerly;
-
-    if (requestedSchema != null) {
-      // convert the requested schema to throw an exception if any requested fields are unknown
-      SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
-    }
   }
 
   private SparkSession sparkSession() {
@@ -113,14 +111,14 @@ public String name() {
     return icebergTable.toString();
   }
 
+  private Schema snapshotSchema() {
+    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+  }
+
   @Override
   public StructType schema() {
     if (lazyTableSchema == null) {
-      if (requestedSchema != null) {
-        this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema));
-      } else {
-        this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
-      }
+      this.lazyTableSchema = SparkSchemaUtil.convert(snapshotSchema());
     }
 
     return lazyTableSchema;
@@ -171,17 +169,15 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options, snapshotId));
   }
 
   @Override
   public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    Preconditions.checkArgument(
+        snapshotId == null,
+        "Cannot write to table at a specific snapshot: %s", snapshotId);
+
     if (info.options().containsKey(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)) {
       // replace data files in the given file scan task set with new files
       return new SparkRewriteBuilder(sparkSession(), icebergTable, info);
@@ -192,6 +188,10 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
 
   @Override
   public MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info) {
+    Preconditions.checkArgument(
+        snapshotId == null,
+        "Cannot write to table at a specific snapshot: %s", snapshotId);
+
     String mode = getRowLevelOperationMode(operation);
     ValidationException.check(mode.equals("copy-on-write"), "Unsupported mode for %s: %s", operation, mode);
     return new SparkMergeBuilder(sparkSession(), icebergTable, operation, info);
@@ -212,6 +212,10 @@ private String getRowLevelOperationMode(String operation) {
 
   @Override
   public boolean canDeleteWhere(Filter[] filters) {
+    Preconditions.checkArgument(
+        snapshotId == null,
+        "Cannot delete from table at a specific snapshot: %s", snapshotId);
+
     if (table().specs().size() > 1) {
       // cannot guarantee a metadata delete will be successful if we have multiple specs
       return false;
@@ -283,4 +287,34 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      // If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <id>)
+      // or option("as-of-timestamp", <timestamp>) is applied to the DataFrameReader, SparkTable is
+      // constructed with a non-null snapshotId, and SparkTable#newScanBuilder is subsequently called
+      // with options that include "snapshot-id" or "as-of-timestamp".
+      // On the other hand, if the table is loaded using SQL, with the table name suffixed by a snapshot
+      // or timestamp selector, then SparkTable is constructed with a non-null snapshotId, but
+      // SparkTable#newScanBuilder is called without the "snapshot-id" or "as-of-timestamp" option.
+      // We therefore add a "snapshot-id" option here in this latter case.
+      // As a consistency check, if "snapshot-id" is present in the options, its value must match the
+      // snapshot ID we already have.
+      Long snapshotIdFromOptions = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
+      Long asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);
+      Preconditions.checkArgument(
+          snapshotIdFromOptions == null || snapshotIdFromOptions.longValue() == snapshotId.longValue(),
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      if (snapshotIdFromOptions == null && asOfTimestamp == null) {
+        scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));
+      }
+
+      return new CaseInsensitiveStringMap(scanOptions);
+    }
+
+    return options;
+  }
 }
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 846e234cba07..8fb83582d991 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -120,4 +120,45 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test
+  public void testSnapshotInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get the snapshot ID of the last write and get the current row set as expected
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    for (String prefix : new String[] {
+        "s", "s_", "snap", "snap_", "snapshot", "snapshot_", "snapshot_id", "snapshot_id_" }) {
+      // read the table at the snapshot
+      List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId);
+      assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual);
+    }
+  }
+
+  @Test
+  public void testTimestampInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get a timestamp just after the last write and get the current row set as expected
+    long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2;
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    for (String prefix : new String[] {"at", "at_", "at_time", "at_time_", "at_timestamp", "at_timestamp_" }) {
+      // read the table at the snapshot
+      List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + timestamp);
+      assertEquals("Snapshot at time, prefix " + prefix, expected, actual);
+    }
+  }
 }
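[Illustrative example; not part of the patch] Putting the pieces together: reads at a snapshot return the old rows and the old schema, while write paths on a snapshot-pinned table are expected to be rejected by the new Preconditions checks in SparkTable. Catalog name, table name, and IDs are hypothetical:

    Dataset<Row> oldRows = spark.read().format("iceberg")
        .option(SparkReadOptions.AS_OF_TIMESTAMP, 1620517200000L)
        .load("db.events");
    oldRows.printSchema();   // schema as of the selected snapshot, not the current schema

    // expected to fail with "Cannot write to table at a specific snapshot"
    spark.sql("INSERT INTO iceberg_catalog.db.events.snapshot_id_10963874102873 VALUES (6, 'f', 6.0)");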