From 807cbd1264ba4d51243f3bc459e98653e134af97 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 31 May 2022 22:03:16 -0700
Subject: [PATCH 01/18] thread local

---
 .../org/apache/iceberg/SnapshotSummary.java  |  2 +
 .../apache/iceberg/util/CommitMetadata.java  | 39 +++++++++++++++++++
 .../iceberg/spark/source/SparkWrite.java     |  5 +++
 3 files changed, 46 insertions(+)
 create mode 100644 core/src/main/java/org/apache/iceberg/util/CommitMetadata.java

diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index b2d5e2be9bf5..f5748bd22582 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Joiner.MapJoiner;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.CommitMetadata;
 
 public class SnapshotSummary {
   public static final String ADDED_FILES_PROP = "added-data-files";
@@ -183,6 +184,7 @@ public Map<String, String> build() {
 
       // copy custom summary properties
       builder.putAll(properties);
+      builder.putAll(CommitMetadata.commitProperties());
 
       metrics.addTo(builder);
       setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles);
diff --git a/core/src/main/java/org/apache/iceberg/util/CommitMetadata.java b/core/src/main/java/org/apache/iceberg/util/CommitMetadata.java
new file mode 100644
index 000000000000..cb24f1baaef1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/CommitMetadata.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CommitMetadata {
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable) throws Exception {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }
+
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e919a83abcac..58a50d6f0e9c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -58,6 +58,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.CommitMetadata;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -192,6 +193,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
       extraSnapshotMetadata.forEach(operation::set);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(operation::set);
+    }
+
     if (wapEnabled && wapId != null) {
       // write-audit-publish is enabled for this table and job
       // stage the changes without changing the current snapshot

From df42e420bd97258452ee7d4923481e3c916c7cfd Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Thu, 2 Jun 2022 20:53:02 -0700
Subject: [PATCH 02/18] temp

---
 .../java/org/apache/iceberg/util/Tasks.java  |   2 +-
 .../iceberg/util/TestCommitMetadata.java     | 102 ++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java

diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index 02ebd63b422a..6ae85618d0b6 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -64,7 +64,7 @@ public interface FailureTask<I, E extends Exception> {
   }
 
   public interface Task<I, E extends Exception> {
-    void run(I item) throws E;
+    void run(I item) throws Exception;
   }
 
   public static class Builder<I> {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java
new file mode 100644
index 000000000000..f844edec131c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.util; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static junit.framework.TestCase.assertEquals; + +@RunWith(Parameterized.class) +public class TestCommitMetadata extends TableTestBase { + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] { 1, 2 }; + } + + static final DataFile FILE_A = DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + + public TestCommitMetadata(int formatVersion) { + super(formatVersion); + } + + @Test + public void testSetCommitMetadataConcurrently() throws IOException { + File dir = temp.newFolder(); + dir.delete(); + int threadsCount = 3; + int numberOfCommitedFilesPerThread = 1; + + String fileName = UUID.randomUUID().toString(); + DataFile file = DataFiles.builder(table.spec()) + .withPath(FileFormat.PARQUET.addExtension(fileName)) + .withRecordCount(2) + .withFileSizeInBytes(0) + .build(); + ExecutorService executorService = Executors.newFixedThreadPool(threadsCount); + + AtomicInteger barrier = new AtomicInteger(0); + Tasks + .range(threadsCount) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(executorService) + .run(index -> { + for (int numCommittedFiles = 0; numCommittedFiles < numberOfCommitedFilesPerThread; numCommittedFiles++) { + while (barrier.get() < numCommittedFiles * threadsCount) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getId())); + CommitMetadata.withCommitProperties(properties, () -> { + table.newFastAppend().appendFile(file).commit(); + return 0; + }); + barrier.incrementAndGet(); + } + }); + table.refresh(); + assertEquals(threadsCount * numberOfCommitedFilesPerThread, Lists.newArrayList(table.snapshots()).size()); + for (Snapshot snapshot : table.snapshots()) { + System.out.println(snapshot.summary().get("writer-thread")); + } + } +} From 54cea12988d7c93d76ca0883994a37a0a15ea66e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Jun 2022 21:18:57 -0700 Subject: [PATCH 03/18] test --- .../java/org/apache/iceberg/util/Tasks.java | 2 +- .../iceberg/util/TestCommitMetadata.java | 89 +++++++++++-------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index 6ae85618d0b6..02ebd63b422a 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ 
b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -64,7 +64,7 @@ public interface FailureTask { } public interface Task { - void run(I item) throws Exception; + void run(I item) throws E; } public static class Builder { diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java index f844edec131c..681bf9fde7bc 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java @@ -16,50 +16,61 @@ import java.io.File; import java.io.IOException; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTableTestBase; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.rules.TemporaryFolder; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.types.Types.NestedField.required; -@RunWith(Parameterized.class) -public class TestCommitMetadata extends TableTestBase { - @Parameterized.Parameters(name = "formatVersion = {0}") - public static Object[] parameters() { - return new Object[] { 1, 2 }; - } +public class TestCommitMetadata { + + static final Schema TABLE_SCHEMA = new Schema( + required(1, "id", Types.IntegerType.get(), "unique ID"), + required(2, "data", Types.StringType.get()) + ); - static final DataFile FILE_A = DataFiles.builder(SPEC) - .withPath("/path/to/data-a.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("data_bucket=0") // easy way to set partition data for now - .withRecordCount(1) + // Partition spec used to create tables + static final PartitionSpec SPEC = PartitionSpec.builderFor(TABLE_SCHEMA) + .bucket("data", 16) .build(); - public TestCommitMetadata(int formatVersion) { - super(formatVersion); - } + @Rule + public TemporaryFolder temp = new TemporaryFolder(); @Test public void testSetCommitMetadataConcurrently() throws IOException { File dir = temp.newFolder(); dir.delete(); int threadsCount = 3; - int numberOfCommitedFilesPerThread = 1; + Table table = new HadoopTables(new Configuration()).create(TABLE_SCHEMA, SPEC, + ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)), dir.toURI().toString()); String fileName = UUID.randomUUID().toString(); DataFile file = DataFiles.builder(table.spec()) @@ -67,36 +78,42 @@ public void testSetCommitMetadataConcurrently() throws IOException { 
.withRecordCount(2) .withFileSizeInBytes(0) .build(); - ExecutorService executorService = Executors.newFixedThreadPool(threadsCount); + ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() { + + private AtomicInteger currentThreadCount = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "thread-" + currentThreadCount.getAndIncrement()); + } + }); - AtomicInteger barrier = new AtomicInteger(0); Tasks .range(threadsCount) .stopOnFailure() .throwFailureWhenFinished() .executeWith(executorService) .run(index -> { - for (int numCommittedFiles = 0; numCommittedFiles < numberOfCommitedFilesPerThread; numCommittedFiles++) { - while (barrier.get() < numCommittedFiles * threadsCount) { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); + CommitMetadata.withCommitProperties(properties, () -> { + table.newFastAppend().appendFile(file).commit(); + return 0; + }); + } catch (Exception e) { + e.printStackTrace(); } } - Map properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getId())); - CommitMetadata.withCommitProperties(properties, () -> { - table.newFastAppend().appendFile(file).commit(); - return 0; - }); - barrier.incrementAndGet(); - } - }); + ); table.refresh(); - assertEquals(threadsCount * numberOfCommitedFilesPerThread, Lists.newArrayList(table.snapshots()).size()); + assertEquals(threadsCount, Lists.newArrayList(table.snapshots()).size()); + Set threadNames = new HashSet<>(); for (Snapshot snapshot : table.snapshots()) { - System.out.println(snapshot.summary().get("writer-thread")); + threadNames.add(snapshot.summary().get("writer-thread")); } + assertTrue(threadNames.contains("thread-0")); + assertTrue(threadNames.contains("thread-1")); + assertTrue(threadNames.contains("thread-2")); } } From 22cc18a433ec4ab0971ef133dd42caf6336e0f61 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Jun 2022 21:20:39 -0700 Subject: [PATCH 04/18] formatting --- .../apache/iceberg/util/TestCommitMetadata.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java index 681bf9fde7bc..4fa40818d470 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java @@ -17,11 +17,9 @@ import java.io.File; import java.io.IOException; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -34,7 +32,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.hadoop.HadoopTableTestBase; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -56,11 +53,6 @@ public class TestCommitMetadata { required(2, "data", Types.StringType.get()) ); - // Partition spec used to create tables - static final PartitionSpec SPEC = 
PartitionSpec.builderFor(TABLE_SCHEMA) - .bucket("data", 16) - .build(); - @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -69,7 +61,11 @@ public void testSetCommitMetadataConcurrently() throws IOException { File dir = temp.newFolder(); dir.delete(); int threadsCount = 3; - Table table = new HadoopTables(new Configuration()).create(TABLE_SCHEMA, SPEC, + Table table = new HadoopTables(new Configuration()).create( + TABLE_SCHEMA, + PartitionSpec.builderFor(TABLE_SCHEMA) + .bucket("data", 16) + .build(), ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)), dir.toURI().toString()); String fileName = UUID.randomUUID().toString(); From 8ee624ccfedaad4ee6c34a7aedd1eee9868b51c5 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 2 Jun 2022 22:46:59 -0700 Subject: [PATCH 05/18] test --- .../iceberg/util/TestCommitMetadata.java | 6 +- .../extensions/TestCommitMetadataWithSQL.java | 108 ++++++++++++++++++ .../iceberg/spark/source/SparkWrite.java | 4 - .../spark/source/TestDataSourceOptions.java | 1 + 4 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java index 4fa40818d470..f9a649df9f5e 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java +++ b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java @@ -63,9 +63,7 @@ public void testSetCommitMetadataConcurrently() throws IOException { int threadsCount = 3; Table table = new HadoopTables(new Configuration()).create( TABLE_SCHEMA, - PartitionSpec.builderFor(TABLE_SCHEMA) - .bucket("data", 16) - .build(), + PartitionSpec.unpartitioned(), ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)), dir.toURI().toString()); String fileName = UUID.randomUUID().toString(); @@ -90,7 +88,7 @@ public Thread newThread(Runnable r) { .throwFailureWhenFinished() .executeWith(executorService) .run(index -> { - Map properties = Maps.newHashMap(); + Map properties = Maps.newHashMap(); properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); try { CommitMetadata.withCommitProperties(properties, () -> { diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java new file mode 100644 index 000000000000..dc7f078729a8 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.CommitMetadata; +import org.apache.iceberg.util.Tasks; +import org.junit.After; +import org.junit.Test; + +import static junit.framework.TestCase.assertTrue; + +public class TestCommitMetadataWithSQL extends SparkRowLevelOperationsTestBase { + + public TestCommitMetadataWithSQL(String catalogName, String implementation, Map config, + String fileFormat, boolean vectorized, String distributionMode) { + super(catalogName, implementation, config, fileFormat, vectorized, distributionMode); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS source"); + } + + @Override + protected Map extraTableProperties() { + return ImmutableMap.of( + TableProperties.FORMAT_VERSION, "2", + TableProperties.MERGE_MODE, "merge-on-read", + TableProperties.COMMIT_NUM_RETRIES, "4" + ); + } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws IOException { + createAndInitTable("id BIGINT, dep STRING"); + sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName); + // add a data file to the 'software' partition + append(tableName, "{ \"id\": 0, \"dep\": \"software\" }"); + + int threadsCount = 3; + ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() { + + private AtomicInteger currentThreadCount = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "thread-" + currentThreadCount.getAndIncrement()); + } + }); + + Tasks + .range(threadsCount) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(executorService) + .run(index -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + try { + CommitMetadata.withCommitProperties(properties, () -> { + createOrReplaceView("source", + "{ \"id\": 1, \"dep\": \"finance\" }\n" + + "{ \"id\": 2, \"dep\": \"hardware\" }"); + sql("MERGE INTO %s target USING source on target.id = source.id" + + " WHEN MATCHED THEN UPDATE SET target.dep='product'", tableName); + return 0; + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + ); + Table table = validationCatalog.loadTable(tableIdent); + Set threadNames = new HashSet<>(); + for (Snapshot snapshot : table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + assertTrue(threadNames.contains("thread-0")); + assertTrue(threadNames.contains("thread-1")); + assertTrue(threadNames.contains("thread-2")); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 58a50d6f0e9c..280663e1861f 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -193,10 +193,6 @@ private void 
commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } - if (!CommitMetadata.commitProperties().isEmpty()) { - CommitMetadata.commitProperties().forEach(operation::set); - } - if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 091696d58f7a..0351d743fb14 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -42,6 +42,7 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TestCommitMetadata; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; From dcb888ec44e244bec65265ecadb8265dcd9dbad7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Fri, 3 Jun 2022 08:45:55 -0700 Subject: [PATCH 06/18] move to spark-core --- .../org/apache/iceberg/SnapshotSummary.java | 2 - .../iceberg/util/TestCommitMetadata.java | 113 ------------------ .../extensions/TestCommitMetadataWithSQL.java | 108 ----------------- .../apache/iceberg/spark}/CommitMetadata.java | 30 ++++- .../iceberg/spark/source/SparkWrite.java | 6 +- .../spark/source/TestDataSourceOptions.java | 68 ++++++++++- 6 files changed, 101 insertions(+), 226 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java delete mode 100644 spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java rename {core/src/main/java/org/apache/iceberg/util => spark/v3.2/spark/src/main/java/org/apache/iceberg/spark}/CommitMetadata.java (52%) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index f5748bd22582..b2d5e2be9bf5 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -25,7 +25,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner.MapJoiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.CommitMetadata; public class SnapshotSummary { public static final String ADDED_FILES_PROP = "added-data-files"; @@ -184,7 +183,6 @@ public Map build() { // copy custom summary properties builder.putAll(properties); - builder.putAll(CommitMetadata.commitProperties()); metrics.addTo(builder); setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles); diff --git a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java b/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java deleted file mode 100644 index f9a649df9f5e..000000000000 --- a/core/src/test/java/org/apache/iceberg/util/TestCommitMetadata.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iceberg.util; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Types; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; -import static org.apache.iceberg.types.Types.NestedField.required; - -public class TestCommitMetadata { - - static final Schema TABLE_SCHEMA = new Schema( - required(1, "id", Types.IntegerType.get(), "unique ID"), - required(2, "data", Types.StringType.get()) - ); - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testSetCommitMetadataConcurrently() throws IOException { - File dir = temp.newFolder(); - dir.delete(); - int threadsCount = 3; - Table table = new HadoopTables(new Configuration()).create( - TABLE_SCHEMA, - PartitionSpec.unpartitioned(), - ImmutableMap.of(COMMIT_NUM_RETRIES, String.valueOf(threadsCount)), dir.toURI().toString()); - - String fileName = UUID.randomUUID().toString(); - DataFile file = DataFiles.builder(table.spec()) - .withPath(FileFormat.PARQUET.addExtension(fileName)) - .withRecordCount(2) - .withFileSizeInBytes(0) - .build(); - ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() { - - private AtomicInteger currentThreadCount = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "thread-" + currentThreadCount.getAndIncrement()); - } - }); - - Tasks - .range(threadsCount) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(executorService) - .run(index -> { - Map properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - try { - CommitMetadata.withCommitProperties(properties, () -> { - table.newFastAppend().appendFile(file).commit(); - return 0; - }); - } catch (Exception e) { - e.printStackTrace(); - } - } - ); - table.refresh(); - assertEquals(threadsCount, Lists.newArrayList(table.snapshots()).size()); - Set threadNames = new HashSet<>(); - for (Snapshot snapshot : table.snapshots()) { - 
threadNames.add(snapshot.summary().get("writer-thread")); - } - assertTrue(threadNames.contains("thread-0")); - assertTrue(threadNames.contains("thread-1")); - assertTrue(threadNames.contains("thread-2")); - } -} diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java deleted file mode 100644 index dc7f078729a8..000000000000 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCommitMetadataWithSQL.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iceberg.spark.extensions; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.util.CommitMetadata; -import org.apache.iceberg.util.Tasks; -import org.junit.After; -import org.junit.Test; - -import static junit.framework.TestCase.assertTrue; - -public class TestCommitMetadataWithSQL extends SparkRowLevelOperationsTestBase { - - public TestCommitMetadataWithSQL(String catalogName, String implementation, Map config, - String fileFormat, boolean vectorized, String distributionMode) { - super(catalogName, implementation, config, fileFormat, vectorized, distributionMode); - } - - @After - public void removeTables() { - sql("DROP TABLE IF EXISTS %s", tableName); - sql("DROP TABLE IF EXISTS source"); - } - - @Override - protected Map extraTableProperties() { - return ImmutableMap.of( - TableProperties.FORMAT_VERSION, "2", - TableProperties.MERGE_MODE, "merge-on-read", - TableProperties.COMMIT_NUM_RETRIES, "4" - ); - } - - @Test - public void testExtraSnapshotMetadataWithSQL() throws IOException { - createAndInitTable("id BIGINT, dep STRING"); - sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName); - // add a data file to the 'software' partition - append(tableName, "{ \"id\": 0, \"dep\": \"software\" }"); - - int threadsCount = 3; - ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() { - - private AtomicInteger currentThreadCount = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "thread-" + currentThreadCount.getAndIncrement()); - } - }); - - Tasks - .range(threadsCount) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(executorService) - .run(index -> { - Map properties = Maps.newHashMap(); - properties.put("writer-thread", 
String.valueOf(Thread.currentThread().getName())); - try { - CommitMetadata.withCommitProperties(properties, () -> { - createOrReplaceView("source", - "{ \"id\": 1, \"dep\": \"finance\" }\n" + - "{ \"id\": 2, \"dep\": \"hardware\" }"); - sql("MERGE INTO %s target USING source on target.id = source.id" + - " WHEN MATCHED THEN UPDATE SET target.dep='product'", tableName); - return 0; - }); - } catch (Exception e) { - e.printStackTrace(); - } - } - ); - Table table = validationCatalog.loadTable(tableIdent); - Set threadNames = new HashSet<>(); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - assertTrue(threadNames.contains("thread-0")); - assertTrue(threadNames.contains("thread-1")); - assertTrue(threadNames.contains("thread-2")); - } -} diff --git a/core/src/main/java/org/apache/iceberg/util/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java similarity index 52% rename from core/src/main/java/org/apache/iceberg/util/CommitMetadata.java rename to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java index cb24f1baaef1..9ad6b4c7da14 100644 --- a/core/src/main/java/org/apache/iceberg/util/CommitMetadata.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -12,7 +12,35 @@ * limitations under the License. */ -package org.apache.iceberg.util; +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.spark; import java.util.Map; import java.util.concurrent.Callable; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 280663e1861f..b714b805566e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -56,9 +56,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; -import org.apache.iceberg.util.CommitMetadata; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -193,6 +193,10 @@ private void commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 0351d743fb14..7beca4a02a23 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -21,8 +21,14 @@ import java.io.IOException; import java.math.RoundingMode; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,6 +37,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; @@ -38,11 +45,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.iceberg.util.TestCommitMetadata; +import org.apache.iceberg.util.Tasks; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -55,6 +63,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static junit.framework.TestCase.assertTrue; import static org.apache.iceberg.types.Types.NestedField.optional; public class 
TestDataSourceOptions {
@@ -405,4 +414,61 @@ public void testExtraSnapshotMetadata() throws IOException {
     Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue"));
     Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue"));
   }
+
+  @Test
+  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+    HadoopTables tables = new HadoopTables(CONF);
+    int threadsCount = 3;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
+
+      private AtomicInteger currentThreadCount = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
+      }
+    });
+
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
+        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Tasks
+        .range(threadsCount)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executorService)
+        .run(index -> {
+          Map<String, String> properties = Maps.newHashMap();
+          properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+          try {
+            CommitMetadata.withCommitProperties(properties, () -> {
+              spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+              return 0;
+            });
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+    );
+    Set<String> threadNames = new HashSet<>();
+    for (Snapshot snapshot : table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
+    }
+    assertTrue(threadNames.contains("thread-0"));
+    assertTrue(threadNames.contains("thread-1"));
+    assertTrue(threadNames.contains("thread-2"));
+
+  }
 }

From bce118825ff159fff181ccc662188fe3739067e2 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Fri, 3 Jun 2022 22:16:57 -0700
Subject: [PATCH 07/18] address comments

---
 .../spark/CallerWithCommitMetadata.java      | 50 +++++++++++
 .../apache/iceberg/spark/CommitMetadata.java | 67 --------------
 .../iceberg/spark/source/SparkWrite.java     |  6 +-
 .../spark/source/TestDataSourceOptions.java  | 88 +++++++++----------
 4 files changed, 94 insertions(+), 117 deletions(-)
 create mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
 delete mode 100644 spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
new file mode 100644
index 000000000000..4375a3035a76
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * utility class to accept thread local commit properties
+ */
+public class CallerWithCommitMetadata {
+
+  private CallerWithCommitMetadata() {}
+
+  private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
+
+  public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable)
+      throws RuntimeException {
+    COMMIT_PROPERTIES.set(properties);
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      COMMIT_PROPERTIES.set(ImmutableMap.of());
+    }
+  }
+
+  public static Map<String, String> commitProperties() {
+    return COMMIT_PROPERTIES.get();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
deleted file mode 100644
index 9ad6b4c7da14..000000000000
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.iceberg.spark; - -import java.util.Map; -import java.util.concurrent.Callable; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; - -/** - * utility class to accept thread local commit properties - */ -public class CommitMetadata { - private static final ThreadLocal> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); - - public static R withCommitProperties(Map properties, Callable callable) throws Exception { - COMMIT_PROPERTIES.set(properties); - try { - return callable.call(); - } finally { - COMMIT_PROPERTIES.set(ImmutableMap.of()); - } - } - - public static Map commitProperties() { - return COMMIT_PROPERTIES.get(); - } -} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index b714b805566e..36fee7db5398 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -56,7 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.CommitMetadata; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -193,8 +193,8 @@ private void commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } - if (!CommitMetadata.commitProperties().isEmpty()) { - CommitMetadata.commitProperties().forEach(operation::set); + if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { + CallerWithCommitMetadata.commitProperties().forEach(operation::set); } if (wapEnabled && wapId != null) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 7beca4a02a23..b84d9806b947 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -21,14 +21,10 @@ import java.io.IOException; import java.math.RoundingMode; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -45,17 +41,19 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.math.LongMath; -import org.apache.iceberg.spark.CommitMetadata; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; import org.apache.spark.sql.Dataset; 
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.glassfish.jersey.internal.guava.Sets;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -420,55 +418,51 @@ public void testExtraSnapshotMetadataWithSQL() throws IOException {
     String tableLocation = temp.newFolder("iceberg-table").toString();
     HadoopTables tables = new HadoopTables(CONF);
     int threadsCount = 3;
-    ExecutorService executorService = Executors.newFixedThreadPool(threadsCount, new ThreadFactory() {
-
-      private AtomicInteger currentThreadCount = new AtomicInteger(0);
-
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(r, "thread-" + currentThreadCount.getAndIncrement());
-      }
-    });
+    ExecutorService executorService = null;
+    try {
+      executorService = ThreadPools.newWorkerPool("thread", threadsCount);
 
-    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+      Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
 
-    List<SimpleRecord> expectedRecords = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b")
-    );
-    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
-    originalDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".extra-key", "someValue")
-        .option(SparkWriteOptions.SNAPSHOT_PROPERTY_PREFIX + ".another-key", "anotherValue")
-        .save(tableLocation);
-    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
-    Tasks
-        .range(threadsCount)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(executorService)
-        .run(index -> {
-          Map<String, String> properties = Maps.newHashMap();
-          properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
-          try {
-            CommitMetadata.withCommitProperties(properties, () -> {
+      List<SimpleRecord> expectedRecords = Lists.newArrayList(
+          new SimpleRecord(1, "a"),
+          new SimpleRecord(2, "b")
+      );
+      Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+      originalDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(tableLocation);
+      spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+
+      Tasks
+          .range(threadsCount)
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(executorService)
+          .run(index -> {
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+            CallerWithCommitMetadata.withCommitProperties(properties, () -> {
               spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
               return 0;
             });
-          } catch (Exception e) {
-            e.printStackTrace();
           }
-        }
-    );
-    Set<String> threadNames = new HashSet<>();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
+      );
+
+      Set<String> threadNames = Sets.newHashSet();
+      for (Snapshot snapshot : table.snapshots()) {
+        threadNames.add(snapshot.summary().get("writer-thread"));
+      }
+
+      assertTrue(threadNames.contains("thread-0"));
+      assertTrue(threadNames.contains("thread-1"));
+      assertTrue(threadNames.contains("thread-2"));
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
     }
-
-    assertTrue(threadNames.contains("thread-0"));
-    assertTrue(threadNames.contains("thread-1"));
-    assertTrue(threadNames.contains("thread-2"));
   }
 }
From 0191a77ed75a27e9a6e2e9e332d96d729a240a8b Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 5 Jun 2022 15:05:49 -0700
Subject: [PATCH 08/18] address comments

---
 .../spark/CallerWithCommitMetadata.java      |  9 ++-
 .../spark/source/TestDataSourceOptions.java  | 75 ++++++++-----------
 2 files changed, 35 insertions(+), 49 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
index 4375a3035a76..17e7729be691 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.ExceptionUtil;
 
 /**
  * utility class to accept thread local commit properties
@@ -32,16 +33,16 @@ private CallerWithCommitMetadata() {}
 
   private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
 
-  public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable)
-      throws RuntimeException {
+  public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable) {
     COMMIT_PROPERTIES.set(properties);
     try {
       return callable.call();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    } catch (Throwable e) {
+      ExceptionUtil.castAndThrow(e, RuntimeException.class);
     } finally {
       COMMIT_PROPERTIES.set(ImmutableMap.of());
     }
+    return null;
   }
 
   public static Map<String, String> commitProperties() {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index b84d9806b947..8501effe9c98 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -61,6 +61,7 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
@@ -414,55 +415,39 @@ public void testExtraSnapshotMetadata() throws IOException {
   }
 
   @Test
-  public void testExtraSnapshotMetadataWithSQL() throws IOException {
+  public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException {
     String tableLocation = temp.newFolder("iceberg-table").toString();
     HadoopTables tables = new HadoopTables(CONF);
-    int threadsCount = 3;
-    ExecutorService executorService = null;
-    try {
-      executorService = ThreadPools.newWorkerPool("thread", threadsCount);
 
-      Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
 
-      List<SimpleRecord> expectedRecords = Lists.newArrayList(
-          new SimpleRecord(1, "a"),
-          new SimpleRecord(2, "b")
-      );
-      Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
-      originalDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableLocation);
-      spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
-
-      Tasks
-          .range(threadsCount)
-          .stopOnFailure()
-          .throwFailureWhenFinished()
-          .executeWith(executorService)
-          .run(index -> {
-            Map<String, String> properties = Maps.newHashMap();
-            properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
-            CallerWithCommitMetadata.withCommitProperties(properties, () -> {
-              spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
-              return 0;
-            });
-          }
-      );
-
-      Set<String> threadNames = Sets.newHashSet();
-      for (Snapshot snapshot : table.snapshots()) {
-        threadNames.add(snapshot.summary().get("writer-thread"));
-      }
-
-      assertTrue(threadNames.contains("thread-0"));
-      assertTrue(threadNames.contains("thread-1"));
-      assertTrue(threadNames.contains("thread-2"));
-    } finally {
-      if (executorService != null) {
-        executorService.shutdown();
-      }
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+    spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target");
+    Thread writerThread = new Thread(() -> {
+      Map<String, String> properties = Maps.newHashMap();
+      properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+      CallerWithCommitMetadata.withCommitProperties(properties, () -> {
+        spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
+        return 0;
+      });
+    });
+    writerThread.setName("test-extra-commit-message-writer-thread");
+    writerThread.start();
+    writerThread.join();
+    Set<String> threadNames = Sets.newHashSet();
+    for (Snapshot snapshot: table.snapshots()) {
+      threadNames.add(snapshot.summary().get("writer-thread"));
     }
+    assertEquals(2, threadNames.size());
+    assertTrue(threadNames.contains(null));
+    assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
   }
 }
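For reference between these two revisions, a minimal caller-side sketch of the two-argument API as it stands after PATCH 08. This is illustrative only, not part of the series: the table name and property key are invented, and "spark" is assumed to be an active SparkSession in scope.

    // assumes org.apache.iceberg.spark.CallerWithCommitMetadata and
    // org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap are imported
    Map<String, String> props = ImmutableMap.of("triggered-by", "nightly-etl"); // hypothetical key/value
    CallerWithCommitMetadata.withCommitProperties(props, () -> {
      // any Iceberg commit made on this thread inside the callable picks up the
      // thread-local properties via SparkWrite#commitOperation
      spark.sql("INSERT INTO db.events VALUES (1, 'a')"); // hypothetical table
      return 0;
    });
    // the properties then appear in the new snapshot's summary; a failure inside the
    // callable is rethrown as a RuntimeException by ExceptionUtil.castAndThrow
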
.run(index -> { - Map<String, String> properties = Maps.newHashMap(); - properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CallerWithCommitMetadata.withCommitProperties(properties, () -> { spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); return 0; }); - } - ); - - Set<String> threadNames = Sets.newHashSet(); - for (Snapshot snapshot : table.snapshots()) { - threadNames.add(snapshot.summary().get("writer-thread")); - } - - assertTrue(threadNames.contains("thread-0")); - assertTrue(threadNames.contains("thread-1")); - assertTrue(threadNames.contains("thread-2")); - } finally { - if (executorService != null) { - executorService.shutdown(); - } + List<SimpleRecord> expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread writerThread = new Thread(() -> { + Map<String, String> properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CallerWithCommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set<String> threadNames = Sets.newHashSet(); + for (Snapshot snapshot: table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); } - + assertEquals(2, threadNames.size()); + assertTrue(threadNames.contains(null)); + assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } }
From aadf10462458df80de9df2f09ed6cea6c947e897 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 15:13:49 -0700 Subject: [PATCH 09/18] parameterize exception classes --- .../iceberg/spark/CallerWithCommitMetadata.java | 12 ++++++++++-- .../iceberg/spark/source/TestDataSourceOptions.java | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java index 17e7729be691..07b948e02799 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java @@ -33,12 +33,20 @@ private CallerWithCommitMetadata() {} private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); - public static <R> R withCommitProperties(Map<String, String> properties, Callable<R> callable) { + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static <R, E extends Exception> R withCommitProperties( + Map<String, String> properties, Callable<R> callable, Class<E> exClass) throws E { COMMIT_PROPERTIES.set(properties); try { return callable.call(); } catch (Throwable e) { - ExceptionUtil.castAndThrow(e, RuntimeException.class); + ExceptionUtil.castAndThrow(e, exClass); } finally { COMMIT_PROPERTIES.set(ImmutableMap.of()); }
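Parameterizing the exception class keeps checked exceptions usable: the caller names the exception type it expects, and ExceptionUtil.castAndThrow can rethrow the original failure as that type instead of wrapping everything in RuntimeException. A hypothetical caller under this signature (the property key, the SQL, and the choice of IOException are illustrative; only the withCommitProperties API comes from the patch):

// assumes an active SparkSession `spark` and a view `target`, as in the test
Map<String, String> props = ImmutableMap.of("ingest-job-id", "job-42");
try {
  CallerWithCommitMetadata.withCommitProperties(props, () -> {
    spark.sql("INSERT INTO target VALUES (5, 'e')");
    return 0;
  }, IOException.class);
} catch (IOException e) {
  // the failure arrives with its declared type intact instead of a RuntimeException wrapper
}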
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 8501effe9c98..120163febc31 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -437,7 +437,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException CallerWithCommitMetadata.withCommitProperties(properties, () -> { spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); return 0; - }); + }, RuntimeException.class); }); writerThread.setName("test-extra-commit-message-writer-thread"); writerThread.start();
From c11ee4f1d3d647ac2729a7297b993f1055797a24 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 15:45:06 -0700 Subject: [PATCH 10/18] stylistic fix --- .../org/apache/iceberg/spark/CallerWithCommitMetadata.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java index 07b948e02799..2b839c69324e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java @@ -29,7 +29,9 @@ */ public class CallerWithCommitMetadata { - private CallerWithCommitMetadata() {} + private CallerWithCommitMetadata() { + + } private static final ThreadLocal<Map<String, String>> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of);
From 3e0c364fd5a6d4c0897e4232e393ffb1fbee9ae5 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 15:46:36 -0700 Subject: [PATCH 11/18] bit --- .../java/org/apache/iceberg/spark/CallerWithCommitMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java index 2b839c69324e..53cbba556b04 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java @@ -49,10 +49,10 @@ public static <R, E extends Exception> R withCommitProperties( return callable.call(); } catch (Throwable e) { ExceptionUtil.castAndThrow(e, exClass); + return null; } finally { COMMIT_PROPERTIES.set(ImmutableMap.of()); } - return null; } public static Map<String, String> commitProperties() {
From a5f55121530227676a988a517f37687204df6644 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 16:00:09 -0700 Subject: [PATCH 12/18] remove unused imports --- .../main/java/org/apache/iceberg/spark/CallerWithMetadata.java | 0 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 3 --- 2 files changed, 3 deletions(-) create mode 100644 spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java new file mode 100644 index 000000000000..e69de29bb2d1
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 
120163febc31..2b7ae17a97f1 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -46,8 +45,6 @@ import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; -import org.apache.iceberg.util.Tasks; -import org.apache.iceberg.util.ThreadPools; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; From a3d7445a1c69f9bcc55d9d188bc55b2a96d80683 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 16:00:49 -0700 Subject: [PATCH 13/18] remove file --- .../main/java/org/apache/iceberg/spark/CallerWithMetadata.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithMetadata.java deleted file mode 100644 index e69de29bb2d1..000000000000 From e1cd0567970b1eb70d54b8abd6bf46783f11c9f7 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 16:09:32 -0700 Subject: [PATCH 14/18] 3.0 and 3.1 --- .../spark/CallerWithCommitMetadata.java | 61 +++++++++++++++++++ .../iceberg/spark/source/SparkWrite.java | 5 ++ .../spark/source/TestDataSourceOptions.java | 43 +++++++++++++ .../spark/CallerWithCommitMetadata.java | 61 +++++++++++++++++++ .../iceberg/spark/source/SparkWrite.java | 5 ++ .../spark/source/TestDataSourceOptions.java | 49 ++++++++++++++- 6 files changed, 221 insertions(+), 3 deletions(-) create mode 100644 spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java create mode 100644 spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java new file mode 100644 index 000000000000..53cbba556b04 --- /dev/null +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ExceptionUtil; + +/** + * utility class to accept thread local commit properties + */ +public class CallerWithCommitMetadata { + + private CallerWithCommitMetadata() { + + } + + private static final ThreadLocal> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); + + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static R withCommitProperties( + Map properties, Callable callable, Class exClass) throws E { + COMMIT_PROPERTIES.set(properties); + try { + return callable.call(); + } catch (Throwable e) { + ExceptionUtil.castAndThrow(e, exClass); + return null; + } finally { + COMMIT_PROPERTIES.set(ImmutableMap.of()); + } + } + + public static Map commitProperties() { + return COMMIT_PROPERTIES.get(); + } +} diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 80e6de96a054..edc88d275814 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { + CallerWithCommitMetadata.commitProperties().forEach(operation::set); + } + if (isWapTable() && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 55605288b808..d1464a89cf9f 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -23,6 +23,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,6 +32,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; @@ -38,6 +40,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -47,6 +50,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.glassfish.jersey.internal.guava.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -54,6 +58,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { @@ -404,4 +410,41 @@ public void testExtraSnapshotMetadata() throws IOException { Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue")); Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue")); } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + HadoopTables tables = new HadoopTables(CONF); + + Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread writerThread = new Thread(() -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CallerWithCommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }, RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set threadNames = Sets.newHashSet(); + for (Snapshot snapshot: table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + assertEquals(2, threadNames.size()); + assertTrue(threadNames.contains(null)); + assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + } } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java new file mode 100644 index 000000000000..53cbba556b04 --- /dev/null +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.ExceptionUtil; + +/** + * utility class to accept thread local commit properties + */ +public class CallerWithCommitMetadata { + + private CallerWithCommitMetadata() { + + } + + private static final ThreadLocal> COMMIT_PROPERTIES = ThreadLocal.withInitial(ImmutableMap::of); + + /** + * running the code wrapped as a caller, and any snapshot committed within the callable object will be attached with + * the metadata defined in properties + * @param properties extra commit metadata to attach to the snapshot committed within callable + * @param callable the code to be executed + * @param exClass the expected type of exception which would be thrown from callable + */ + public static R withCommitProperties( + Map properties, Callable callable, Class exClass) throws E { + COMMIT_PROPERTIES.set(properties); + try { + return callable.call(); + } catch (Throwable e) { + ExceptionUtil.castAndThrow(e, exClass); + return null; + } finally { + COMMIT_PROPERTIES.set(ImmutableMap.of()); + } + } + + public static Map commitProperties() { + return COMMIT_PROPERTIES.get(); + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 80e6de96a054..edc88d275814 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -174,6 +175,10 @@ private void commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } + if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { + CallerWithCommitMetadata.commitProperties().forEach(operation::set); + } + if (isWapTable() && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 55605288b808..2b7ae17a97f1 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -23,6 +23,7 @@ import java.math.RoundingMode; import java.util.List; import java.util.Map; +import java.util.Set; import 
org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; @@ -31,6 +32,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; @@ -38,6 +40,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -47,6 +50,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.glassfish.jersey.internal.guava.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -54,6 +58,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { @@ -245,7 +251,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", + "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -258,7 +264,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", + "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -272,7 +278,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot only specify option end-snapshot-id to do incremental scan", + "Cannot set only end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -404,4 +410,41 @@ public void testExtraSnapshotMetadata() throws IOException { Assert.assertTrue(table.currentSnapshot().summary().get("extra-key").equals("someValue")); Assert.assertTrue(table.currentSnapshot().summary().get("another-key").equals("anotherValue")); } + + @Test + public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException { + String tableLocation = temp.newFolder("iceberg-table").toString(); + HadoopTables tables = new HadoopTables(CONF); + + Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List expectedRecords = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); + originalDf.select("id", "data").write() + .format("iceberg") + .mode("append") + .save(tableLocation); + spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); + Thread 
writerThread = new Thread(() -> { + Map properties = Maps.newHashMap(); + properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); + CallerWithCommitMetadata.withCommitProperties(properties, () -> { + spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); + return 0; + }, RuntimeException.class); + }); + writerThread.setName("test-extra-commit-message-writer-thread"); + writerThread.start(); + writerThread.join(); + Set threadNames = Sets.newHashSet(); + for (Snapshot snapshot: table.snapshots()) { + threadNames.add(snapshot.summary().get("writer-thread")); + } + assertEquals(2, threadNames.size()); + assertTrue(threadNames.contains(null)); + assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + } } From 9b94da44baa07c6b62ae9e910684fb56e4d1f593 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 16:37:45 -0700 Subject: [PATCH 15/18] fix building issues --- .palantir/revapi.yml | 7 +++-- .../spark/source/TestDataSourceOptions.java | 29 +++++++++---------- .../spark/source/TestDataSourceOptions.java | 13 ++++----- .../spark/source/TestDataSourceOptions.java | 13 ++++----- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index b6af2e88686e..f8ce6a819258 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1,4 +1,5 @@ versionOverrides: + org.apache.iceberg:iceberg-api:release-base-0.12.0: "0.13.1" org.apache.iceberg:iceberg-api:release-base-0.13.0: "0.13.0" acceptedBreaks: release-base-0.13.0: @@ -44,10 +45,12 @@ acceptedBreaks: justification: "Allow adding a new method to the interface - old method is deprecated" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements\ + \ this Result bean interface" - code: "java.method.addedToInterface" new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)" justification: "Accept all changes prior to introducing API compatibility checks" diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index d1464a89cf9f..d2704603797b 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; @@ -50,7 
+51,6 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.glassfish.jersey.internal.guava.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -58,8 +58,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; + import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { @@ -251,7 +250,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", + "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -264,7 +263,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", + "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -278,7 +277,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot only specify option end-snapshot-id to do incremental scan", + "Cannot set only end-snapshot-id for incremental scans", () -> { spark.read() .format("iceberg") @@ -419,14 +418,14 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); List expectedRecords = Lists.newArrayList( - new SimpleRecord(1, "a"), - new SimpleRecord(2, "b") + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") ); Dataset originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class); originalDf.select("id", "data").write() - .format("iceberg") - .mode("append") - .save(tableLocation); + .format("iceberg") + .mode("append") + .save(tableLocation); spark.read().format("iceberg").load(tableLocation).createOrReplaceTempView("target"); Thread writerThread = new Thread(() -> { Map properties = Maps.newHashMap(); @@ -440,11 +439,11 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot: table.snapshots()) { + for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); } - assertEquals(2, threadNames.size()); - assertTrue(threadNames.contains(null)); - assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } } diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 2b7ae17a97f1..d2704603797b 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ 
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; @@ -50,7 +51,6 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.glassfish.jersey.internal.guava.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -58,8 +58,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; + import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { @@ -440,11 +439,11 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot: table.snapshots()) { + for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); } - assertEquals(2, threadNames.size()); - assertTrue(threadNames.contains(null)); - assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 2b7ae17a97f1..d2704603797b 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -39,6 +39,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.spark.CallerWithCommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; @@ -50,7 +51,6 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; -import org.glassfish.jersey.internal.guava.Sets; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -58,8 +58,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertTrue; + import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { @@ -440,11 +439,11 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx writerThread.start(); writerThread.join(); Set threadNames = Sets.newHashSet(); - for (Snapshot snapshot: table.snapshots()) { + for (Snapshot snapshot : table.snapshots()) { threadNames.add(snapshot.summary().get("writer-thread")); } 
- assertEquals(2, threadNames.size()); - assertTrue(threadNames.contains(null)); - assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); + Assert.assertEquals(2, threadNames.size()); + Assert.assertTrue(threadNames.contains(null)); + Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread")); } } From 023de7069c15865c9f6982740a8dccd9d06478ca Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 16:40:11 -0700 Subject: [PATCH 16/18] redo some changes --- .palantir/revapi.yml | 7 ++----- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 1 - .../apache/iceberg/spark/source/TestDataSourceOptions.java | 1 - .../apache/iceberg/spark/source/TestDataSourceOptions.java | 1 - 4 files changed, 2 insertions(+), 8 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index f8ce6a819258..b6af2e88686e 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1,5 +1,4 @@ versionOverrides: - org.apache.iceberg:iceberg-api:release-base-0.12.0: "0.13.1" org.apache.iceberg:iceberg-api:release-base-0.13.0: "0.13.0" acceptedBreaks: release-base-0.13.0: @@ -45,12 +44,10 @@ acceptedBreaks: justification: "Allow adding a new method to the interface - old method is deprecated" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedEqualityDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements\ - \ this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" - code: "java.method.addedToInterface" new: "method long org.apache.iceberg.actions.ExpireSnapshots.Result::deletedPositionDeleteFilesCount()" - justification: "Interface is backward compatible, very unlikely anyone implements\ - \ this Result bean interface" + justification: "Interface is backward compatible, very unlikely anyone implements this Result bean interface" - code: "java.method.addedToInterface" new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::planWith(java.util.concurrent.ExecutorService)" justification: "Accept all changes prior to introducing API compatibility checks" diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index d2704603797b..6d1ce1c9e20b 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -58,7 +58,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; - import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index d2704603797b..6d1ce1c9e20b 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -58,7 +58,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; - import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index d2704603797b..6d1ce1c9e20b 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -58,7 +58,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; - import static org.apache.iceberg.types.Types.NestedField.optional; public class TestDataSourceOptions { From c89a49cc0f07680cbdf8cccce68ea5a479f9a798 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 5 Jun 2022 17:50:23 -0700 Subject: [PATCH 17/18] fix tests --- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 6 +++--- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 6d1ce1c9e20b..c3948acb8399 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -249,7 +249,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") @@ -262,7 +262,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") @@ -276,7 +276,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot set only end-snapshot-id for incremental scans", + "Cannot only specify option end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 6d1ce1c9e20b..c3948acb8399 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -249,7 +249,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") @@ -262,7 +262,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are 
configured", IllegalArgumentException.class, - "Cannot set start-snapshot-id and end-snapshot-id for incremental scans", + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") @@ -276,7 +276,7 @@ public void testIncrementalScanOptions() throws IOException { AssertHelpers.assertThrows( "Check both start-snapshot-id and snapshot-id are configured", IllegalArgumentException.class, - "Cannot set only end-snapshot-id for incremental scans", + "Cannot only specify option end-snapshot-id to do incremental scan", () -> { spark.read() .format("iceberg") From cd4147fbf6f87a8a78a5d3b1965398b2bf8e72fb Mon Sep 17 00:00:00 2001 From: CodingCat Date: Mon, 6 Jun 2022 11:01:42 -0700 Subject: [PATCH 18/18] addr comments --- .../{CallerWithCommitMetadata.java => CommitMetadata.java} | 4 ++-- .../java/org/apache/iceberg/spark/source/SparkWrite.java | 6 +++--- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 4 ++-- .../{CallerWithCommitMetadata.java => CommitMetadata.java} | 4 ++-- .../java/org/apache/iceberg/spark/source/SparkWrite.java | 6 +++--- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 4 ++-- .../{CallerWithCommitMetadata.java => CommitMetadata.java} | 4 ++-- .../iceberg/spark/source/SparkPositionDeltaWrite.java | 5 +++++ .../java/org/apache/iceberg/spark/source/SparkWrite.java | 6 +++--- .../apache/iceberg/spark/source/TestDataSourceOptions.java | 4 ++-- 10 files changed, 26 insertions(+), 21 deletions(-) rename spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/{CallerWithCommitMetadata.java => CommitMetadata.java} (96%) rename spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/{CallerWithCommitMetadata.java => CommitMetadata.java} (96%) rename spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/{CallerWithCommitMetadata.java => CommitMetadata.java} (96%) diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java similarity index 96% rename from spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java rename to spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java index 53cbba556b04..58137250003a 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -27,9 +27,9 @@ /** * utility class to accept thread local commit properties */ -public class CallerWithCommitMetadata { +public class CommitMetadata { - private CallerWithCommitMetadata() { + private CommitMetadata() { } diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index edc88d275814..930f49ef6ecc 100644 --- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,7 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import 
org.apache.iceberg.util.PropertyUtil; @@ -175,8 +175,8 @@ private void commitOperation(SnapshotUpdate operation, String description) { extraSnapshotMetadata.forEach(operation::set); } - if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { - CallerWithCommitMetadata.commitProperties().forEach(operation::set); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); } if (isWapTable() && wapId != null) { diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index c3948acb8399..4fa8fdfc6d75 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -41,7 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -429,7 +429,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOEx Thread writerThread = new Thread(() -> { Map properties = Maps.newHashMap(); properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CallerWithCommitMetadata.withCommitProperties(properties, () -> { + CommitMetadata.withCommitProperties(properties, () -> { spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); return 0; }, RuntimeException.class); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java similarity index 96% rename from spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java rename to spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java index 53cbba556b04..58137250003a 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -27,9 +27,9 @@ /** * utility class to accept thread local commit properties */ -public class CallerWithCommitMetadata { +public class CommitMetadata { - private CallerWithCommitMetadata() { + private CommitMetadata() { } diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index edc88d275814..930f49ef6ecc 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -57,7 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ 
-175,8 +175,8 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); } - if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { - CallerWithCommitMetadata.commitProperties().forEach(operation::set); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); } if (isWapTable() && wapId != null) {
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index c3948acb8399..4fa8fdfc6d75 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -41,7 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -429,7 +429,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException Thread writerThread = new Thread(() -> { Map<String, String> properties = Maps.newHashMap(); properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CallerWithCommitMetadata.withCommitProperties(properties, () -> { + CommitMetadata.withCommitProperties(properties, () -> { spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); return 0; }, RuntimeException.class);
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java similarity index 96% rename from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java rename to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java index 53cbba556b04..58137250003a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CallerWithCommitMetadata.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/CommitMetadata.java @@ -27,9 +27,9 @@ /** * utility class to accept thread local commit properties */ -public class CallerWithCommitMetadata { +public class CommitMetadata { - private CallerWithCommitMetadata() { + private CommitMetadata() { }
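The SparkPositionDeltaWrite wiring below repeats the hook already added to each SparkWrite: just before the snapshot commits, drain the thread-local map into the pending operation. Condensed to its shape (an abbreviated sketch; the surrounding method bodies differ per writer, and SnapshotUpdate#set is the existing Iceberg API for summary properties):

// shape of the hook at every commit point (abbreviated)
private void commitOperation(SnapshotUpdate<?> operation, String description) {
  // ... existing extra-snapshot-metadata handling ...
  if (!CommitMetadata.commitProperties().isEmpty()) {
    // copy the caller's thread-local properties into the snapshot summary
    CommitMetadata.commitProperties().forEach(operation::set);
  }
  // ... WAP staging or direct commit ...
}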
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java index 7c5b80afbb81..846e0735e64e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java @@ -54,6 +54,7 @@ import org.apache.iceberg.io.PositionDeltaWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.types.Types; @@ -249,6 +250,10 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); + } + if (wapEnabled && wapId != null) { // write-audit-publish is enabled for this table and job // stage the changes without changing the current snapshot
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index 36fee7db5398..b714b805566e 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -56,7 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.util.PropertyUtil; @@ -193,8 +193,8 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) { extraSnapshotMetadata.forEach(operation::set); } - if (!CallerWithCommitMetadata.commitProperties().isEmpty()) { - CallerWithCommitMetadata.commitProperties().forEach(operation::set); + if (!CommitMetadata.commitProperties().isEmpty()) { + CommitMetadata.commitProperties().forEach(operation::set); } if (wapEnabled && wapId != null) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java index 6d1ce1c9e20b..00a11c55a9d1 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -41,7 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.LongMath; -import org.apache.iceberg.spark.CallerWithCommitMetadata; +import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.types.Types; @@ -429,7 +429,7 @@ public void testExtraSnapshotMetadataWithSQL() throws InterruptedException, IOException Thread writerThread = new Thread(() -> { Map<String, String> properties = Maps.newHashMap(); properties.put("writer-thread", String.valueOf(Thread.currentThread().getName())); - CallerWithCommitMetadata.withCommitProperties(properties, () -> { + CommitMetadata.withCommitProperties(properties, () -> { spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')"); return 0; }, RuntimeException.class);
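With the rename in place, the end-to-end pattern the series leaves behind looks like this (assembled from the final test; the property key and SQL are the test's own, and `spark` and `table` are the test's session and table):

Map<String, String> properties = Maps.newHashMap();
properties.put("writer-thread", Thread.currentThread().getName());

CommitMetadata.withCommitProperties(properties, () -> {
  spark.sql("INSERT INTO target VALUES (3, 'c'), (4, 'd')");
  return 0;
}, RuntimeException.class);

// every snapshot committed inside the callable carries the extra summary entry
for (Snapshot snapshot : table.snapshots()) {
  System.out.println(snapshot.summary().get("writer-thread"));
}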