Skip to content

Commit

Permalink
Spark 2.4: Handle no-op for rewrite manifests procedure/action (#6733)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored Feb 3, 2023
1 parent 17cbcd8 commit 6e19b53
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ private RewriteManifests.Result doExecute() {
int targetNumManifests = targetNumManifests(totalSizeBytes);
int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);

if (targetNumManifests == 1 && matchingManifests.size() == 1) {
return BaseRewriteManifestsActionResult.empty();
}

Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);

List<ManifestFile> newManifests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ public void testRewriteImportedManifests() throws IOException {
SparkTableUtil.importSparkTable(
spark, new TableIdentifier("parquet_table"), table, stagingDir.toString());

// add some more data to create more than one manifest for the rewrite
inputDF.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
table.refresh();

Snapshot snapshot = table.currentSnapshot();

SparkActions actions = SparkActions.get();
Expand Down Expand Up @@ -431,6 +435,8 @@ public void testRewriteManifestsWithPredicate() throws IOException {
new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
writeRecords(records1);

writeRecords(records1);

List<ThreeColumnRecord> records2 =
Lists.newArrayList(
new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
Expand All @@ -440,21 +446,24 @@ public void testRewriteManifestsWithPredicate() throws IOException {
table.refresh();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
Assert.assertEquals("Should have 3 manifests before rewrite", 3, manifests.size());

SparkActions actions = SparkActions.get();

// rewrite only the first manifest without caching
RewriteManifests.Result result =
actions
.rewriteManifests(table)
.rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path()))
.rewriteIf(
manifest ->
(manifest.path().equals(manifests.get(0).path())
|| (manifest.path().equals(manifests.get(1).path()))))
.stagingLocation(temp.newFolder().toString())
.option("use-caching", "false")
.execute();

Assert.assertEquals(
"Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
"Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));

Expand All @@ -464,11 +473,16 @@ public void testRewriteManifestsWithPredicate() throws IOException {
Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());

Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
Assert.assertFalse(
"Second manifest must be rewritten", newManifests.contains(manifests.get(1)));
Assert.assertTrue(
"Second manifest must not be rewritten", newManifests.contains(manifests.get(1)));
"Third manifest must not be rewritten", newManifests.contains(manifests.get(2)));

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records1);
expectedRecords.add(records1.get(0));
expectedRecords.add(records1.get(0));
expectedRecords.add(records1.get(1));
expectedRecords.add(records1.get(1));
expectedRecords.addAll(records2);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
Expand All @@ -478,6 +492,33 @@ public void testRewriteManifestsWithPredicate() throws IOException {
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testRewriteManifestsNoOp() throws IOException {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build();
Map<String, String> options = Maps.newHashMap();
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);

List<ThreeColumnRecord> records1 =
Lists.newArrayList(
new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
writeRecords(records1);

table.refresh();

List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifest before rewrite", 1, manifests.size());

SparkActions actions = SparkActions.get();

RewriteManifests.Result result = actions.rewriteManifests(table).execute();

Assert.assertEquals(
"Action should rewrite 0 manifests", 0, Iterables.size(result.rewrittenManifests()));
Assert.assertEquals(
"Action should add 0 manifests", 0, Iterables.size(result.addedManifests()));
}

@Test
public void testRewriteSmallManifestsNonPartitionedV2Table() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Expand Down

0 comments on commit 6e19b53

Please sign in to comment.