Skip to content

Commit

Permalink
Spark 3.2: Handle no-op for rewrite manifests procedure/action (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored and danielcweeks committed Feb 11, 2023
1 parent b2cb667 commit 1e9fc1a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,28 @@ public void testRewriteLargeManifests() {
"Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
public void testRewriteManifestsNoOp() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);

Table table = validationCatalog.loadTable(tableIdent);

Assert.assertEquals(
"Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());

List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
// should not rewrite any manifests for no-op (output of rewrite is same as before and after)
assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output);

table.refresh();

Assert.assertEquals(
"Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
}

@Test
public void testRewriteLargeManifestsOnDatePartitionedTableWithJava8APIEnabled() {
withSQLConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ private RewriteManifests.Result doExecute() {
int targetNumManifests = targetNumManifests(totalSizeBytes);
int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);

if (targetNumManifests == 1 && matchingManifests.size() == 1) {
return BaseRewriteManifestsActionResult.empty();
}

Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);

List<ManifestFile> newManifests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ public void testRewriteImportedManifests() throws IOException {
SparkTableUtil.importSparkTable(
spark, new TableIdentifier("parquet_table"), table, stagingDir.toString());

// add some more data to create more than one manifest for the rewrite
inputDF.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
table.refresh();

Snapshot snapshot = table.currentSnapshot();

SparkActions actions = SparkActions.get();
Expand Down Expand Up @@ -431,6 +435,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {
new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
writeRecords(records1);

writeRecords(records1);

List<ThreeColumnRecord> records2 =
Lists.newArrayList(
new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
Expand All @@ -440,21 +446,24 @@ public void testRewriteManifestsWithPredicate() throws IOException {
table.refresh();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
Assert.assertEquals("Should have 3 manifests before rewrite", 3, manifests.size());

SparkActions actions = SparkActions.get();

// rewrite only the first manifest without caching
RewriteManifests.Result result =
actions
.rewriteManifests(table)
.rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path()))
.rewriteIf(
manifest ->
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.option("use-caching", "false")
.execute();

Assert.assertEquals(
"Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
"Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

Expand All @@ -464,11 +473,16 @@ public void testRewriteManifestsWithPredicate() throws IOException {
Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());

Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
Assert.assertFalse(
"Second manifest must be rewritten", newManifests.contains(manifests.get(1)));
Assert.assertTrue(
"Second manifest must not be rewritten", newManifests.contains(manifests.get(1)));
"Third manifest must not be rewritten", newManifests.contains(manifests.get(2)));

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
expectedRecords.add(records1.get(0));
expectedRecords.add(records1.get(0));
expectedRecords.add(records1.get(1));
expectedRecords.add(records1.get(1));
expectedRecords.addAll(records2);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
Expand Down

0 comments on commit 1e9fc1a

Please sign in to comment.