Commit 5d65f8e

Spark: Don't create empty partition replace operations
When attempting an insert overwrite with an empty dataset, we would previously throw an error. This patch makes Spark skip no-op partition replacement operations.
1 parent b347252 commit 5d65f8e
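
For context, the write that previously failed looks roughly like the sketch below. It mirrors the new test case added in this commit; location, SimpleRecord, and the surrounding SparkSession setup come from the test fixture, so treat the exact names as assumptions rather than a fixed API.

// Dynamic insert-overwrite from an empty DataFrame; before this patch the
// resulting ReplacePartitions commit would throw instead of being a no-op.
Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);
empty.select("id", "data").write()
    .format("iceberg")
    .mode(SaveMode.Overwrite)
    .option("overwrite-mode", "dynamic")
    .save(location.toString());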

2 files changed, +52 -1 lines changed

spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java

Lines changed: 45 additions & 0 deletions
@@ -33,6 +33,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
@@ -188,6 +189,50 @@ public void testAppend() throws IOException {
     Assert.assertEquals("Result rows should match", expected, actual);
   }
 
+  @Test
+  public void testEmptyOverwrite() throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+
+    List<SimpleRecord> expected = records;
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Append)
+        .save(location.toString());
+
+    Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);
+    // dynamic overwrite with an empty dataset should be a no-op and keep the appended rows
+    empty.select("id", "data").write()
+        .format("iceberg")
+        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
+        .mode(SaveMode.Overwrite)
+        .option("overwrite-mode", "dynamic")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+  }
+
   @Test
   public void testOverwrite() throws IOException {
     File parent = temp.newFolder(format.toString());

spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java

Lines changed: 7 additions & 1 deletion
@@ -262,10 +262,16 @@ public void commit(WriterCommitMessage[] messages) {
   private class DynamicOverwrite extends BaseBatchWrite {
     @Override
     public void commit(WriterCommitMessage[] messages) {
+      Iterable<DataFile> files = files(messages);
+      if (Iterables.size(files) == 0) {
+        LOG.info("Dynamic overwrite is empty, skipping commit");
+        return;
+      }
+
       ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
       int numFiles = 0;
-      for (DataFile file : files(messages)) {
+      for (DataFile file : files) {
        numFiles += 1;
        dynamicOverwrite.addFile(file);
      }
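
A note on the design choice, not part of the commit: the emptiness check could equally be written with Guava's Iterables.isEmpty, assuming the relocated Guava classes used here expose it; a minimal sketch of that alternative guard:

// hypothetical alternative to checking Iterables.size(files) == 0
if (Iterables.isEmpty(files)) {
  LOG.info("Dynamic overwrite is empty, skipping commit");
  return;
}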

0 commit comments

Comments
 (0)