diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index d6c16bb0e238..f761bab3b4fc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -101,4 +101,7 @@ private SparkSQLProperties() {} // Controls whether to report available column statistics to Spark for query optimization. public static final String REPORT_COLUMN_STATS = "spark.sql.iceberg.report-column-stats"; public static final boolean REPORT_COLUMN_STATS_DEFAULT = true; + + // Prefix for custom snapshot properties + public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property."; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index b3e8af5fe056..11f910c9f1c4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -50,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.RuntimeConfig; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.write.RowLevelOperation.Command; @@ -252,6 +253,12 @@ public long targetDeleteFileSize() { public Map extraSnapshotMetadata() { Map extraSnapshotMetadata = Maps.newHashMap(); + // Add session configuration properties with SNAPSHOT_PROPERTY_PREFIX if necessary + extraSnapshotMetadata.putAll( + PropertyUtil.propertiesWithPrefix( + sessionConf.getAll(), SparkSQLProperties.SNAPSHOT_PROPERTY_PREFIX)); + + // Add write options, overriding session configuration if necessary writeOptions.forEach( (key, value) -> { if (key.startsWith(SnapshotSummary.EXTRA_METADATA_PREFIX)) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 6041c22e2465..8656dde055d5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -393,6 +393,37 @@ public void testSparkConfOverride() { } } + @TestTemplate + public void testExtraSnapshotMetadataReflectsSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + + Map metadata = writeConf.extraSnapshotMetadata(); + + assertThat(metadata).containsEntry("test-key", "session-value"); + }); + } + + @TestTemplate + public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { + withSQLConf( + ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), + () -> { + Table table = validationCatalog.loadTable(tableIdent); + Map writeOptions = + ImmutableMap.of("snapshot-property.test-key", "write-option-value"); + SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); + + Map metadata = writeConf.extraSnapshotMetadata(); + + // Assert that writeOptions take precedence over session config + assertThat(metadata).containsEntry("test-key", "write-option-value"); + }); + } + @TestTemplate public void testDataPropsDefaultsAsDeleteProps() { List>> propertiesSuites =