From 8de260db3c7e763ed126c148f714c0c7f5b44a55 Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Wed, 7 May 2025 17:15:01 +0000 Subject: [PATCH 1/8] Add support for custom snapshot properties in SparkSQLProperties and SparkWriteConf --- .../iceberg/spark/SparkSQLProperties.java | 3 ++ .../iceberg/spark/TestSparkWriteConf.java | 30 +++++++++++++++++++ .../iceberg/spark/SparkSQLProperties.java | 3 ++ .../apache/iceberg/spark/SparkWriteConf.java | 9 ++++++ 4 files changed, 45 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 0ca12369bd64..32053159557f 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -105,4 +105,7 @@ private SparkSQLProperties() {} // Controls whether to report available column statistics to Spark for query optimization. public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats"; public static final boolean REPORT_COLUMN_STATS_DEFAULT = true; + + // Prefix for custom snapshot properties + public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property."; } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 00faecf9a830..66d6c32c81ea 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -486,6 +486,36 @@ public void testDeleteFileWriteConf() { } } + @Test + public void testExtraSnapshotMetadataReflectsSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "test-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + Map metadata = writeConf.extraSnapshotMetadata(); + + assertThat(metadata).containsEntry("test-key", "test-value"); + }); + } + + @Test + public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map writeOptions = ImmutableMap.of("snapshot-property.test-key", "write-option-value"); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + + Map metadata = writeConf.extraSnapshotMetadata(); + + // Assert that writeOptions take precedence over session config + assertThat(metadata).containsEntry("test-key", "write-option-value"); + }); + } + private void testWriteProperties(List> propertiesSuite) { withSQLConf( propertiesSuite.get(0), diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index d6c16bb0e238..f761bab3b4fc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -101,4 +101,7 @@ private SparkSQLProperties() {} // Controls whether to report available column statistics to Spark for query optimization. public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats"; public static final boolean REPORT_COLUMN_STATS_DEFAULT = true; + + // Prefix for custom snapshot properties + public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property."; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b3e8af5fe056..5de973343612 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -252,6 +252,15 @@ public long targetDeleteFileSize() { public Map extraSnapshotMetadata() { Map extraSnapshotMetadata = Maps.newHashMap(); + // Check session configuration for properties with SNAPSHOT_PROPERTY_PREFIX + sessionConf.getAll().forEach((key, value) -> { + if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { + extraSnapshotMetadata.put( + key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); + } + }); + + // Add write options, overriding session configuration if necessary writeOptions.forEach( (key, value) -> { if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) { From 912dabbbf4f8a589ab622f9a59bacb052249402d Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Thu, 8 May 2025 12:30:56 +0000 Subject: [PATCH 2/8] standardize changes for v3.4 and v3.5 --- .../apache/iceberg/spark/SparkWriteConf.java | 8 +++++ .../iceberg/spark/TestSparkWriteConf.java | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index a13cff6e99a5..528d7e580031 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -212,6 +212,14 @@ public long targetDeleteFileSize() { public Map extraSnapshotMetadata() { Map extraSnapshotMetadata = Maps.newHashMap(); + // Check session configuration for properties with SNAPSHOT_PROPERTY_PREFIX + sessionConf.getAll().forEach((key, value) -> { + if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { + extraSnapshotMetadata.put( + key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); + } + }); + writeOptions.forEach( (key, value) -> { if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 6041c22e2465..e69dc5cb34dd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -393,6 +393,36 @@ public void testSparkConfOverride() { } } + @Test + public void testExtraSnapshotMetadataReflectsSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "test-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + Map metadata = writeConf.extraSnapshotMetadata(); + + assertThat(metadata).containsEntry("test-key", "test-value"); + }); + } + + @Test + public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map writeOptions = ImmutableMap.of("snapshot-property.test-key", "write-option-value"); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + + Map metadata = writeConf.extraSnapshotMetadata(); + + // Assert that writeOptions take precedence over session config + assertThat(metadata).containsEntry("test-key", "write-option-value"); + }); + } + @TestTemplate public void testDataPropsDefaultsAsDeleteProps() { List>> propertiesSuites = From c1cea14b62808072acacdce71cd02c2b27dc958a Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Wed, 14 May 2025 14:01:51 +0000 Subject: [PATCH 3/8] address nitpick --- .../java/org/apache/iceberg/spark/TestSparkWriteConf.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index e69dc5cb34dd..0b9f9696ab6f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -396,14 +396,14 @@ public void testSparkConfOverride() { @Test public void testExtraSnapshotMetadataReflectsSessionConfig() { withSQLConf( - ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "test-value"), + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), () -> { Table table = validationCatalog.loadTable(tableIdent); SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); Map metadata = writeConf.extraSnapshotMetadata(); - assertThat(metadata).containsEntry("test-key", "test-value"); + assertThat(metadata).containsEntry("test-key", "session-value"); }); } From 9ded8ca296746ad1ab991cbce0fad032f98b1047 Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Wed, 14 May 2025 14:02:09 +0000 Subject: [PATCH 4/8] remove 3.4 change for now --- .../apache/iceberg/spark/SparkWriteConf.java | 22 -------------- .../iceberg/spark/TestSparkWriteConf.java | 30 ------------------- 2 files changed, 52 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 528d7e580031..b253f0bdfc40 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -209,28 +209,6 @@ public long targetDeleteFileSize() { .parse(); } - public Map extraSnapshotMetadata() { - Map extraSnapshotMetadata = Maps.newHashMap(); - - // Check session configuration for properties with SNAPSHOT_PROPERTY_PREFIX - sessionConf.getAll().forEach((key, value) -> { - if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { - extraSnapshotMetadata.put( - key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); - } - }); - - writeOptions.forEach( - (key, value) -> { - if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) { - extraSnapshotMetadata.put( - key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value); - } - }); - - return extraSnapshotMetadata; - } - public String rewrittenFileSetId() { return confParser .stringConf() diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 66d6c32c81ea..00faecf9a830 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -486,36 +486,6 @@ public void testDeleteFileWriteConf() { } } - @Test - public void testExtraSnapshotMetadataReflectsSessionConfig() { - withSQLConf( - ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "test-value"), - () -> { - Table table = validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); - - Map metadata = writeConf.extraSnapshotMetadata(); - - assertThat(metadata).containsEntry("test-key", "test-value"); - }); - } - - @Test - public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { - withSQLConf( - ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), - () -> { - Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = ImmutableMap.of("snapshot-property.test-key", "write-option-value"); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); - - Map metadata = writeConf.extraSnapshotMetadata(); - - // Assert that writeOptions take precedence over session config - assertThat(metadata).containsEntry("test-key", "write-option-value"); - }); - } - private void testWriteProperties(List> propertiesSuite) { withSQLConf( propertiesSuite.get(0), From 472c63ac0eadf72cef2c0bbf7acb1d6cd23a0e8d Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Wed, 14 May 2025 14:04:34 +0000 Subject: [PATCH 5/8] remove more 3.4 stuff --- .../apache/iceberg/spark/SparkSQLProperties.java | 3 --- .../org/apache/iceberg/spark/SparkWriteConf.java | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 32053159557f..0ca12369bd64 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -105,7 +105,4 @@ private SparkSQLProperties() {} // Controls whether to report available column statistics to Spark for query optimization. public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats"; public static final boolean REPORT_COLUMN_STATS_DEFAULT = true; - - // Prefix for custom snapshot properties - public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property."; } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b253f0bdfc40..a13cff6e99a5 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -209,6 +209,20 @@ public long targetDeleteFileSize() { .parse(); } + public Map extraSnapshotMetadata() { + Map extraSnapshotMetadata = Maps.newHashMap(); + + writeOptions.forEach( + (key, value) -> { + if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) { + extraSnapshotMetadata.put( + key.substring(SnapshotSummary.EXTRA_METADATA_PREFIX.length()), value); + } + }); + + return extraSnapshotMetadata; + } + public String rewrittenFileSetId() { return confParser .stringConf() From e4d17b1933e518f946f0ea9cb626d2972a97560e Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Fri, 23 May 2025 13:55:09 +0000 Subject: [PATCH 6/8] fix scala problem --- .../org/apache/iceberg/spark/SparkWriteConf.java | 15 +++++++++------ .../apache/iceberg/spark/TestSparkWriteConf.java | 3 ++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 5de973343612..f6c78e4058cf 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -56,6 +56,7 @@ import org.apache.spark.sql.internal.SQLConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; /** * A class for common Iceberg configs for Spark writes. @@ -253,12 +254,14 @@ public Map extraSnapshotMetadata() { Map extraSnapshotMetadata = Maps.newHashMap(); // Check session configuration for properties with SNAPSHOT_PROPERTY_PREFIX - sessionConf.getAll().forEach((key, value) -> { - if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { - extraSnapshotMetadata.put( - key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); - } - }); + JavaConverters.mapAsJavaMap(sessionConf.getAll()) + .forEach( + (key, value) -> { + if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { + extraSnapshotMetadata.put( + key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); + } + }); // Add write options, overriding session configuration if necessary writeOptions.forEach( diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 0b9f9696ab6f..82f4b7e1d5f4 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -413,7 +413,8 @@ public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), () -> { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = ImmutableMap.of("snapshot-property.test-key", "write-option-value"); + Map writeOptions = + ImmutableMap.of("snapshot-property.test-key", "write-option-value"); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); Map metadata = writeConf.extraSnapshotMetadata(); From 8aa8304dc3c119e7e0cd71a80f8230d1da81a25b Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Fri, 23 May 2025 16:10:01 +0000 Subject: [PATCH 7/8] fix build failure --- .../java/org/apache/iceberg/spark/TestSparkWriteConf.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 82f4b7e1d5f4..8656dde055d5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -393,7 +393,7 @@ public void testSparkConfOverride() { } } - @Test + @TestTemplate public void testExtraSnapshotMetadataReflectsSessionConfig() { withSQLConf( ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), @@ -407,7 +407,7 @@ public void testExtraSnapshotMetadataReflectsSessionConfig() { }); } - @Test + @TestTemplate public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { withSQLConf( ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), From 839796e6a3fa96cd7ccc7ae6fce5ff51b9903772 Mon Sep 17 00:00:00 2001 From: Jory Seguin <84862552+cccs-jory@users.noreply.github.com> Date: Mon, 11 Aug 2025 14:44:07 +0000 Subject: [PATCH 8/8] use PropertyUtil for extra snapshot metadata --- .../org/apache/iceberg/spark/SparkWriteConf.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index f6c78e4058cf..11f910c9f1c4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -50,13 +50,13 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.RuntimeConfig; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.write.RowLevelOperation.Command; import org.apache.spark.sql.internal.SQLConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; /** * A class for common Iceberg configs for Spark writes. @@ -253,15 +253,10 @@ public long targetDeleteFileSize() { public Map extraSnapshotMetadata() { Map extraSnapshotMetadata = Maps.newHashMap(); - // Check session configuration for properties with SNAPSHOT_PROPERTY_PREFIX - JavaConverters.mapAsJavaMap(sessionConf.getAll()) - .forEach( - (key, value) -> { - if (key.startsWith(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)) { - extraSnapshotMetadata.put( - key.substring(SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX.length()), value); - } - }); + // Add session configuration properties with SNAPSHOT_PROPERTY_PREFIX if necessary + extraSnapshotMetadata.putAll( + PropertyUtil.propertiesWithPrefix( + sessionConf.getAll(), SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)); // Add write options, overriding session configuration if necessary writeOptions.forEach(