add test that writes new data to the branch and reads it
nastra committed Nov 23, 2023
1 parent 0127c1d commit 761fdae
Showing 3 changed files with 72 additions and 9 deletions.
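For context, the pattern the new test exercises is the Spark DataFrame "branch" read/write option: append rows to a named Iceberg branch, then read that branch back and see the new rows. Below is a minimal stand-alone sketch of that round trip (not the committed test itself); the class name BranchRoundTripSketch, the local[2] master, and the /tmp table path are illustrative assumptions, and SimpleRecord here merely mirrors the test's helper bean.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class BranchRoundTripSketch {

      // Mirrors the SimpleRecord(id, data) test bean referenced in the diff below.
      public static class SimpleRecord implements java.io.Serializable {
        private Integer id;
        private String data;

        public SimpleRecord() {}

        public SimpleRecord(Integer id, String data) {
          this.id = id;
          this.data = data;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getData() { return data; }
        public void setData(String data) { this.data = data; }
      }

      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
        // Assumption: an Iceberg table already exists here and has a ref named "branch".
        String tableLocation = "/tmp/iceberg-table";

        // Append new rows to the branch rather than to the table's main history.
        List<SimpleRecord> records =
            Arrays.asList(new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
        spark
            .createDataFrame(records, SimpleRecord.class)
            .select("id", "data")
            .write()
            .format("iceberg")
            .option("branch", "branch")
            .mode("append")
            .save(tableLocation);

        // Read the same branch back; the appended rows are visible on the branch.
        Dataset<SimpleRecord> fromBranch =
            spark
                .read()
                .format("iceberg")
                .option("branch", "branch")
                .load(tableLocation)
                .orderBy("id")
                .as(Encoders.bean(SimpleRecord.class));
        fromBranch.show();

        spark.stop();
      }
    }

The three hunks that follow carry the identical change, one per changed test file.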
@@ -427,11 +427,32 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
                 .option("branch", "branch")
                 .load(tableLocation)
                 .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
                 .collectAsList())
         .containsExactly(
-            new GenericRowWithSchema(new Object[] {1, null}, null),
-            new GenericRowWithSchema(new Object[] {2, null}, null),
-            new GenericRowWithSchema(new Object[] {3, null}, null));
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+
+    // writing new records into the branch should work with the re-introduced column
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> dataFrame = spark.createDataFrame(records, SimpleRecord.class);
+    dataFrame
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            branchSnapshotResult
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .hasSize(6)
+        .contains(new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null))
+        .containsAll(records);
   }
 
   @Test
@@ -450,11 +450,32 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
                 .option("branch", "branch")
                 .load(tableLocation)
                 .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
                 .collectAsList())
         .containsExactly(
-            new GenericRowWithSchema(new Object[] {1, null}, null),
-            new GenericRowWithSchema(new Object[] {2, null}, null),
-            new GenericRowWithSchema(new Object[] {3, null}, null));
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+
+    // writing new records into the branch should work with the re-introduced column
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> dataFrame = spark.createDataFrame(records, SimpleRecord.class);
+    dataFrame
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            branchSnapshotResult
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .hasSize(6)
+        .contains(new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null))
+        .containsAll(records);
   }
 
   @Test
@@ -450,11 +450,32 @@ public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
                 .option("branch", "branch")
                 .load(tableLocation)
                 .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
                 .collectAsList())
         .containsExactly(
-            new GenericRowWithSchema(new Object[] {1, null}, null),
-            new GenericRowWithSchema(new Object[] {2, null}, null),
-            new GenericRowWithSchema(new Object[] {3, null}, null));
+            new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null));
+
+    // writing new records into the branch should work with the re-introduced column
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(4, "d"), new SimpleRecord(5, "e"), new SimpleRecord(6, "f"));
+    Dataset<Row> dataFrame = spark.createDataFrame(records, SimpleRecord.class);
+    dataFrame
+        .select("id", "data")
+        .write()
+        .format("iceberg")
+        .option("branch", "branch")
+        .mode("append")
+        .save(tableLocation);
+
+    Assertions.assertThat(
+            branchSnapshotResult
+                .orderBy("id")
+                .as(Encoders.bean(SimpleRecord.class))
+                .collectAsList())
+        .hasSize(6)
+        .contains(new SimpleRecord(1, null), new SimpleRecord(2, null), new SimpleRecord(3, null))
+        .containsAll(records);
   }
 
   @Test