From 4afc8b462c108a31467d11804dd539705962d0c1 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 5 Dec 2023 17:14:09 +0100 Subject: [PATCH] Spark: Don't allow branch_ usage with VERSION AS OF (#9219) --- .../org/apache/iceberg/spark/SparkCatalog.java | 2 +- .../apache/iceberg/spark/source/SparkTable.java | 4 ++++ .../org/apache/iceberg/spark/sql/TestSelect.java | 15 +++++++++++++++ .../org/apache/iceberg/spark/SparkCatalog.java | 2 +- .../apache/iceberg/spark/source/SparkTable.java | 4 ++++ .../org/apache/iceberg/spark/sql/TestSelect.java | 15 +++++++++++++++ .../org/apache/iceberg/spark/SparkCatalog.java | 2 +- .../apache/iceberg/spark/source/SparkTable.java | 4 ++++ .../org/apache/iceberg/spark/sql/TestSelect.java | 15 +++++++++++++++ 9 files changed, 60 insertions(+), 3 deletions(-) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 02bbec6824f3..48decf995fbb 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -171,7 +171,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep SparkTable sparkTable = (SparkTable) table; Preconditions.checkArgument( - sparkTable.snapshotId() == null, + sparkTable.snapshotId() == null && sparkTable.branch() == null, "Cannot do time-travel based on both table identifier and AS OF"); try { diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 574d014e8335..eddcdb1819ee 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -173,6 +173,10 @@ public Long snapshotId() { return snapshotId; } + public String branch() { + return branch; + } + public SparkTable copyWithSnapshotId(long newSnapshotId) { return new SparkTable(icebergTable, newSnapshotId, refreshEagerly); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index e08bc4574dbf..1368c26792ee 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -426,6 +426,21 @@ public void testInvalidTimeTravelBasedOnBothAsOfAndTableIdentifier() { }); } + @Test + public void testInvalidTimeTravelAgainstBranchIdentifierWithAsOf() { + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + validationCatalog.loadTable(tableIdent).manageSnapshots().createBranch("b1").commit(); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + // using branch_b1 in the table identifier and VERSION AS OF + Assertions.assertThatThrownBy( + () -> sql("SELECT * FROM %s.branch_b1 VERSION AS OF %s", tableName, snapshotId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot do time-travel based on both table identifier and AS OF"); + } + @Test public void testSpecifySnapshotAndTimestamp() { // get the snapshot ID of the last write diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 6958ebc1034a..6b7becc77cd7 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -170,7 +170,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep SparkTable sparkTable = (SparkTable) table; Preconditions.checkArgument( - sparkTable.snapshotId() == null, + sparkTable.snapshotId() == null && sparkTable.branch() == null, "Cannot do time-travel based on both table identifier and AS OF"); try { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index e200bee03e9f..bbc7434138ed 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -173,6 +173,10 @@ public Long snapshotId() { return snapshotId; } + public String branch() { + return branch; + } + public SparkTable copyWithSnapshotId(long newSnapshotId) { return new SparkTable(icebergTable, newSnapshotId, refreshEagerly); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index dacaee7d8030..161c2e0ba637 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -421,6 +421,21 @@ public void testInvalidTimeTravelBasedOnBothAsOfAndTableIdentifier() { .hasMessage("Cannot do time-travel based on both table identifier and AS OF"); } + @Test + public void testInvalidTimeTravelAgainstBranchIdentifierWithAsOf() { + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + validationCatalog.loadTable(tableIdent).manageSnapshots().createBranch("b1").commit(); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + // using branch_b1 in the table identifier and VERSION AS OF + Assertions.assertThatThrownBy( + () -> sql("SELECT * FROM %s.branch_b1 VERSION AS OF %s", tableName, snapshotId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot do time-travel based on both table identifier and AS OF"); + } + @Test public void testSpecifySnapshotAndTimestamp() { // get the snapshot ID of the last write diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 467b42899480..eef0f0703bc3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -170,7 +170,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep SparkTable sparkTable = (SparkTable) table; Preconditions.checkArgument( - sparkTable.snapshotId() == null, + sparkTable.snapshotId() == null && sparkTable.branch() == null, "Cannot do time-travel based on both table identifier and AS OF"); try { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index e200bee03e9f..bbc7434138ed 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -173,6 +173,10 @@ public Long snapshotId() { return snapshotId; } + public String branch() { + return branch; + } + public SparkTable copyWithSnapshotId(long newSnapshotId) { return new SparkTable(icebergTable, newSnapshotId, refreshEagerly); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index dacaee7d8030..161c2e0ba637 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -421,6 +421,21 @@ public void testInvalidTimeTravelBasedOnBothAsOfAndTableIdentifier() { .hasMessage("Cannot do time-travel based on both table identifier and AS OF"); } + @Test + public void testInvalidTimeTravelAgainstBranchIdentifierWithAsOf() { + long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId(); + validationCatalog.loadTable(tableIdent).manageSnapshots().createBranch("b1").commit(); + + // create a second snapshot + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName); + + // using branch_b1 in the table identifier and VERSION AS OF + Assertions.assertThatThrownBy( + () -> sql("SELECT * FROM %s.branch_b1 VERSION AS OF %s", tableName, snapshotId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot do time-travel based on both table identifier and AS OF"); + } + @Test public void testSpecifySnapshotAndTimestamp() { // get the snapshot ID of the last write