Skip to content

Commit

Permalink
Spark: Add tests for SELECT using tag/branch prefix identifier (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
wypoon authored and devangjhabakh committed Apr 22, 2024
1 parent 19d20c6 commit 8e52c83
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,30 @@ public void testVersionAsOf() {
}

@Test
public void testTagReferenceAsOf() {
public void testTagReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the tag
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// read the table at the snapshot
// read the table at the tag
// HIVE time travel syntax
List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual2);

// read the table using DataFrameReader option: branch
// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "tag_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: tag
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
List<Object[]> fromDF = rowsToJava(df.collectAsList());
Expand Down Expand Up @@ -284,23 +291,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
}

@Test
public void testBranchReferenceAsOf() {
public void testBranchReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the branch
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// read the table at the snapshot
// read the table at the branch
// HIVE time travel syntax
List<Object[]> actual2 =
sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual2);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "branch_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: branch
Dataset<Row> df =
spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,30 @@ public void testVersionAsOf() {
}

@Test
public void testTagReferenceAsOf() {
public void testTagReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the tag
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// read the table at the snapshot
// read the table at the tag
// HIVE time travel syntax
List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual2);

// read the table using DataFrameReader option: branch
// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "tag_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: tag
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
List<Object[]> fromDF = rowsToJava(df.collectAsList());
Expand Down Expand Up @@ -283,23 +290,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
}

@Test
public void testBranchReferenceAsOf() {
public void testBranchReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the branch
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// read the table at the snapshot
// read the table at the branch
// HIVE time travel syntax
List<Object[]> actual2 =
sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual2);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "branch_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: branch
Dataset<Row> df =
spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,30 @@ public void testVersionAsOf() {
}

@Test
public void testTagReferenceAsOf() {
public void testTagReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createTag("test_tag", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the tag
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual1);

// read the table at the snapshot
// read the table at the tag
// HIVE time travel syntax
List<Object[]> actual2 = sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_tag'", tableName);
assertEquals("Snapshot at specific tag reference name", expected, actual2);

// read the table using DataFrameReader option: branch
// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "tag_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.tag_test_tag", tableName);
assertEquals("Snapshot at specific tag reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: tag
Dataset<Row> df =
spark.read().format("iceberg").option(SparkReadOptions.TAG, "test_tag").load(tableName);
List<Object[]> fromDF = rowsToJava(df.collectAsList());
Expand Down Expand Up @@ -283,23 +290,30 @@ public void testUseSnapshotIdForTagReferenceAsOf() {
}

@Test
public void testBranchReferenceAsOf() {
public void testBranchReference() {
Table table = validationCatalog.loadTable(tableIdent);
long snapshotId = table.currentSnapshot().snapshotId();
table.manageSnapshots().createBranch("test_branch", snapshotId).commit();

// create a second snapshot, read the table at the snapshot
List<Object[]> expected = sql("SELECT * FROM %s", tableName);

// create a second snapshot, read the table at the branch
sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
List<Object[]> actual1 = sql("SELECT * FROM %s VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual1);

// read the table at the snapshot
// read the table at the branch
// HIVE time travel syntax
List<Object[]> actual2 =
sql("SELECT * FROM %s FOR SYSTEM_VERSION AS OF 'test_branch'", tableName);
assertEquals("Snapshot at specific branch reference name", expected, actual2);

// Spark session catalog does not support extended table names
if (!"spark_catalog".equals(catalogName)) {
// read the table using the "branch_" prefix in the table name
List<Object[]> actual3 = sql("SELECT * FROM %s.branch_test_branch", tableName);
assertEquals("Snapshot at specific branch reference name, prefix", expected, actual3);
}

// read the table using DataFrameReader option: branch
Dataset<Row> df =
spark
Expand Down

0 comments on commit 8e52c83

Please sign in to comment.