From 6e19b53dd79864e03b60cd2be77d072bca6a90c0 Mon Sep 17 00:00:00 2001 From: Ajantha Bhat Date: Sat, 4 Feb 2023 04:59:02 +0530 Subject: [PATCH] Spark 2.4: Handle no-op for rewrite manifests procedure/action (#6733) --- .../BaseRewriteManifestsSparkAction.java | 4 ++ .../actions/TestRewriteManifestsAction.java | 51 +++++++++++++++++-- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java index 4cca7caf6e56..cc44b7027915 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java @@ -171,6 +171,10 @@ private RewriteManifests.Result doExecute() { int targetNumManifests = targetNumManifests(totalSizeBytes); int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests); + if (targetNumManifests == 1 && matchingManifests.size() == 1) { + return BaseRewriteManifestsActionResult.empty(); + } + Dataset manifestEntryDF = buildManifestEntryDF(matchingManifests); List newManifests; diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 95f2f12d5ff8..86ac01eb459e 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -341,6 +341,10 @@ public void testRewriteImportedManifests() throws IOException { SparkTableUtil.importSparkTable( spark, new TableIdentifier("parquet_table"), table, stagingDir.toString()); + // add some more data to create more than one manifest for the rewrite + inputDF.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + table.refresh(); + Snapshot snapshot = table.currentSnapshot(); SparkActions actions = SparkActions.get(); @@ -431,6 +435,8 @@ public void testRewriteManifestsWithPredicate() throws IOException { new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); writeRecords(records1); + writeRecords(records1); + List records2 = Lists.newArrayList( new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), @@ -440,7 +446,7 @@ public void testRewriteManifestsWithPredicate() throws IOException { table.refresh(); List manifests = table.currentSnapshot().allManifests(table.io()); - Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size()); + Assert.assertEquals("Should have 3 manifests before rewrite", 3, manifests.size()); SparkActions actions = SparkActions.get(); @@ -448,13 +454,16 @@ public void testRewriteManifestsWithPredicate() throws IOException { RewriteManifests.Result result = actions .rewriteManifests(table) - .rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path())) + .rewriteIf( + manifest -> + (manifest.path().equals(manifests.get(0).path()) + || (manifest.path().equals(manifests.get(1).path())))) .stagingLocation(temp.newFolder().toString()) .option("use-caching", "false") .execute(); Assert.assertEquals( - "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests())); + "Action should rewrite 2 manifest", 2, Iterables.size(result.rewrittenManifests())); Assert.assertEquals( "Action should add 1 manifests", 1, Iterables.size(result.addedManifests())); @@ -464,11 +473,16 @@ public void testRewriteManifestsWithPredicate() throws IOException { Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size()); Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0))); + Assert.assertFalse( + "Second manifest must be rewritten", newManifests.contains(manifests.get(1))); Assert.assertTrue( - "Second manifest must not be rewritten", newManifests.contains(manifests.get(1))); + "Third manifest must not be rewritten", newManifests.contains(manifests.get(2))); List expectedRecords = Lists.newArrayList(); - expectedRecords.addAll(records1); + expectedRecords.add(records1.get(0)); + expectedRecords.add(records1.get(0)); + expectedRecords.add(records1.get(1)); + expectedRecords.add(records1.get(1)); expectedRecords.addAll(records2); Dataset resultDF = spark.read().format("iceberg").load(tableLocation); @@ -478,6 +492,33 @@ public void testRewriteManifestsWithPredicate() throws IOException { Assert.assertEquals("Rows must match", expectedRecords, actualRecords); } + @Test + public void testRewriteManifestsNoOp() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 2).build(); + Map options = Maps.newHashMap(); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = + Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); + writeRecords(records1); + + table.refresh(); + + List manifests = table.currentSnapshot().allManifests(table.io()); + Assert.assertEquals("Should have 1 manifest before rewrite", 1, manifests.size()); + + SparkActions actions = SparkActions.get(); + + RewriteManifests.Result result = actions.rewriteManifests(table).execute(); + + Assert.assertEquals( + "Action should rewrite 0 manifests", 0, Iterables.size(result.rewrittenManifests())); + Assert.assertEquals( + "Action should add 0 manifests", 0, Iterables.size(result.addedManifests())); + } + @Test public void testRewriteSmallManifestsNonPartitionedV2Table() { PartitionSpec spec = PartitionSpec.unpartitioned();