diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ef262e11f02b..85c71827d7ac 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,6 +94,14 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
+  public String branch() {
+    return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
+  }
+
+  public String tag() {
+    return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
+  }
+
   public String fileScanTaskSetId() {
     return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9515a48bc297..96e09d70ef65 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,6 +35,12 @@ private SparkReadOptions() {}
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
+  // Branch to read from
+  public static final String BRANCH = "branch";
+
+  // Tag to read from
+  public static final String TAG = "tag";
+
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 59dd8759968f..6dcbd5091212 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -67,6 +67,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   private final Long startSnapshotId;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String branch;
+  private final String tag;
   private final List<Expression> runtimeFilterExpressions;
 
   private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -88,6 +90,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
+    this.branch = readConf.branch();
+    this.tag = readConf.tag();
     this.runtimeFilterExpressions = Lists.newArrayList();
 
     if (scan == null) {
@@ -244,6 +248,14 @@ public Statistics estimateStatistics() {
       Snapshot snapshot = table().snapshot(snapshotIdAsOfTime);
       return estimateStatistics(snapshot);
 
+    } else if (branch != null) {
+      Snapshot snapshot = table().snapshot(branch);
+      return estimateStatistics(snapshot);
+
+    } else if (tag != null) {
+      Snapshot snapshot = table().snapshot(tag);
+      return estimateStatistics(snapshot);
+
     } else {
       Snapshot snapshot = table().currentSnapshot();
       return estimateStatistics(snapshot);
@@ -251,6 +263,7 @@ public Statistics estimateStatistics() {
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public boolean equals(Object o) {
     if (this == o) {
      return true;
@@ -269,7 +282,9 @@ && readSchema().equals(that.readSchema())
         && Objects.equals(snapshotId, that.snapshotId)
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId)
-        && Objects.equals(asOfTimestamp, that.asOfTimestamp);
+        && Objects.equals(asOfTimestamp, that.asOfTimestamp)
+        && Objects.equals(branch, that.branch)
+        && Objects.equals(tag, that.tag);
   }
 
   @Override
@@ -282,7 +297,9 @@ public int hashCode() {
         snapshotId,
         startSnapshotId,
         endSnapshotId,
-        asOfTimestamp);
+        asOfTimestamp,
+        branch,
+        tag);
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index b291a8e2679e..150da814ba9e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -183,6 +183,8 @@ private Schema schemaWithMetadataColumns() {
   public Scan build() {
     Long snapshotId = readConf.snapshotId();
     Long asOfTimestamp = readConf.asOfTimestamp();
+    String branch = readConf.branch();
+    String tag = readConf.tag();
 
     Preconditions.checkArgument(
         snapshotId == null || asOfTimestamp == null,
@@ -226,6 +228,14 @@ public Scan build() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
+    if (branch != null) {
+      scan = scan.useRef(branch);
+    }
+
+    if (tag != null) {
+      scan = scan.useRef(tag);
+    }
+
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -241,10 +251,15 @@ public Scan build() {
 
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
-        "Cannot set neither %s nor %s for changelogs",
+        readConf.snapshotId() == null
+            && readConf.asOfTimestamp() == null
+            && readConf.branch() == null
+            && readConf.tag() == null,
+        "Cannot set any of %s, %s, %s, or %s for changelogs",
         SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.AS_OF_TIMESTAMP);
+        SparkReadOptions.AS_OF_TIMESTAMP,
+        SparkReadOptions.BRANCH,
+        SparkReadOptions.TAG);
 
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
@@ -273,10 +288,15 @@ public Scan buildChangelogScan() {
 
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
-        "Cannot set time travel options %s and %s for row-level command scans",
+        readConf.snapshotId() == null
+            && readConf.asOfTimestamp() == null
+            && readConf.branch() == null
+            && readConf.tag() == null,
+        "Cannot set time travel options %s, %s, %s, or %s for row-level command scans",
         SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.AS_OF_TIMESTAMP);
+        SparkReadOptions.AS_OF_TIMESTAMP,
+        SparkReadOptions.BRANCH,
+        SparkReadOptions.TAG);
 
     Preconditions.checkArgument(
         readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b1c..0b7348fa078a 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,148 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
+
+  @Test
+  public void testSnapshotSelectionByTag() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify that reading by tag sees only the first snapshot, not the current one
+    Dataset<Row> tagSnapshotResult =
+        spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
+    List<SimpleRecord> tagSnapshotRecords =
+        tagSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals("Tag snapshot rows should match", expectedRecords, tagSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranch() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify that reading by branch sees only the branch head, not the current snapshot
+    Dataset<Row> branchSnapshotResult =
+        spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
+    List<SimpleRecord> branchSnapshotRecords =
+        branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals(
+        "Branch snapshot rows should match", expectedRecords, branchSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranchAndTagFails() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.TAG, "tag")
+                    .option(SparkReadOptions.BRANCH, "branch")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+  }
+
+  @Test
+  public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    List<SimpleRecord> firstBatchRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    long timestamp = System.currentTimeMillis();
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+                    .option(SparkReadOptions.BRANCH, "branch")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                spark
+                    .read()
+                    .format("iceberg")
+                    .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+                    .option(SparkReadOptions.TAG, "tag")
+                    .load(tableLocation)
+                    .show())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageStartingWith("Cannot override ref, already set snapshot id=");
+  }
 }
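
Usage note (not part of the patch): a minimal sketch of how the new read options are exercised once this change is applied. It assumes an active SparkSession named spark, a table location string tableLocation, and refs named "branch" and "tag" that already exist on the table, mirroring the tests above.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the snapshot at the head of a branch.
Dataset<Row> branchDf =
    spark.read().format("iceberg").option(SparkReadOptions.BRANCH, "branch").load(tableLocation);

// Read the snapshot a tag points to.
Dataset<Row> tagDf =
    spark.read().format("iceberg").option(SparkReadOptions.TAG, "tag").load(tableLocation);

// Combining branch or tag with each other, or with snapshot-id / as-of-timestamp,
// fails when the scan builder calls useRef(...) on a scan whose snapshot is already
// pinned: IllegalArgumentException("Cannot override ref, already set snapshot id=...").

Because both options funnel into Scan#useRef, branch and tag resolve through the same ref-lookup path; the only user-visible difference is whether the ref is a branch head or an immutable tag.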