diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 784e5ed1e7c7..40625b5e3450 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -78,6 +78,28 @@ public void testRewriteLargeManifests() {
         "Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
   }
 
+  @Test
+  public void testRewriteManifestsNoOp() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals(
+        "Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
+
+    List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+    // with a single manifest there is nothing to compact, so the procedure must be a no-op
+    assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), output);
+
+    table.refresh();
+
+    Assert.assertEquals(
+        "Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
+  }
+
   @Test
   public void testRewriteLargeManifestsOnDatePartitionedTableWithJava8APIEnabled() {
     withSQLConf(
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index b598bcb19bd8..42977b0be569 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -172,6 +172,12 @@ private RewriteManifests.Result doExecute() {
     int targetNumManifests = targetNumManifests(totalSizeBytes);
     int targetNumManifestEntries = targetNumManifestEntries(numEntries, targetNumManifests);
 
+    // a single matching manifest with a single-manifest target has nothing to compact, so
+    // return an empty result instead of building the manifest entry DataFrame
+    if (targetNumManifests == 1 && matchingManifests.size() == 1) {
+      return BaseRewriteManifestsActionResult.empty();
+    }
+
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
 
     List<ManifestFile> newManifests;
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 95f2f12d5ff8..9382846e1eed 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -341,6 +341,10 @@ public void testRewriteImportedManifests() throws IOException {
       SparkTableUtil.importSparkTable(
           spark, new TableIdentifier("parquet_table"), table, stagingDir.toString());
 
+      // add some more data to create more than one manifest for the rewrite
+      inputDF.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+      table.refresh();
+
       Snapshot snapshot = table.currentSnapshot();
 
       SparkActions actions = SparkActions.get();
@@ -431,6 +435,9 @@ public void testRewriteManifestsWithPredicate() throws IOException {
             new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB"));
     writeRecords(records1);
 
+    // append the same records again so that two manifests are eligible for the rewrite
+    writeRecords(records1);
+
     List<ThreeColumnRecord> records2 =
         Lists.newArrayList(
             new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"),
@@ -440,7 +447,7 @@ public void testRewriteManifestsWithPredicate() throws IOException {
     table.refresh();
 
     List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+    Assert.assertEquals("Should have 3 manifests before rewrite", 3, manifests.size());
 
     SparkActions actions = SparkActions.get();
 
@@ -448,13 +455,16 @@ public void testRewriteManifestsWithPredicate() throws IOException {
     RewriteManifests.Result result =
         actions
             .rewriteManifests(table)
-            .rewriteIf(manifest -> manifest.path().equals(manifests.get(0).path()))
+            .rewriteIf(
+                manifest ->
+                    manifest.path().equals(manifests.get(0).path())
+                        || manifest.path().equals(manifests.get(1).path()))
             .stagingLocation(temp.newFolder().toString())
             .option("use-caching", "false")
             .execute();
 
     Assert.assertEquals(
-        "Action should rewrite 1 manifest", 1, Iterables.size(result.rewrittenManifests()));
+        "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
     Assert.assertEquals(
         "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
 
@@ -464,11 +474,16 @@ public void testRewriteManifestsWithPredicate() throws IOException {
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
+    Assert.assertFalse(
+        "Second manifest must be rewritten", newManifests.contains(manifests.get(1)));
     Assert.assertTrue(
-        "Second manifest must not be rewritten", newManifests.contains(manifests.get(1)));
+        "Third manifest must not be rewritten", newManifests.contains(manifests.get(2)));
 
     List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
-    expectedRecords.addAll(records1);
+    expectedRecords.add(records1.get(0));
+    expectedRecords.add(records1.get(0));
+    expectedRecords.add(records1.get(1));
+    expectedRecords.add(records1.get(1));
     expectedRecords.addAll(records2);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);