From 413e75a8a9cc0604f249f03c64872a7c481073a9 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Tue, 29 Mar 2022 21:20:02 -0700
Subject: [PATCH 01/20] Core, API: Add getting refs and snapshot by ref to the Table API

---
 .../main/java/org/apache/iceberg/Table.java   | 21 +++++++++++++++++++
 .../org/apache/iceberg/BaseMetadataTable.java |  5 +++++
 .../java/org/apache/iceberg/BaseTable.java    |  5 +++++
 .../org/apache/iceberg/BaseTransaction.java   |  5 +++++
 .../org/apache/iceberg/SerializableTable.java |  7 +++++++
 5 files changed, 43 insertions(+)

diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 1e9667f4ccd8..dd561119d8a8 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -297,4 +297,25 @@ default AppendFiles newFastAppend() {
    * Returns a {@link LocationProvider} to provide locations for new data files.
    */
   LocationProvider locationProvider();
+
+  /**
+   * Returns the current refs for the table.
+   *
+   * @return the current refs for the table
+   */
+  Map<String, SnapshotRef> refs();
+
+  /**
+   * Returns the snapshot referenced by the given name, or null if no such reference exists.
+   *
+   * @return the snapshot referenced by the given name, or null if no such reference exists
+   */
+  default Snapshot snapshot(String name) {
+    SnapshotRef ref = refs().get(name);
+    if (ref != null) {
+      return snapshot(ref.snapshotId());
+    }
+
+    return null;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index f4966352ecd6..ab53683bc864 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -156,6 +156,11 @@ public List<HistoryEntry> history() {
     return table().history();
   }
 
+  @Override
+  public Map<String, SnapshotRef> refs() {
+    return table().refs();
+  }
+
   @Override
   public UpdateSchema updateSchema() {
     throw new UnsupportedOperationException("Cannot update the schema of a metadata table");
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index b9886f0a1e68..2e8cc9d41d43 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -224,6 +224,11 @@ public LocationProvider locationProvider() {
     return operations().locationProvider();
   }
 
+  @Override
+  public Map<String, SnapshotRef> refs() {
+    return ops.current().refs();
+  }
+
   @Override
   public String toString() {
     return name();
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index b64f328c32cd..a91b0025cd48 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -733,6 +733,11 @@ public LocationProvider locationProvider() {
     return transactionOps.locationProvider();
   }
 
+  @Override
+  public Map<String, SnapshotRef> refs() {
+    return current.refs();
+  }
+
   @Override
   public String toString() {
     return name();
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 403e2e39de37..4de823070edd 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -61,6 +61,7 @@ public class SerializableTable implements Table, Serializable {
   private final FileIO io;
   private final EncryptionManager encryption;
   private final LocationProvider locationProvider;
+  private final Map<String, SnapshotRef> refs;
 
   private transient volatile Table lazyTable = null;
   private transient volatile Schema lazySchema = null;
@@ -81,6 +82,7 @@ private SerializableTable(Table table) {
     this.io = fileIO(table);
     this.encryption = table.encryption();
     this.locationProvider = table.locationProvider();
+    this.refs = table.refs();
   }
 
   /**
@@ -232,6 +234,11 @@ public LocationProvider locationProvider() {
     return locationProvider;
   }
 
+  @Override
+  public Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
   @Override
   public void refresh() {
     throw new UnsupportedOperationException(errorMsg("refresh"));
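For orientation, the API this first patch adds can be driven roughly as follows. This is an illustrative sketch, not part of the patch series; the ref name "my-tag" is a placeholder.

    // Sketch: enumerating refs and resolving one by name with the new Table API.
    import java.util.Map;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.SnapshotRef;
    import org.apache.iceberg.Table;

    public class RefsExample {
      public static void printRefs(Table table) {
        // refs() exposes every named reference (branch or tag) in the current metadata
        Map<String, SnapshotRef> refs = table.refs();
        for (Map.Entry<String, SnapshotRef> entry : refs.entrySet()) {
          System.out.println(entry.getKey() + " -> " + entry.getValue().snapshotId());
        }

        // snapshot(String) resolves a ref name to its snapshot, or null when the ref is absent
        Snapshot tagged = table.snapshot("my-tag"); // "my-tag" is a hypothetical ref name
        if (tagged != null) {
          System.out.println("my-tag points at snapshot " + tagged.snapshotId());
        }
      }
    }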
From 8132d205f895208f36344dcc8dcc2794052146ab Mon Sep 17 00:00:00 2001
From: nkeshavaprakash
Date: Tue, 28 Jun 2022 12:22:04 -0700
Subject: [PATCH 02/20] Spark 3.2 Integration to read from Snapshot ref

---
 api/src/main/java/org/apache/iceberg/TableScan.java       | 10 ++++++++++
 .../main/java/org/apache/iceberg/BaseTableScan.java       |  7 +++++++
 .../java/org/apache/iceberg/spark/SparkReadConf.java      |  6 ++++++
 .../org/apache/iceberg/spark/SparkReadOptions.java        |  3 +++
 .../iceberg/spark/source/SparkBatchQueryScan.java         |  7 +++++--
 .../apache/iceberg/spark/source/SparkScanBuilder.java     |  5 +++++
 6 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 81f7aa91315f..250e38baa14e 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -54,6 +54,16 @@ public interface TableScan extends Scan {
    */
   TableScan asOfTime(long timestampMillis);
 
+  /**
+   * Create a new {@link TableScan} from this scan's configuration that will use the snapshot
+   * pointed to by the given ref.
+   *
+   * @param snapshotRef a snapshot ref name
+   * @return a new scan based on this with the given snapshot ref
+   * @throws IllegalArgumentException if the ref cannot be found
+   */
+  TableScan useSnapshotRef(String snapshotRef);
+
   /**
    * Create a new {@link TableScan} from this that will read the given data columns. This produces
    * an expected schema that includes all fields that are either selected or used by this scan's
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 83b3ba8159d8..d3106092d356 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -143,6 +143,13 @@ public TableScan asOfTime(long timestampMillis) {
     return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
   }
 
+  @Override
+  public TableScan useSnapshotRef(String snapshotRef) {
+    Preconditions.checkArgument(ops.current().ref(snapshotRef) != null,
+        "Cannot find ref with name %s", snapshotRef);
+    return useSnapshot(table.snapshot(snapshotRef).snapshotId());
+  }
+
   @Override
   public TableScan option(String property, String value) {
     return newRefinedScan(ops, table, schema, context.withOption(property, value));
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index c7c01758c3ee..0f0aef732fd5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -103,6 +103,12 @@ public Long endSnapshotId() {
         .parseOptional();
   }
 
+  public String snapshotRef() {
+    return confParser.stringConf()
+        .option(SparkReadOptions.SNAPSHOT_REF)
+        .parseOptional();
+  }
+
   public String fileScanTaskSetId() {
     return confParser.stringConf()
         .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index edcc2300344a..a58a2081a0c5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -39,6 +39,9 @@ private SparkReadOptions() {
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
+  // A snapshot ref name that will be used to fetch snapshot pointed by ref
+  public static final String SNAPSHOT_REF = "snapshot-ref";
+
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 651a411ebd7b..9ff37aa78694 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -68,6 +68,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   private final Long startSnapshotId;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String snapshotRef;
   private final List<Expression> runtimeFilterExpressions;
 
   private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -84,6 +85,7 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
+    this.snapshotRef = readConf.snapshotRef();
     this.runtimeFilterExpressions = Lists.newArrayList();
 
     if (scan == null) {
@@ -256,14 +258,15 @@ public boolean equals(Object o) {
         Objects.equals(snapshotId, that.snapshotId) &&
         Objects.equals(startSnapshotId, that.startSnapshotId) &&
         Objects.equals(endSnapshotId, that.endSnapshotId) &&
-        Objects.equals(asOfTimestamp, that.asOfTimestamp);
+        Objects.equals(asOfTimestamp, that.asOfTimestamp) &&
+        Objects.equals(snapshotRef, that.snapshotRef);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
         table().name(), readSchema(), filterExpressions().toString(), runtimeFilterExpressions.toString(),
-        snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp);
+        snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, snapshotRef);
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 30c033f2994d..2cb0f84958c7 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -153,6 +153,7 @@ private Schema schemaWithMetadataColumns() {
   public Scan build() {
     Long snapshotId = readConf.snapshotId();
     Long asOfTimestamp = readConf.asOfTimestamp();
+    String snapshotRef = readConf.snapshotRef();
 
     Preconditions.checkArgument(snapshotId == null || asOfTimestamp == null,
         "Cannot set both %s and %s to select which table snapshot to scan",
@@ -187,6 +188,10 @@ public Scan build() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
+    if(snapshotRef != null){
+      scan = scan.useSnapshotRef(snapshotRef);
+    }
+
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
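For illustration, once this option is wired up, a Spark 3.2 read pinned to a named ref would look roughly like the sketch below. It is not part of the patch; the table identifier and ref name are placeholders, and `spark` is assumed to be an existing SparkSession.

    // Sketch: DataFrame read that selects the snapshot a ref points to.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_REF, "my-tag") // same as .option("snapshot-ref", "my-tag")
        .load("db.table");                               // placeholder table identifier
    df.show();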
From c6cdf5cd70bf70d1cdd57cce10496b0435d7851f Mon Sep 17 00:00:00 2001
From: nkeshavaprakash
Date: Tue, 28 Jun 2022 22:45:17 -0700
Subject: [PATCH 03/20] checkStyle fixes

---
 .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2cb0f84958c7..f2ee5216439c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -188,7 +188,7 @@ public Scan build() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
-    if(snapshotRef != null){
+    if (snapshotRef != null) {
       scan = scan.useSnapshotRef(snapshotRef);
     }
 
From 235ff35b36ecb16ee201c587888ec2f52ab1e0e2 Mon Sep 17 00:00:00 2001
From: nkeshavaprakash
Date: Fri, 5 Aug 2022 15:55:28 -0700
Subject: [PATCH 04/20] Spark 3.2 Integration to read from Snapshot ref

---
 core/src/main/java/org/apache/iceberg/BaseTableScan.java  | 4 ++--
 .../main/java/org/apache/iceberg/spark/SparkReadConf.java | 6 +-----
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 3b6090fc3858..36d4cdcb77b0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -96,9 +96,9 @@ public TableScan useSnapshot(long scanSnapshotId) {
 
   @Override
   public TableScan useSnapshotRef(String snapshotRef) {
-    Preconditions.checkArgument(ops.current().ref(snapshotRef) != null,
+    Preconditions.checkArgument(table().snapshot(snapshotRef) != null,
         "Cannot find ref with name %s", snapshotRef);
-    return useSnapshot(table.snapshot(snapshotRef).snapshotId());
+    return useSnapshot(table().snapshot(snapshotRef).snapshotId());
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index a05193655ca3..52553e6214ad 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,11 +94,7 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
-  public String snapshotRef() {
-    return confParser.stringConf()
-        .option(SparkReadOptions.SNAPSHOT_REF)
-        .parseOptional();
-  }
+  public String snapshotRef() { return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional(); }
 
From 7483cf93dfbeae182902ec4e117b7ec82141bcf4 Mon Sep 17 00:00:00 2001
From: nkeshavaprakash
Date: Fri, 5 Aug 2022 15:57:41 -0700
Subject: [PATCH 05/20] Spark 3.2 Integration to read from Snapshot ref

---
 api/src/main/java/org/apache/iceberg/TableScan.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index e6b911cd51b9..b811de5aa6d8 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -50,8 +50,8 @@ public interface TableScan extends Scan
Date: Fri, 5 Aug 2022 16:06:39 -0700
Subject: [PATCH 06/20] Spark 3.1 Integration to read from Snapshot ref

---
 .../apache/iceberg/spark/SparkConfParser.java |  4 ++
 .../apache/iceberg/spark/SparkReadConf.java   |  2 +
 .../iceberg/spark/SparkReadOptions.java       |  3 +
 .../spark/source/SparkBatchQueryScan.java     |  7 +++
 .../spark/source/TestSnapshotSelection.java   | 58 +++++++++++++++++++
 .../spark/source/TestSnapshotSelection.java   | 58 +++++++++++++++++++
 6 files changed, 132 insertions(+)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index 33e5ca936800..2a89c742249a 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -132,6 +132,10 @@ public String parse() {
       Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Function.identity(), defaultValue);
     }
+
+    public String parseOptional() {
+      return parse(Function.identity(), null);
+    }
   }
 
   abstract class ConfParser {
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 937c31e45960..984bdf47c4f6 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -88,6 +88,8 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
+  public String snapshotRef() {return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional();}
+
   public boolean streamingSkipDeleteSnapshots() {
     return confParser
         .booleanConf()
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index d13e80d40004..2e104d43345a 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,6 +35,9 @@ private SparkReadOptions() {}
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
+  // Snapshot Ref of the table snapshot to read from
+  public static final String SNAPSHOT_REF = "snapshot-ref";
+
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 4fcab5517d44..f8c52011d6a6 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -41,6 +41,7 @@ class SparkBatchQueryScan extends SparkBatchScan {
   private final Long snapshotId;
   private final Long startSnapshotId;
   private final Long endSnapshotId;
+  private final String snapshotRef;
   private final Long asOfTimestamp;
   private final Long splitSize;
   private final Integer splitLookback;
@@ -61,6 +62,8 @@ class SparkBatchQueryScan extends SparkBatchScan {
     this.snapshotId = readConf.snapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
 
+    this.snapshotRef = readConf.snapshotRef();
+
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
@@ -103,6 +106,10 @@ protected List<CombinedScanTask> tasks() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
+    if (snapshotRef != null) {
+      scan = scan.useSnapshotRef(snapshotRef);
+    }
+
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b1c..bb5c0ef1492f 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,62 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
+
+  @Test
+  public void testSnapshotSelectionByRef() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-ref", "firstag")
+        .load(tableLocation);
+    currentSnapshotResult.show();
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    // verify records in the previous snapshot
+    Snapshot currentSnapshot = table.currentSnapshot();
+    Long parentSnapshotId = currentSnapshot.parentId();
+    table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-ref", "secondtag")
+        .load(tableLocation);
+    previousSnapshotResult.show();
+    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b1c..bb5c0ef1492f 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,62 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
+
+  @Test
+  public void testSnapshotSelectionByRef() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    // verify records in the current snapshot
+    table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-ref", "firstag")
+        .load(tableLocation);
+    currentSnapshotResult.show();
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    expectedRecords.addAll(secondBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+
+    // verify records in the previous snapshot
+    Snapshot currentSnapshot = table.currentSnapshot();
+    Long parentSnapshotId = currentSnapshot.parentId();
+    table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
+    Dataset<Row> previousSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("snapshot-ref", "secondtag")
+        .load(tableLocation);
+    previousSnapshotResult.show();
+    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
+  }
 }
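The core scan API exercised above can also be driven directly, without Spark. The following is a sketch under the same assumptions as the tests (a table with a tag named "firstag"); it is not part of the patch series.

    // Sketch: planning a scan pinned to a ref via the core TableScan API.
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.io.CloseableIterable;

    public class RefScanExample {
      public static long countFiles(Table table) throws java.io.IOException {
        // useSnapshotRef throws IllegalArgumentException if the ref does not exist
        TableScan scan = table.newScan().useSnapshotRef("firstag");
        long count = 0;
        try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
          for (CombinedScanTask task : tasks) {
            count += task.files().size(); // data files grouped into this task
          }
        }
        return count;
      }
    }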
From 26003f322853785bf4d6a42bfef0e7d824d359d2 Mon Sep 17 00:00:00 2001
From: nkeshavaprakash
Date: Fri, 5 Aug 2022 16:28:47 -0700
Subject: [PATCH 07/20] Adding checks to snapshot ref

---
 .../iceberg/spark/source/SparkBatchQueryScan.java | 12 ++++++++++--
 .../iceberg/spark/source/SparkScanBuilder.java    | 13 ++++++++++++-
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index f8c52011d6a6..e8d3e3b03607 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -68,18 +68,26 @@ class SparkBatchQueryScan extends SparkBatchScan {
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+    } else if (snapshotId != null && snapshotRef != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and snapshot-ref to select the table snapshot");
+    } else if(asOfTimestamp!= null && snapshotRef != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both as-of-timestamp and snapshot-ref to select the table snapshot");
     }
 
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
 
-    if (snapshotId != null || asOfTimestamp != null) {
+    if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
       if (startSnapshotId != null || endSnapshotId != null) {
         throw new IllegalArgumentException(
             "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either "
                 + SparkReadOptions.SNAPSHOT_ID
                 + " or "
                 + SparkReadOptions.AS_OF_TIMESTAMP
-                + " is specified");
+                + " or "
+                + SparkReadOptions.SNAPSHOT_REF
+                + " is specified ");
       }
     } else if (startSnapshotId == null && endSnapshotId != null) {
       throw new IllegalArgumentException(
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 42fed5dc58ab..524d883da694 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -189,11 +189,22 @@ public Scan build() {
         "Cannot set both %s and %s to select which table snapshot to scan",
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(
+        snapshotId == null || snapshotRef == null,
+        "Cannot set both %s and %s to select which table snapshot to scan",
+        SparkReadOptions.SNAPSHOT_ID,
+        SparkReadOptions.SNAPSHOT_REF);
 
+    Preconditions.checkArgument(
+        asOfTimestamp == null || snapshotRef == null,
+        "Cannot set both %s and %s to select which table snapshot to scan",
+        SparkReadOptions.AS_OF_TIMESTAMP,
+        SparkReadOptions.SNAPSHOT_REF);
+
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
 
-    if (snapshotId != null || asOfTimestamp != null) {
+    if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
       Preconditions.checkArgument(
           startSnapshotId == null && endSnapshotId == null,
           "Cannot set %s and %s for incremental scans when either %s or %s is set",
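The effect of these checks is that a ref selection cannot be combined with the other time-travel options. Roughly, under the same placeholder names as before (this sketch is not part of the patch):

    // Sketch: combining snapshot-id with snapshot-ref now fails fast.
    try {
      spark.read()
          .format("iceberg")
          .option("snapshot-id", 4567123456789L) // placeholder snapshot id
          .option("snapshot-ref", "my-tag")      // placeholder ref name
          .load("db.table")                      // placeholder table identifier
          .show();
    } catch (IllegalArgumentException e) {
      // expected: "Cannot set both snapshot-id and snapshot-ref to select which table snapshot to scan"
    }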
From 2fe1e0dc0b693845d8004d00e34ffb618c142195 Mon Sep 17 00:00:00 2001
From: namrathamk <85421654+namrathamk@users.noreply.github.com>
Date: Thu, 11 Aug 2022 05:46:44 +0530
Subject: [PATCH 08/20] Adding support to only spark 3.3

---
 .../org/apache/iceberg/BaseTableScan.java     |  4 +-
 .../apache/iceberg/spark/SparkConfParser.java |  3 -
 .../apache/iceberg/spark/SparkReadConf.java   |  2 -
 .../iceberg/spark/SparkReadOptions.java       |  3 -
 .../spark/source/SparkBatchQueryScan.java     | 15 +---
 .../spark/source/TestSnapshotSelection.java   | 58 -------------
 .../apache/iceberg/spark/SparkReadConf.java   |  2 -
 .../iceberg/spark/SparkReadOptions.java       |  3 -
 .../spark/source/TestSnapshotSelection.java   | 58 -------------
 .../apache/iceberg/spark/SparkReadConf.java   |  8 ++
 .../iceberg/spark/SparkReadOptions.java       |  6 ++
 .../spark/source/SparkBatchQueryScan.java     | 19 ++++-
 .../spark/source/SparkScanBuilder.java        | 27 +++++-
 .../spark/source/TestSnapshotSelection.java   | 84 +++++++++++++++++++
 14 files changed, 144 insertions(+), 148 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 36d4cdcb77b0..1ebdc8075f67 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -96,8 +96,8 @@ public TableScan useSnapshot(long scanSnapshotId) {
 
   @Override
   public TableScan useSnapshotRef(String snapshotRef) {
-    Preconditions.checkArgument(table().snapshot(snapshotRef) != null,
-        "Cannot find ref with name %s", snapshotRef);
+    Preconditions.checkArgument(
+        table().snapshot(snapshotRef) != null, "Cannot find ref with name %s", snapshotRef);
     return useSnapshot(table().snapshot(snapshotRef).snapshotId());
   }
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index 2a89c742249a..bf19dd0d8e47 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -133,9 +133,6 @@ public String parse() {
       return parse(Function.identity(), defaultValue);
     }
 
-    public String parseOptional() {
-      return parse(Function.identity(), null);
-    }
   }
 
   abstract class ConfParser {
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 984bdf47c4f6..937c31e45960 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -88,8 +88,6 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
-  public String snapshotRef() {return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional();}
-
   public boolean streamingSkipDeleteSnapshots() {
     return confParser
         .booleanConf()
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 2e104d43345a..d13e80d40004 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,9 +35,6 @@ private SparkReadOptions() {}
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
-  // Snapshot Ref of the table snapshot to read from
-  public static final String SNAPSHOT_REF = "snapshot-ref";
-
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index e8d3e3b03607..73023a0ff7ee 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -41,7 +41,6 @@ class SparkBatchQueryScan extends SparkBatchScan {
   private final Long snapshotId;
   private final Long startSnapshotId;
   private final Long endSnapshotId;
-  private final String snapshotRef;
   private final Long asOfTimestamp;
   private final Long splitSize;
   private final Integer splitLookback;
@@ -62,23 +61,15 @@ class SparkBatchQueryScan extends SparkBatchScan {
     this.snapshotId = readConf.snapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
 
-    this.snapshotRef = readConf.snapshotRef();
-
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
-    } else if (snapshotId != null && snapshotRef != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both snapshot-id and snapshot-ref to select the table snapshot");
-    } else if(asOfTimestamp!= null && snapshotRef != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both as-of-timestamp and snapshot-ref to select the table snapshot");
     }
 
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
 
-    if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
+    if (snapshotId != null || asOfTimestamp != null) {
       if (startSnapshotId != null || endSnapshotId != null) {
         throw new IllegalArgumentException(
             "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either "
@@ -114,10 +105,6 @@ protected List<CombinedScanTask> tasks() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
-    if (snapshotRef != null) {
-      scan = scan.useSnapshotRef(snapshotRef);
-    }
-
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index bb5c0ef1492f..9661cfe20b1c 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,62 +226,4 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
-
-  @Test
-  public void testSnapshotSelectionByRef() throws IOException {
-    String tableLocation = temp.newFolder("iceberg-table").toString();
-
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
-
-    // produce the first snapshot
-    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b"),
-        new SimpleRecord(3, "c")
-    );
-    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
-    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
-
-    // produce the second snapshot
-    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
-        new SimpleRecord(4, "d"),
-        new SimpleRecord(5, "e"),
-        new SimpleRecord(6, "f")
-    );
-    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
-    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
-
-    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
-
-    // verify records in the current snapshot
-    table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
-    Dataset<Row> currentSnapshotResult = spark.read()
-        .format("iceberg")
-        .option("snapshot-ref", "firstag")
-        .load(tableLocation);
-    currentSnapshotResult.show();
-    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-    List<SimpleRecord> expectedRecords = Lists.newArrayList();
-    expectedRecords.addAll(firstBatchRecords);
-    expectedRecords.addAll(secondBatchRecords);
-    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
-
-    // verify records in the previous snapshot
-    Snapshot currentSnapshot = table.currentSnapshot();
-    Long parentSnapshotId = currentSnapshot.parentId();
-    table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
-    Dataset<Row> previousSnapshotResult = spark.read()
-        .format("iceberg")
-        .option("snapshot-ref", "secondtag")
-        .load(tableLocation);
-    previousSnapshotResult.show();
-    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
-  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 52553e6214ad..ef262e11f02b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,8 +94,6 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
-  public String snapshotRef() { return confParser.stringConf().option(SparkReadOptions.SNAPSHOT_REF).parseOptional(); }
-
   public String fileScanTaskSetId() {
     return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
   }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 2971f92ae494..d13e80d40004 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,9 +35,6 @@ private SparkReadOptions() {}
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
-  // A snapshot ref name that will be used to fetch snapshot pointed by ref
-  public static final String SNAPSHOT_REF = "snapshot-ref";
-
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index bb5c0ef1492f..9661cfe20b1c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,62 +226,4 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
-
-  @Test
-  public void testSnapshotSelectionByRef() throws IOException {
-    String tableLocation = temp.newFolder("iceberg-table").toString();
-
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
-
-    // produce the first snapshot
-    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b"),
-        new SimpleRecord(3, "c")
-    );
-    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
-    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
-
-    // produce the second snapshot
-    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
-        new SimpleRecord(4, "d"),
-        new SimpleRecord(5, "e"),
-        new SimpleRecord(6, "f")
-    );
-    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
-    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
-
-    Assert.assertEquals("Expected 2 snapshots", 2, Iterables.size(table.snapshots()));
-
-    // verify records in the current snapshot
-    table.manageSnapshots().createTag("firstag", table.currentSnapshot().snapshotId()).commit();
-    Dataset<Row> currentSnapshotResult = spark.read()
-        .format("iceberg")
-        .option("snapshot-ref", "firstag")
-        .load(tableLocation);
-    currentSnapshotResult.show();
-    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-    List<SimpleRecord> expectedRecords = Lists.newArrayList();
-    expectedRecords.addAll(firstBatchRecords);
-    expectedRecords.addAll(secondBatchRecords);
-    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
-
-    // verify records in the previous snapshot
-    Snapshot currentSnapshot = table.currentSnapshot();
-    Long parentSnapshotId = currentSnapshot.parentId();
-    table.manageSnapshots().createTag("secondtag", parentSnapshotId).commit();
-    Dataset<Row> previousSnapshotResult = spark.read()
-        .format("iceberg")
-        .option("snapshot-ref", "secondtag")
-        .load(tableLocation);
-    previousSnapshotResult.show();
-    List<SimpleRecord> previousSnapshotRecords = previousSnapshotResult.orderBy("id")
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-    Assert.assertEquals("Previous snapshot rows should match", firstBatchRecords, previousSnapshotRecords);
-  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ef262e11f02b..85c71827d7ac 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -94,6 +94,14 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
+  public String branch() {
+    return confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
+  }
+
+  public String tag() {
+    return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
+  }
+
   public String fileScanTaskSetId() {
     return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9515a48bc297..d1b25478cb5f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -35,6 +35,12 @@ private SparkReadOptions() {}
   // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time.
   public static final String AS_OF_TIMESTAMP = "as-of-timestamp";
 
+  // branch ref of the table snapshot to read from
+  public static final String BRANCH = "branch";
+
+  // tag ref of the table snapshot to read from
+  public static final String TAG = "tag";
+
   // Overrides the table's read.split.target-size and read.split.metadata-target-size
   public static final String SPLIT_SIZE = "split-size";
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 59dd8759968f..943e096e4b6e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -67,6 +67,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   private final Long startSnapshotId;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
+  private final String branch;
+  private final String tag;
   private final List<Expression> runtimeFilterExpressions;
 
   private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -88,6 +90,8 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
+    this.branch = readConf.branch();
+    this.tag = readConf.tag();
     this.runtimeFilterExpressions = Lists.newArrayList();
 
     if (scan == null) {
@@ -244,6 +248,13 @@ public Statistics estimateStatistics() {
       Snapshot snapshot = table().snapshot(snapshotIdAsOfTime);
       return estimateStatistics(snapshot);
 
+    } else if (branch != null) {
+      Snapshot snapshot = table().snapshot(branch);
+      return estimateStatistics(snapshot);
+
+    } else if (tag != null) {
+      Snapshot snapshot = table().snapshot(tag);
+      return estimateStatistics(snapshot);
     } else {
       Snapshot snapshot = table().currentSnapshot();
       return estimateStatistics(snapshot);
@@ -269,7 +280,9 @@ && readSchema().equals(that.readSchema())
         && Objects.equals(snapshotId, that.snapshotId)
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId)
-        && Objects.equals(asOfTimestamp, that.asOfTimestamp);
+        && Objects.equals(asOfTimestamp, that.asOfTimestamp)
+        && Objects.equals(branch, that.branch)
+        && Objects.equals(tag, that.tag);
   }
 
   @Override
@@ -282,7 +295,9 @@ public int hashCode() {
         snapshotId,
         startSnapshotId,
         endSnapshotId,
-        asOfTimestamp);
+        asOfTimestamp,
+        branch,
+        tag);
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 21c34ed6f628..2353377c6c02 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.arrow.util.Preconditions;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -182,6 +183,14 @@ private Schema schemaWithMetadataColumns() {
   public Scan build() {
     Long snapshotId = readConf.snapshotId();
     Long asOfTimestamp = readConf.asOfTimestamp();
+    String branch = readConf.branch();
+    String tag = readConf.tag();
+
+    Preconditions.checkArgument(
+        branch == null || tag == null,
+        "Cannot set both %s and %s to select which table snapshot to scan",
+        SparkReadOptions.BRANCH,
+        SparkReadOptions.TAG);
 
     Preconditions.checkArgument(
         snapshotId == null || asOfTimestamp == null,
@@ -189,6 +198,13 @@ public Scan build() {
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP);
 
+    String snapshotRef = branch != null ? branch : tag;
+    Preconditions.checkArgument(
+        snapshotId == null || snapshotRef == null,
+        "Cannot set both %s and %s to select which table snapshot to scan",
+        SparkReadOptions.SNAPSHOT_ID,
+        "branch/tag");
+
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
 
@@ -225,6 +241,12 @@ public Scan build() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
+    if (branch != null) {
+      scan = scan.useSnapshotRef(branch);
+    } else if (tag != null) {
+      scan = scan.useSnapshotRef(tag);
+    }
+
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
@@ -240,7 +262,10 @@ public Scan build() {
 
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
+        readConf.snapshotId() == null
+            && readConf.asOfTimestamp() == null
+            && readConf.branch() == null
+            && readConf.tag() == null,
         "Cannot set time travel options %s and %s for row-level command scans",
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP);
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 9661cfe20b1c..aae10ccd5764 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -226,4 +226,88 @@ public void testSnapshotSelectionBySnapshotIdAndTimestamp() throws IOException {
         .hasMessageContaining("Cannot specify both snapshot-id")
         .hasMessageContaining("and as-of-timestamp");
   }
+
+  @Test
+  public void testSnapshotSelectionByTag() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify records in the current snapshot by tag
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("tag", "tag")
+        .load(tableLocation);
+    currentSnapshotResult.show();
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+  }
+
+  @Test
+  public void testSnapshotSelectionByBranch() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = tables.create(SCHEMA, spec, tableLocation);
+
+    // produce the first snapshot
+    List<SimpleRecord> firstBatchRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
+    firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
+
+    // produce the second snapshot
+    List<SimpleRecord> secondBatchRecords = Lists.newArrayList(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e"),
+        new SimpleRecord(6, "f")
+    );
+    Dataset<Row> secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class);
+    secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
+
+    // verify records in the current snapshot by tag
+    Dataset<Row> currentSnapshotResult = spark.read()
+        .format("iceberg")
+        .option("branch", "branch")
+        .load(tableLocation);
+    currentSnapshotResult.show();
+    List<SimpleRecord> currentSnapshotRecords = currentSnapshotResult.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    List<SimpleRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(firstBatchRecords);
+    Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords);
+  }
 }
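With this patch the PR converges on Spark 3.3 and splits the single ref option into separate branch and tag options. For illustration, reads pinned to each kind of ref would look roughly as follows (a sketch, not part of the patch; the table identifier and ref names are placeholders):

    // Sketch: Spark 3.3 DataFrame reads pinned to a branch and to a tag.
    Dataset<Row> branchDf = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.BRANCH, "audit-branch") // i.e. .option("branch", ...)
        .load("db.table");

    Dataset<Row> tagDf = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.TAG, "historical-tag")  // i.e. .option("tag", ...)
        .load("db.table");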
From 8b29f004e777ab57faa9d2cf2dcdf07efdaf3bec Mon Sep 17 00:00:00 2001
From: namrathamk <85421654+namrathamk@users.noreply.github.com>
Date: Thu, 11 Aug 2022 06:05:36 +0530
Subject: [PATCH 09/20] Adding support to only spark 3.3

Adding support to only spark 3.3
Adding support to only spark 3.3
---
 .../apache/iceberg/spark/SparkConfParser.java  |  1 -
 .../spark/source/SparkBatchQueryScan.java      |  4 +---
 .../spark/source/SparkBatchQueryScan.java      |  8 ++------
 .../iceberg/spark/source/SparkScanBuilder.java | 18 +-----------------
 4 files changed, 4 insertions(+), 27 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index bf19dd0d8e47..33e5ca936800 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -132,7 +132,6 @@ public String parse() {
       Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Function.identity(), defaultValue);
     }
-
   }
 
   abstract class ConfParser {
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 73023a0ff7ee..4fcab5517d44 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -76,9 +76,7 @@ class SparkBatchQueryScan extends SparkBatchScan {
                 + SparkReadOptions.SNAPSHOT_ID
                 + " or "
                 + SparkReadOptions.AS_OF_TIMESTAMP
-                + " or "
-                + SparkReadOptions.SNAPSHOT_REF
-                + " is specified ");
+                + " is specified");
       }
     } else if (startSnapshotId == null && endSnapshotId != null) {
       throw new IllegalArgumentException(
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 17e91bc16537..59dd8759968f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -67,7 +67,6 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
   private final Long startSnapshotId;
   private final Long endSnapshotId;
   private final Long asOfTimestamp;
-  private final String snapshotRef;
   private final List<Expression> runtimeFilterExpressions;
 
   private Set<Integer> specIds = null; // lazy cache of scanned spec IDs
@@ -89,7 +88,6 @@ class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
-    this.snapshotRef = readConf.snapshotRef();
     this.runtimeFilterExpressions = Lists.newArrayList();
 
     if (scan == null) {
@@ -271,8 +269,7 @@ && readSchema().equals(that.readSchema())
         && Objects.equals(snapshotId, that.snapshotId)
         && Objects.equals(startSnapshotId, that.startSnapshotId)
         && Objects.equals(endSnapshotId, that.endSnapshotId)
-        && Objects.equals(asOfTimestamp, that.asOfTimestamp)
-        && Objects.equals(snapshotRef, that.snapshotRef);
+        && Objects.equals(asOfTimestamp, that.asOfTimestamp);
   }
 
   @Override
@@ -285,8 +282,7 @@ public int hashCode() {
         snapshotId,
         startSnapshotId,
         endSnapshotId,
-        asOfTimestamp,
-        snapshotRef);
+        asOfTimestamp);
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 524d883da694..21c34ed6f628 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -182,29 +182,17 @@ private Schema schemaWithMetadataColumns() {
   public Scan build() {
     Long snapshotId = readConf.snapshotId();
     Long asOfTimestamp = readConf.asOfTimestamp();
-    String snapshotRef = readConf.snapshotRef();
 
     Preconditions.checkArgument(
         snapshotId == null || asOfTimestamp == null,
         "Cannot set both %s and %s to select which table snapshot to scan",
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP);
 
-    Preconditions.checkArgument(
-        snapshotId == null || snapshotRef == null,
-        "Cannot set both %s and %s to select which table snapshot to scan",
-        SparkReadOptions.SNAPSHOT_ID,
-        SparkReadOptions.SNAPSHOT_REF);
-
-    Preconditions.checkArgument(
-        asOfTimestamp == null || snapshotRef == null,
-        "Cannot set both %s and %s to select which table snapshot to scan",
-        SparkReadOptions.AS_OF_TIMESTAMP,
-        SparkReadOptions.SNAPSHOT_REF);
-
     Long startSnapshotId = readConf.startSnapshotId();
     Long endSnapshotId = readConf.endSnapshotId();
 
-    if (snapshotId != null || asOfTimestamp != null || snapshotRef != null) {
+    if (snapshotId != null || asOfTimestamp != null) {
       Preconditions.checkArgument(
           startSnapshotId == null && endSnapshotId == null,
           "Cannot set %s and %s for incremental scans when either %s or %s is set",
@@ -237,10 +225,6 @@ public Scan build() {
       scan = scan.asOfTime(asOfTimestamp);
     }
 
-    if (snapshotRef != null) {
-      scan = scan.useSnapshotRef(snapshotRef);
-    }
-
     if (startSnapshotId != null) {
       if (endSnapshotId != null) {
         scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
Aug 2022 19:12:23 +0530 Subject: [PATCH 10/20] Adding support only to Spark 3.3 --- .../spark/source/TestSnapshotSelection.java | 60 ++++++++----------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index aae10ccd5764..6dc9b7924b00 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -236,37 +236,31 @@ public void testSnapshotSelectionByTag() throws IOException { Table table = tables.create(SCHEMA, spec, tableLocation); // produce the first snapshot - List firstBatchRecords = Lists.newArrayList( - new SimpleRecord(1, "a"), - new SimpleRecord(2, "b"), - new SimpleRecord(3, "c") - ); + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); // produce the second snapshot - List secondBatchRecords = Lists.newArrayList( - new SimpleRecord(4, "d"), - new SimpleRecord(5, "e"), - new SimpleRecord(6, "f") - ); + List secondBatchRecords = + Lists.newArrayList( + new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f")); Dataset secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class); secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); // verify records in the current snapshot by tag - Dataset currentSnapshotResult = spark.read() - .format("iceberg") - .option("tag", "tag") - .load(tableLocation); + Dataset currentSnapshotResult = + spark.read().format("iceberg").option("tag", "tag").load(tableLocation); currentSnapshotResult.show(); - List currentSnapshotRecords = currentSnapshotResult.orderBy("id") - .as(Encoders.bean(SimpleRecord.class)) - .collectAsList(); + List currentSnapshotRecords = + currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); expectedRecords.addAll(firstBatchRecords); - Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); } @Test @@ -278,36 +272,30 @@ public void testSnapshotSelectionByBranch() throws IOException { Table table = tables.create(SCHEMA, spec, tableLocation); // produce the first snapshot - List firstBatchRecords = Lists.newArrayList( - new SimpleRecord(1, "a"), - new SimpleRecord(2, "b"), - new SimpleRecord(3, "c") - ); + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); // produce the second snapshot - List secondBatchRecords = Lists.newArrayList( - new SimpleRecord(4, "d"), - new SimpleRecord(5, "e"), - new SimpleRecord(6, "f") - ); + List
secondBatchRecords = + Lists.newArrayList( + new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f")); Dataset secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class); secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); // verify records in the current snapshot by tag - Dataset currentSnapshotResult = spark.read() - .format("iceberg") - .option("branch", "branch") - .load(tableLocation); + Dataset currentSnapshotResult = + spark.read().format("iceberg").option("branch", "branch").load(tableLocation); currentSnapshotResult.show(); - List currentSnapshotRecords = currentSnapshotResult.orderBy("id") - .as(Encoders.bean(SimpleRecord.class)) - .collectAsList(); + List currentSnapshotRecords = + currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); expectedRecords.addAll(firstBatchRecords); - Assert.assertEquals("Current snapshot rows should match", expectedRecords, currentSnapshotRecords); + Assert.assertEquals( + "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); } } From 55a3595386aaeca7c61a64bc8976699f89c80903 Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Thu, 20 Oct 2022 13:45:19 -0600 Subject: [PATCH 11/20] Adding more tests --- .../spark/source/SparkScanBuilder.java | 26 ++------------- .../spark/source/TestSnapshotSelection.java | 32 +++++++++++++++++-- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index ca002738b5a2..950d4bea478a 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -22,7 +22,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.IncrementalChangelogScan; -import org.apache.arrow.util.Preconditions; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -187,29 +186,10 @@ public Scan build() { String branch = readConf.branch(); String tag = readConf.branch(); - Preconditions.checkArgument( - branch == null || tag == null, - "Cannot set both %s and %s to select which table snapshot to scan", - SparkReadOptions.BRANCH, - SparkReadOptions.TAG); - - Preconditions.checkArgument( - snapshotId == null || asOfTimestamp == null, - "Cannot set both %s and %s to select which table snapshot to scan", - SparkReadOptions.SNAPSHOT_ID, - SparkReadOptions.AS_OF_TIMESTAMP); - - String snapshotRef = branch != null ? 
branch : tag; - Preconditions.checkArgument( - snapshotId == null || snapshotRef == null, - "Cannot set both %s and %s to select which table snapshot to scan", - SparkReadOptions.SNAPSHOT_ID, - "branch/tag"); - Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); - if (snapshotId != null || asOfTimestamp != null) { + if (snapshotId != null || asOfTimestamp != null || branch != null || tag != null) { Preconditions.checkArgument( startSnapshotId == null && endSnapshotId == null, "Cannot set %s and %s for incremental scans when either %s or %s is set", @@ -243,9 +223,9 @@ public Scan build() { } if (branch != null) { - scan.useSnapshotRef(branch); + scan.useRef(branch); } else if (tag != null) { - scan.useSnapshotRef(tag); + scan.useRef(tag); } if (startSnapshotId != null) { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 6dc9b7924b00..33363d4b644b 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -254,7 +254,6 @@ public void testSnapshotSelectionByTag() throws IOException { // verify records in the current snapshot by tag Dataset currentSnapshotResult = spark.read().format("iceberg").option("tag", "tag").load(tableLocation); - currentSnapshotResult.show(); List currentSnapshotRecords = currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); @@ -290,7 +289,6 @@ public void testSnapshotSelectionByBranch() throws IOException { // verify records in the current snapshot by tag Dataset currentSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); - currentSnapshotResult.show(); List currentSnapshotRecords = currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); @@ -298,4 +296,34 @@ public void testSnapshotSelectionByBranch() throws IOException { Assert.assertEquals( "Current snapshot rows should match", expectedRecords, currentSnapshotRecords); } + + @Test + public void testSnapshotSelectionByBranchAndTag() throws IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = tables.create(SCHEMA, spec, tableLocation); + + // produce the first snapshot + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); + firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); + + table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); + + Assertions.assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.BRANCH, "branch") + .option(SparkReadOptions.TAG, "tag") + .load(tableLocation)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + } } From 8c86fc857db1e8088dd3320cff6daa297e23cca2 Mon Sep 17 00:00:00 
2001 From: namrathamyske Date: Sat, 22 Oct 2022 15:30:34 -0600 Subject: [PATCH 12/20] fix tests --- .../spark/source/SparkBatchQueryScan.java | 8 +++---- .../spark/source/SparkScanBuilder.java | 2 +- .../spark/source/TestSnapshotSelection.java | 22 +++++++++---------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index a54af5c2a298..ee3730e12045 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -255,11 +255,11 @@ public Statistics estimateStatistics() { } else if (tag != null) { Snapshot snapshot = table().snapshot(tag); return estimateStatistics(snapshot); + } else { Snapshot snapshot = table().currentSnapshot(); return estimateStatistics(snapshot); } - } @Override @@ -281,9 +281,9 @@ && readSchema().equals(that.readSchema()) && Objects.equals(snapshotId, that.snapshotId) && Objects.equals(startSnapshotId, that.startSnapshotId) && Objects.equals(endSnapshotId, that.endSnapshotId) - && Objects.equals(asOfTimestamp, that.asOfTimestamp) - && Objects.equals(branch, that.branch) - && Objects.equals(branch, that.tag); + && Objects.equals(asOfTimestamp, that.asOfTimestamp); + // && Objects.equals(branch, that.branch) + // && Objects.equals(tag, that.tag); } @Override diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 950d4bea478a..83870297d0ae 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -189,7 +189,7 @@ public Scan build() { Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); - if (snapshotId != null || asOfTimestamp != null || branch != null || tag != null) { + if (snapshotId != null || asOfTimestamp != null) { Preconditions.checkArgument( startSnapshotId == null && endSnapshotId == null, "Cannot set %s and %s for incremental scans when either %s or %s is set", diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 33363d4b644b..da76fb202983 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -307,8 +307,8 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { // produce the first snapshot List firstBatchRecords = - Lists.newArrayList( - new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); @@ -316,14 +316,14 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); Assertions.assertThatThrownBy( - () -> - spark - .read() - 
.format("iceberg") - .option(SparkReadOptions.BRANCH, "branch") - .option(SparkReadOptions.TAG, "tag") - .load(tableLocation)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.BRANCH, "branch") + .option(SparkReadOptions.TAG, "tag") + .load(tableLocation)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=1"); } } From ac763d94a4059106e97be066b8932db3225afe6f Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 22 Oct 2022 15:40:56 -0600 Subject: [PATCH 13/20] fix tests --- .../spark/source/TestSnapshotSelection.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index da76fb202983..b205cf073bd1 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -326,4 +326,45 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Cannot override ref, already set snapshot id=1"); } + + @Test + public void testSnapshotSelectionByTimestampAndBranch() throws IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + + HadoopTables tables = new HadoopTables(CONF); + PartitionSpec spec = PartitionSpec.unpartitioned(); + Table table = tables.create(SCHEMA, spec, tableLocation); + + List firstBatchRecords = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class); + firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); + + long timestamp = System.currentTimeMillis(); + table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit(); + table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); + + Assertions.assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) + .option(SparkReadOptions.BRANCH, "branch") + .load(tableLocation)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + + Assertions.assertThatThrownBy( + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) + .option(SparkReadOptions.TAG, "tag") + .load(tableLocation)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + } } From fdfb0c6d8b53958f3a28f8dc4cbff2fd6d1b7a53 Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 20:35:29 -0600 Subject: [PATCH 14/20] Test case fix --- .../spark/source/SparkScanBuilder.java | 10 ++++--- .../spark/source/TestSnapshotSelection.java | 26 +++++++++---------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 83870297d0ae..256bac9a4694 100644 --- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -184,7 +184,7 @@ public Scan build() { Long snapshotId = readConf.snapshotId(); Long asOfTimestamp = readConf.asOfTimestamp(); String branch = readConf.branch(); - String tag = readConf.branch(); + String tag = readConf.tag(); Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); @@ -223,9 +223,11 @@ public Scan build() { } if (branch != null) { - scan.useRef(branch); - } else if (tag != null) { - scan.useRef(tag); + scan = scan.useRef(branch); + } + + if (tag != null) { + scan = scan.useRef(tag); } if (startSnapshotId != null) { diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index b205cf073bd1..7cd980181d71 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -316,15 +316,15 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); Assertions.assertThatThrownBy( - () -> - spark - .read() - .format("iceberg") - .option(SparkReadOptions.BRANCH, "branch") - .option(SparkReadOptions.TAG, "tag") - .load(tableLocation)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.TAG, "tag") + .option(SparkReadOptions.BRANCH, "branch") + .load(tableLocation).show()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot override ref, already set snapshot id="); } @Test @@ -352,9 +352,9 @@ public void testSnapshotSelectionByTimestampAndBranch() throws IOException { .format("iceberg") .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .option(SparkReadOptions.BRANCH, "branch") - .load(tableLocation)) + .load(tableLocation).show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + .hasMessageStartingWith("Cannot override ref, already set snapshot id="); Assertions.assertThatThrownBy( () -> @@ -363,8 +363,8 @@ public void testSnapshotSelectionByTimestampAndBranch() throws IOException { .format("iceberg") .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .option(SparkReadOptions.TAG, "tag") - .load(tableLocation)) + .load(tableLocation).show()) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Cannot override ref, already set snapshot id=1"); + .hasMessageStartingWith("Cannot override ref, already set snapshot id="); } } From 4bad10fbc9d8b9e9cd4949fc54252211a62e07c5 Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 20:38:19 -0600 Subject: [PATCH 15/20] Test case fix spotless --- .../spark/source/TestSnapshotSelection.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index 7cd980181d71..f551046f35ba 100644 --- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -316,15 +316,16 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit(); Assertions.assertThatThrownBy( - () -> - spark - .read() - .format("iceberg") - .option(SparkReadOptions.TAG, "tag") - .option(SparkReadOptions.BRANCH, "branch") - .load(tableLocation).show()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("Cannot override ref, already set snapshot id="); + () -> + spark + .read() + .format("iceberg") + .option(SparkReadOptions.TAG, "tag") + .option(SparkReadOptions.BRANCH, "branch") + .load(tableLocation) + .show()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageStartingWith("Cannot override ref, already set snapshot id="); } @Test @@ -352,7 +353,8 @@ public void testSnapshotSelectionByTimestampAndBranch() throws IOException { .format("iceberg") .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .option(SparkReadOptions.BRANCH, "branch") - .load(tableLocation).show()) + .load(tableLocation) + .show()) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Cannot override ref, already set snapshot id="); @@ -363,7 +365,8 @@ public void testSnapshotSelectionByTimestampAndBranch() throws IOException { .format("iceberg") .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp) .option(SparkReadOptions.TAG, "tag") - .load(tableLocation).show()) + .load(tableLocation) + .show()) .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Cannot override ref, already set snapshot id="); } From 00f07a0196993412f56de7905dfce1abda795ced Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 20:43:19 -0600 Subject: [PATCH 16/20] comments fix --- .../java/org/apache/iceberg/spark/SparkReadOptions.java | 4 ++-- .../org/apache/iceberg/spark/source/SparkScanBuilder.java | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index d1b25478cb5f..96e09d70ef65 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -35,10 +35,10 @@ private SparkReadOptions() {} // A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. 
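// (An illustrative usage sketch of these snapshot selectors from the DataFrame API; tableLocation
// and the ref names here are assumptions, mirroring TestSnapshotSelection in this series:
//   spark.read().format("iceberg").option("as-of-timestamp", "1656530000000").load(tableLocation);
//   spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
//   spark.read().format("iceberg").option("tag", "tag").load(tableLocation);
// Only one selector can apply to a read; the tests in this series assert that combining them
// fails with an IllegalArgumentException.)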
public static final String AS_OF_TIMESTAMP = "as-of-timestamp"; - // branch ref of the table snapshot to read from + // Branch to read from public static final String BRANCH = "branch"; - // tag ref of the table snapshot to read from + // Tag to read from public static final String TAG = "tag"; // Overrides the table's read.split.target-size and read.split.metadata-target-size diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 256bac9a4694..0fdfb160b5af 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -186,6 +186,12 @@ public Scan build() { String branch = readConf.branch(); String tag = readConf.tag(); + Preconditions.checkArgument( + snapshotId == null || asOfTimestamp == null, + "Cannot set both %s and %s to select which table snapshot to scan", + SparkReadOptions.SNAPSHOT_ID, + SparkReadOptions.AS_OF_TIMESTAMP); + Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); From f685af46ccbc757f8aea3c19b2825d1df25e4d42 Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 20:54:22 -0600 Subject: [PATCH 17/20] statement fix --- .../org/apache/iceberg/spark/source/SparkScanBuilder.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 0fdfb160b5af..a2183fd17538 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -287,9 +287,11 @@ public Scan buildMergeOnReadScan() { && readConf.asOfTimestamp() == null && readConf.branch() == null && readConf.tag() == null, - "Cannot set time travel options %s and %s for row-level command scans", + "Cannot set time travel options %s, %s, %s and %s for row-level command scans", SparkReadOptions.SNAPSHOT_ID, - SparkReadOptions.AS_OF_TIMESTAMP); + SparkReadOptions.AS_OF_TIMESTAMP, + SparkReadOptions.BRANCH, + SparkReadOptions.TAG); Preconditions.checkArgument( readConf.startSnapshotId() == null && readConf.endSnapshotId() == null, From d3fa84eed4096170020a2dfc608a661d2df48edb Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 20:57:07 -0600 Subject: [PATCH 18/20] statement fix --- .../apache/iceberg/spark/source/SparkScanBuilder.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index a2183fd17538..150da814ba9e 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -251,10 +251,15 @@ public Scan build() { public Scan buildChangelogScan() { Preconditions.checkArgument( - readConf.snapshotId() == null && readConf.asOfTimestamp() == null, - "Cannot set neither %s nor %s for changelogs", + readConf.snapshotId() == null + && readConf.asOfTimestamp() == null + && readConf.branch() == null + && readConf.tag() == null, + "Cannot set any of %s, %s, %s
and %s for changelogs", SparkReadOptions.SNAPSHOT_ID, - SparkReadOptions.AS_OF_TIMESTAMP); + SparkReadOptions.AS_OF_TIMESTAMP, + SparkReadOptions.BRANCH, + SparkReadOptions.TAG); Long startSnapshotId = readConf.startSnapshotId(); Long endSnapshotId = readConf.endSnapshotId(); From a693c7213404d444df42bed663940e736bc7d22d Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sat, 29 Oct 2022 22:01:17 -0600 Subject: [PATCH 19/20] adding suppressions --- .../apache/iceberg/spark/source/SparkBatchQueryScan.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index ee3730e12045..6dcbd5091212 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -263,6 +263,7 @@ public Statistics estimateStatistics() { } @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") public boolean equals(Object o) { if (this == o) { return true; @@ -281,9 +282,9 @@ && readSchema().equals(that.readSchema()) && Objects.equals(snapshotId, that.snapshotId) && Objects.equals(startSnapshotId, that.startSnapshotId) && Objects.equals(endSnapshotId, that.endSnapshotId) - && Objects.equals(asOfTimestamp, that.asOfTimestamp); - // && Objects.equals(branch, that.branch) - // && Objects.equals(tag, that.tag); + && Objects.equals(asOfTimestamp, that.asOfTimestamp) + && Objects.equals(branch, that.branch) + && Objects.equals(tag, that.tag); } @Override From ab0832bfe0d9582e559ea62355332e0eafacc75b Mon Sep 17 00:00:00 2001 From: namrathamyske Date: Sun, 30 Oct 2022 11:11:47 -0600 Subject: [PATCH 20/20] changing comments --- .../apache/iceberg/spark/source/TestSnapshotSelection.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java index f551046f35ba..0b7348fa078a 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -286,7 +286,7 @@ public void testSnapshotSelectionByBranch() throws IOException { Dataset secondDf = spark.createDataFrame(secondBatchRecords, SimpleRecord.class); secondDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation); - // verify records in the current snapshot by tag + // verify records in the current snapshot by branch Dataset currentSnapshotResult = spark.read().format("iceberg").option("branch", "branch").load(tableLocation); List currentSnapshotRecords = currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList(); List expectedRecords = Lists.newArrayList(); @@ -298,7 +298,7 @@ public void testSnapshotSelectionByBranch() throws IOException { } @Test - public void testSnapshotSelectionByBranchAndTag() throws IOException { + public void testSnapshotSelectionByBranchAndTagFails() throws IOException { String tableLocation = temp.newFolder("iceberg-table").toString(); HadoopTables tables = new HadoopTables(CONF); @@ -329,7 +329,7 @@ public void testSnapshotSelectionByBranchAndTag() throws IOException { } @Test - public void testSnapshotSelectionByTimestampAndBranch() throws IOException { + public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOException { String tableLocation =
temp.newFolder("iceberg-table").toString(); HadoopTables tables = new HadoopTables(CONF);
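
Taken together, the series lets a Spark 3.3 reader select a snapshot by branch or tag. A minimal usage sketch, mirroring the calls in TestSnapshotSelection above (the spark session, table, and tableLocation are assumed to be set up as in those tests, and "branch" and "tag" are simply the ref names the tests use):

    // point a branch and a tag at the current snapshot
    table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();
    table.manageSnapshots().createTag("tag", table.currentSnapshot().snapshotId()).commit();

    // read each ref through the DataFrame API
    Dataset<Row> branchRows =
        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, "branch").load(tableLocation);
    Dataset<Row> tagRows =
        spark.read().format("iceberg").option(SparkReadOptions.TAG, "tag").load(tableLocation);

    // setting branch and tag together, or combining as-of-timestamp with either ref, throws
    // IllegalArgumentException starting with "Cannot override ref, already set snapshot id="

Each selector resolves to a snapshot via TableScan.useRef, so supplying a second selector on the same read attempts to override an already-pinned snapshot, which is exactly the failure the negative tests in patches 11 through 15 assert.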