diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java new file mode 100644 index 000000000000..58137250003a --- /dev/null +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ExceptionUtil; + +/** + * utility class to accept thread local commit properties + */ +public class CommitMetadata { + + private CommitMetadata() { + + } + + private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); + + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static <R, E extends Exception> R withCommitProperties( + Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E { + COMMIT_PROPERTIES.set(properties); + try { + return callable.call(); + } catch (Throwable e) { + ExceptionUtil.castAndThrow(e, exClass); + return null; + } finally { + COMMIT_PROPERTIES.set(ImmutableMap.of()); + } + } + + public static Map<String, String> commitProperties() { + return COMMIT_PROPERTIES.get(); + } +} diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 80e6de96a054..930f49ef6ecc 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; 
import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (isWapTable() && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 55605288b808..4fa8fdfc6d75 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -23,6 +23,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,13 +32,16 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -404,4 
+408,41 @@ public void testExtraSnapshotMetadata() throws IOException { Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue")); Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue")); } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + HadoopTables tables = new HadoopTables(CONF); + + Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List<SimpleRecord> expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread writerThread = new Thread(() -> { + Map<String, String> properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }, RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set<String> threadNames = Sets.newHashSet(); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java new file mode 100644 index 000000000000..58137250003a --- /dev/null +++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ExceptionUtil; + +/** + * utility class to accept thread local commit properties + */ +public class CommitMetadata { + + private CommitMetadata() { + + } + + private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); + + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static <R, E extends Exception> R withCommitProperties( + Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E { + COMMIT_PROPERTIES.set(properties); + try { + return callable.call(); + } catch (Throwable e) { + 
ExceptionUtil.castAndThrow(e, exClass); + return null; + } finally { + COMMIT_PROPERTIES.set(ImmutableMap.of()); + } + } + + public static Map<String, String> commitProperties() { + return COMMIT_PROPERTIES.get(); + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 80e6de96a054..930f49ef6ecc 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (isWapTable() && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 55605288b808..4fa8fdfc6d75 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -23,6 +23,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Set; import 
org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,13 +32,16 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -404,4 +408,41 @@ public void testExtraSnapshotMetadata() throws IOException { Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue")); Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue")); } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + HadoopTables tables = new HadoopTables(CONF); + + Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List<SimpleRecord> expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread writerThread = new Thread(() -> { + Map<String, String> properties = Maps.newHashMap(); + 
properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }, RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set<String> threadNames = Sets.newHashSet(); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java new file mode 100644 index 000000000000..58137250003a --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ExceptionUtil; + +/** + * utility class to accept thread local commit properties + */ +public class CommitMetadata { + + private CommitMetadata() { + + } + + private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); + + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static <R, E extends Exception> R withCommitProperties( + Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E { + COMMIT_PROPERTIES.set(properties); + try { + return callable.call(); + } catch (Throwable e) { + ExceptionUtil.castAndThrow(e, exClass); + return null; + } finally { + COMMIT_PROPERTIES.set(ImmutableMap.of()); + } + } + + public static Map<String, String> commitProperties() { + return COMMIT_PROPERTIES.get(); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 7c5b80afbb81..846e0735e64e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -54,6 +54,7 @@ import org.apache.iceberg.io.PositionDeltaWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkSchemaUtil; import 
org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.types.Types; @@ -249,6 +250,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index e919a83abcac..b714b805566e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -56,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 091696d58f7a..00a11c55a9d1 100644 --- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -23,6 +23,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,13 +32,16 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -404,4 +408,41 @@ public void testExtraSnapshotMetadata() throws IOException { Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue")); Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue")); } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + HadoopTables tables = new HadoopTables(CONF); + + Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List<SimpleRecord> expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, 
SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread writerThread = new Thread(() -> { + Map<String, String> properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }, RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set<String> threadNames = Sets.newHashSet(); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + } }