From f58e2ab1b9ae06dc46b5aacacb066869bbd15e37 Mon Sep 17 00:00:00 2001 From: Guy Khazma Date: Thu, 8 May 2025 23:08:44 -0400 Subject: [PATCH] enable withcommitproperties for snapshot update --- .../BaseSnapshotUpdateSparkAction.java | 7 ++ .../TestRemoveDanglingDeleteAction.java | 58 ++++++++++++ .../actions/TestRewriteDataFilesAction.java | 36 ++++++++ .../actions/TestRewriteManifestsAction.java | 59 ++++++++++++ .../BaseSnapshotUpdateSparkAction.java | 7 ++ .../TestRemoveDanglingDeleteAction.java | 89 +++++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 36 ++++++++ .../actions/TestRewriteManifestsAction.java | 59 ++++++++++++ .../BaseSnapshotUpdateSparkAction.java | 7 ++ .../TestRemoveDanglingDeleteAction.java | 89 +++++++++++++++++++ .../actions/TestRewriteDataFilesAction.java | 36 ++++++++ .../actions/TestRewriteManifestsAction.java | 59 ++++++++++++ 12 files changed, 542 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java index b69b80a8d3a6..2ee730c7fe63 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.spark.sql.SparkSession; abstract class BaseSnapshotUpdateSparkAction extends BaseSparkAction { @@ -37,11 +38,17 @@ public ThisT snapshotProperty(String property, String value) { } protected void commit(org.apache.iceberg.SnapshotUpdate update) { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } summary.forEach(update::set); update.commit(); } protected Map commitSummary() { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } return ImmutableMap.copyOf(summary); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index fb07421fc5db..e26fd2828c0a 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -24,6 +24,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -37,12 +38,16 @@ import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Encoders; @@ -434,4 +439,57 @@ public void testUnpartitionedTable() { SparkActions.get().removeDanglingDeleteFiles(table).execute(); assertThat(result.removedDeleteFiles()).as("No-op for unpartitioned tables").isEmpty(); } + + @TestTemplate + public void testRemoveDanglingDeleteCommitProperties() throws InterruptedException { + setupPartitionedTable(); + // Add Data Files + table.newAppend().appendFile(FILE_A).appendFile(FILE_C).appendFile(FILE_D).commit(); + // Add Data Files with EQ and POS deletes + table + .newRowDelta() + .addRows(FILE_A2) + .addRows(FILE_B2) + .addRows(FILE_C2) + .addRows(FILE_D2) + .addDeletes(FILE_A_POS_DELETES) + .addDeletes(FILE_A2_POS_DELETES) + .addDeletes(FILE_A_EQ_DELETES) + .addDeletes(FILE_A2_EQ_DELETES) + .addDeletes(FILE_B_POS_DELETES) + .addDeletes(FILE_B2_POS_DELETES) + .addDeletes(FILE_B_EQ_DELETES) + .addDeletes(FILE_B2_EQ_DELETES) + .commit(); + Thread removeDanglingDeletesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + return 0; + }, + RuntimeException.class); + }); + removeDanglingDeletesThread.setName("test-extra-commit-message-remove-dangling-delete"); + removeDanglingDeletesThread.start(); + removeDanglingDeletesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(1).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(2).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-remove-dangling-delete") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 83eb53eb65f4..408151e152b9 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -102,6 +102,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkTableUtil; @@ -1888,6 +1889,41 @@ public void testZOrderRewriteWithSpecificOutputSpecId() { shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); } + @TestTemplate + public void testRewriteDataFilesCommitProperties() throws InterruptedException { + Table table = createTablePartitioned(4, 2); + Thread rewriteDataFilesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + basicRewrite(table).execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteDataFilesThread.setName("test-extra-commit-message-rewrite-data-files"); + rewriteDataFilesThread.start(); + rewriteDataFilesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(0).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(1).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-data-files") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) { List rewrittenFiles = currentDataFiles(table); assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 76b201aa5649..456c6eb11bf6 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -57,6 +57,7 @@ import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -73,6 +74,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.spark.TestBase; @@ -1013,6 +1015,63 @@ public void testRewriteManifestsAfterUpgradeToV3() throws IOException { } } + @TestTemplate + public void testRewriteManifestsActionCommitProperties() throws InterruptedException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = + Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); + writeRecords(records1); + + List records2 = + Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")); + writeRecords(records2); + + Thread rewriteManifestThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions actions = SparkActions.get(); + + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteManifestThread.setName("test-extra-commit-message-rewrite-manifest"); + rewriteManifestThread.start(); + rewriteManifestThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(1).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(2).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-manifest") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + private List actualRecords() { return spark .read() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java index b69b80a8d3a6..2ee730c7fe63 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.spark.sql.SparkSession; abstract class BaseSnapshotUpdateSparkAction extends BaseSparkAction { @@ -37,11 +38,17 @@ public ThisT snapshotProperty(String property, String value) { } protected void commit(org.apache.iceberg.SnapshotUpdate update) { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } summary.forEach(update::set); update.commit(); } protected Map commitSummary() { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } return ImmutableMap.copyOf(summary); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 56800a26dc72..f758766e0af5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -25,6 +25,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -39,6 +40,8 @@ import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; @@ -46,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Encoders; @@ -515,6 +519,91 @@ public void testPartitionedDeletesWithDanglingDvs() { assertThat(actualAfter).containsExactlyInAnyOrderElementsOf(expectedAfter); } + @TestTemplate + public void testRemoveDanglingDeleteCommitProperties() throws InterruptedException { + setupPartitionedTable(); + + // Add Data Files + table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + + // Add Delete Files + DeleteFile fileADeletes = fileADeletes(); + DeleteFile fileA2Deletes = fileA2Deletes(); + DeleteFile fileBDeletes = fileBDeletes(); + DeleteFile fileB2Deletes = fileB2Deletes(); + table + .newRowDelta() + .addDeletes(fileADeletes) + .addDeletes(fileA2Deletes) + .addDeletes(fileBDeletes) + .addDeletes(fileB2Deletes) + .addDeletes(FILE_A_EQ_DELETES) + .addDeletes(FILE_A2_EQ_DELETES) + .addDeletes(FILE_B_EQ_DELETES) + .addDeletes(FILE_B2_EQ_DELETES) + .commit(); + + // Add More Data Files + table + .newAppend() + .appendFile(FILE_A2) + .appendFile(FILE_B2) + .appendFile(FILE_C2) + .appendFile(FILE_D2) + .commit(); + + List> actual = allEntries(); + List> expected = + ImmutableList.of( + Tuple2.apply(1L, FILE_B.location()), + Tuple2.apply(1L, FILE_C.location()), + Tuple2.apply(1L, FILE_D.location()), + Tuple2.apply(2L, FILE_A_EQ_DELETES.location()), + Tuple2.apply(2L, fileADeletes.location()), + Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), + Tuple2.apply(2L, fileA2Deletes.location()), + Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), + Tuple2.apply(2L, fileBDeletes.location()), + Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), + Tuple2.apply(2L, fileB2Deletes.location()), + Tuple2.apply(3L, FILE_A2.location()), + Tuple2.apply(3L, FILE_B2.location()), + Tuple2.apply(3L, FILE_C2.location()), + Tuple2.apply(3L, FILE_D2.location())); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + + Thread removeDanglingDeletesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + return 0; + }, + RuntimeException.class); + }); + removeDanglingDeletesThread.setName("test-extra-commit-message-remove-dangling-delete"); + removeDanglingDeletesThread.start(); + removeDanglingDeletesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(2).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(3).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-remove-dangling-delete") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + private List> liveEntries() { return spark .read() diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 83eb53eb65f4..408151e152b9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -102,6 +102,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkTableUtil; @@ -1888,6 +1889,41 @@ public void testZOrderRewriteWithSpecificOutputSpecId() { shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); } + @TestTemplate + public void testRewriteDataFilesCommitProperties() throws InterruptedException { + Table table = createTablePartitioned(4, 2); + Thread rewriteDataFilesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + basicRewrite(table).execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteDataFilesThread.setName("test-extra-commit-message-rewrite-data-files"); + rewriteDataFilesThread.start(); + rewriteDataFilesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(0).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(1).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-data-files") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) { List rewrittenFiles = currentDataFiles(table); assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 76b201aa5649..456c6eb11bf6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -57,6 +57,7 @@ import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -73,6 +74,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.spark.TestBase; @@ -1013,6 +1015,63 @@ public void testRewriteManifestsAfterUpgradeToV3() throws IOException { } } + @TestTemplate + public void testRewriteManifestsActionCommitProperties() throws InterruptedException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = + Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); + writeRecords(records1); + + List records2 = + Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")); + writeRecords(records2); + + Thread rewriteManifestThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions actions = SparkActions.get(); + + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteManifestThread.setName("test-extra-commit-message-rewrite-manifest"); + rewriteManifestThread.start(); + rewriteManifestThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(1).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(2).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-manifest") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + private List actualRecords() { return spark .read() diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java index b69b80a8d3a6..2ee730c7fe63 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.spark.sql.SparkSession; abstract class BaseSnapshotUpdateSparkAction extends BaseSparkAction { @@ -37,11 +38,17 @@ public ThisT snapshotProperty(String property, String value) { } protected void commit(org.apache.iceberg.SnapshotUpdate update) { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } summary.forEach(update::set); update.commit(); } protected Map commitSummary() { + if (!CommitMetadata.commitProperties().isEmpty()) { + summary.putAll(CommitMetadata.commitProperties()); + } return ImmutableMap.copyOf(summary); } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java index 56800a26dc72..f758766e0af5 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java @@ -25,6 +25,7 @@ import java.io.File; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -39,6 +40,8 @@ import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.RemoveDanglingDeleteFiles; @@ -46,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.TestBase; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Encoders; @@ -515,6 +519,91 @@ public void testPartitionedDeletesWithDanglingDvs() { assertThat(actualAfter).containsExactlyInAnyOrderElementsOf(expectedAfter); } + @TestTemplate + public void testRemoveDanglingDeleteCommitProperties() throws InterruptedException { + setupPartitionedTable(); + + // Add Data Files + table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit(); + + // Add Delete Files + DeleteFile fileADeletes = fileADeletes(); + DeleteFile fileA2Deletes = fileA2Deletes(); + DeleteFile fileBDeletes = fileBDeletes(); + DeleteFile fileB2Deletes = fileB2Deletes(); + table + .newRowDelta() + .addDeletes(fileADeletes) + .addDeletes(fileA2Deletes) + .addDeletes(fileBDeletes) + .addDeletes(fileB2Deletes) + .addDeletes(FILE_A_EQ_DELETES) + .addDeletes(FILE_A2_EQ_DELETES) + .addDeletes(FILE_B_EQ_DELETES) + .addDeletes(FILE_B2_EQ_DELETES) + .commit(); + + // Add More Data Files + table + .newAppend() + .appendFile(FILE_A2) + .appendFile(FILE_B2) + .appendFile(FILE_C2) + .appendFile(FILE_D2) + .commit(); + + List> actual = allEntries(); + List> expected = + ImmutableList.of( + Tuple2.apply(1L, FILE_B.location()), + Tuple2.apply(1L, FILE_C.location()), + Tuple2.apply(1L, FILE_D.location()), + Tuple2.apply(2L, FILE_A_EQ_DELETES.location()), + Tuple2.apply(2L, fileADeletes.location()), + Tuple2.apply(2L, FILE_A2_EQ_DELETES.location()), + Tuple2.apply(2L, fileA2Deletes.location()), + Tuple2.apply(2L, FILE_B_EQ_DELETES.location()), + Tuple2.apply(2L, fileBDeletes.location()), + Tuple2.apply(2L, FILE_B2_EQ_DELETES.location()), + Tuple2.apply(2L, fileB2Deletes.location()), + Tuple2.apply(3L, FILE_A2.location()), + Tuple2.apply(3L, FILE_B2.location()), + Tuple2.apply(3L, FILE_C2.location()), + Tuple2.apply(3L, FILE_D2.location())); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + + Thread removeDanglingDeletesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions.get().removeDanglingDeleteFiles(table).execute(); + return 0; + }, + RuntimeException.class); + }); + removeDanglingDeletesThread.setName("test-extra-commit-message-remove-dangling-delete"); + removeDanglingDeletesThread.start(); + removeDanglingDeletesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(2).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(3).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-remove-dangling-delete") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + private List> liveEntries() { return spark .read() diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 87ce48adea13..ec7dcd76f6b2 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -102,6 +102,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkTableUtil; @@ -1889,6 +1890,41 @@ public void testZOrderRewriteWithSpecificOutputSpecId() { shouldRewriteDataFilesWithPartitionSpec(table, outputSpecId); } + @TestTemplate + public void testRewriteDataFilesCommitProperties() throws InterruptedException { + Table table = createTablePartitioned(4, 2); + Thread rewriteDataFilesThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + basicRewrite(table).execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteDataFilesThread.setName("test-extra-commit-message-rewrite-data-files"); + rewriteDataFilesThread.start(); + rewriteDataFilesThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(0).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(1).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-data-files") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + protected void shouldRewriteDataFilesWithPartitionSpec(Table table, int outputSpecId) { List rewrittenFiles = currentDataFiles(table); assertThat(rewrittenFiles).allMatch(file -> file.specId() == outputSpecId); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java index 76b201aa5649..456c6eb11bf6 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java @@ -57,6 +57,7 @@ import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -73,6 +74,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.spark.TestBase; @@ -1013,6 +1015,63 @@ public void testRewriteManifestsAfterUpgradeToV3() throws IOException { } } + @TestTemplate + public void testRewriteManifestsActionCommitProperties() throws InterruptedException { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Map options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List records1 = + Lists.newArrayList( + new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")); + writeRecords(records1); + + List records2 = + Lists.newArrayList( + new ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), + new ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")); + writeRecords(records2); + + Thread rewriteManifestThread = + new Thread( + () -> { + Map properties = + ImmutableMap.of( + "writer-thread", + String.valueOf(Thread.currentThread().getName()), + SnapshotSummary.EXTRA_METADATA_PREFIX + "extra-key", + "someValue", + SnapshotSummary.EXTRA_METADATA_PREFIX + "another-key", + "anotherValue"); + CommitMetadata.withCommitProperties( + properties, + () -> { + SparkActions actions = SparkActions.get(); + + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + return 0; + }, + RuntimeException.class); + }); + rewriteManifestThread.setName("test-extra-commit-message-rewrite-manifest"); + rewriteManifestThread.start(); + rewriteManifestThread.join(); + + table.refresh(); + List snapshots = Lists.newArrayList(table.snapshots()); + assertThat(snapshots.get(1).summary()).doesNotContainKey("writer-thread"); + assertThat(snapshots.get(2).summary()) + .containsEntry("writer-thread", "test-extra-commit-message-rewrite-manifest") + .containsEntry("extra-key", "someValue") + .containsEntry("another-key", "anotherValue"); + } + private List actualRecords() { return spark .read()