diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 985abe37eb0a..2be285e6d8a7 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -129,6 +129,9 @@ private TableProperties() {
   public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
   public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE;
 
+  public static final String WRITE_PARTITIONED_FANOUT_ENABLED = "write.partitioned.fanout.enabled";
+  public static final boolean WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT = false;
+
   public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled";
   public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
similarity index 84%
rename from flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
rename to core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
index ad846974adcf..6150b2113c89 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java
@@ -17,23 +17,19 @@
  * under the License.
  */
 
-package org.apache.iceberg.flink.sink;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.BaseTaskWriter;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
-abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+public abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
   private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
 
-  PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+  protected PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 72e7a7941b77..3f8896f2f18f 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.orc.ORC;
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 7805db9c9ec1..62fafb15faa2 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
 | write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest version metadata files after commit |
 | write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
+| write.partitioned.fanout.enabled | false | Enables the partitioned fanout writer, which does not require rows to be clustered by partition |
 
 ### Table behavior properties
 
@@ -155,4 +156,5 @@ df.write
 | target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
 | check-nullability | true | Sets the nullable check on fields |
 | snapshot-property._custom-key_ | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
+| partitioned.fanout.enabled | false | Overrides this table's write.partitioned.fanout.enabled |
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 37ca56c700a4..31e94b2cc834 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.broadcast.Broadcast;
@@ -104,10 +105,18 @@ private List rewriteDataForTask(CombinedScanTask task) throws Exceptio
 
     TaskWriter<InternalRow> writer;
     if (spec.fields().isEmpty()) {
-      writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
+      writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(),
+          Long.MAX_VALUE);
+    } else if (PropertyUtil.propertyAsBoolean(properties,
+        TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED,
+        TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
+      writer = new SparkPartitionedFanoutWriter(
+          spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+          structType);
     } else {
-      writer = new SparkPartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE,
-          schema, structType);
+      writer = new SparkPartitionedWriter(
+          spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema,
+          structType);
     }
 
     try {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
new file mode 100644
index 000000000000..d38ae2f40316
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkPartitionedFanoutWriter extends PartitionedFanoutWriter<InternalRow> {
+  private final PartitionKey partitionKey;
+  private final InternalRowWrapper internalRowWrapper;
+
+  public SparkPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
+                                      FileAppenderFactory<InternalRow> appenderFactory,
+                                      OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                                      Schema schema, StructType sparkSchema) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    this.partitionKey = new PartitionKey(spec, schema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+  }
+
+  @Override
+  protected PartitionKey partition(InternalRow row) {
+    partitionKey.partition(internalRowWrapper.wrap(row));
+    return partitionKey;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index e0c15221b8bc..575efcba47b5 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -49,6 +49,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 @RunWith(Parameterized.class)
@@ -327,51 +328,17 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws
 
   @Test
   public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
-    File parent = temp.newFolder(format.toString());
-    File location = new File(parent, "test");
-
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
-    Table table = tables.create(SCHEMA, spec, location.toString());
-
-    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
-    for (int i = 0; i < 2000; i++) {
-      expected.add(new SimpleRecord(i, "a"));
-      expected.add(new SimpleRecord(i, "b"));
-      expected.add(new SimpleRecord(i, "c"));
-      expected.add(new SimpleRecord(i, "d"));
-    }
-
-    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-
-    df.select("id", "data").sort("data").write()
-        .format("iceberg")
-        .option("write-format", format.toString())
-        .mode("append")
-        .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
-        .save(location.toString());
-
-    table.refresh();
-
-    Dataset<Row> result = spark.read()
-        .format("iceberg")
-        .load(location.toString());
+    partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.NONE);
+  }
 
-    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
-    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
-    Assert.assertEquals("Result rows should match", expected, actual);
+  @Test
+  public void testPartitionedFanoutCreateWithTargetFileSizeViaOption() throws IOException {
+    partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.TABLE);
+  }
 
-    List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
-      for (DataFile file : ManifestFiles.read(manifest, table.io())) {
-        files.add(file);
-      }
-    }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+  @Test
+  public void testPartitionedFanoutCreateWithTargetFileSizeViaOption2() throws IOException {
+    partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType.JOB);
   }
 
   @Test
@@ -505,4 +472,83 @@ public void testViewsReturnRecentResults() throws IOException {
     Assert.assertEquals("Number of rows should match", expected2.size(), actual2.size());
     Assert.assertEquals("Result rows should match", expected2, actual2);
   }
+
+  public void partitionedCreateWithTargetFileSizeViaOption(IcebergOptionsType option)
+      throws IOException {
+    File parent = temp.newFolder(format.toString());
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
+    for (int i = 0; i < 2000; i++) {
+      expected.add(new SimpleRecord(i, "a"));
+      expected.add(new SimpleRecord(i, "b"));
+      expected.add(new SimpleRecord(i, "c"));
+      expected.add(new SimpleRecord(i, "d"));
+    }
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    switch (option) {
+      case NONE:
+        df.select("id", "data").sort("data").write()
+            .format("iceberg")
+            .option("write-format", format.toString())
+            .mode("append")
+            .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
+            .save(location.toString());
+        break;
+      case TABLE:
+        table.updateProperties().set(WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
+        df.select("id", "data").write()
+            .format("iceberg")
+            .option("write-format", format.toString())
+            .mode("append")
+            .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
+            .save(location.toString());
+        break;
+      case JOB:
+        df.select("id", "data").write()
+            .format("iceberg")
+            .option("write-format", format.toString())
+            .mode("append")
+            .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
+            .option("partitioned.fanout.enabled", true)
+            .save(location.toString());
+        break;
+      default:
+        break;
+    }
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+    Assert.assertEquals("Result rows should match", expected, actual);
+
+    List<DataFile> files = Lists.newArrayList();
+    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+      for (DataFile file : ManifestFiles.read(manifest, table.io())) {
+        files.add(file);
+      }
+    }
+    // TODO: ORC does not support a target file size yet
+    if (!format.equals(FileFormat.ORC)) {
+      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    }
+  }
+
+  public enum IcebergOptionsType {
+    NONE,
+    TABLE,
+    JOB
+  }
 }
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 6b6739a06a25..318dc6019bca 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -65,6 +65,8 @@
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED;
+import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
@@ -83,6 +85,7 @@ class Writer implements DataSourceWriter {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
+  private final boolean partitionedFanoutEnabled;
 
   Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
          DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
@@ -113,6 +116,10 @@ class Writer implements DataSourceWriter {
     long tableTargetFileSize = PropertyUtil.propertyAsLong(
         table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
     this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
+
+    boolean tablePartitionedFanoutEnabled = PropertyUtil.propertyAsBoolean(
+        table.properties(), WRITE_PARTITIONED_FANOUT_ENABLED, WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT);
+    this.partitionedFanoutEnabled = options.getBoolean("partitioned.fanout.enabled", tablePartitionedFanoutEnabled);
   }
 
   private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
@@ -131,7 +138,7 @@ private boolean isWapTable() {
   public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(
         table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema);
+        writeSchema, dsSchema, partitionedFanoutEnabled);
   }
 
   @Override
@@ -246,11 +253,12 @@ static class WriterFactory implements DataWriterFactory {
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
+    private final boolean partitionedFanoutEnabled;
 
     WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
                   Map<String, String> properties, Broadcast<FileIO> io,
                   Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
-                  Schema writeSchema, StructType dsSchema) {
+                  Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
       this.spec = spec;
       this.format = format;
      this.locations = locations;
@@ -260,6 +268,7 @@ static class WriterFactory implements DataWriterFactory {
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
+      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
     }
 
     @Override
@@ -270,9 +279,12 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo
 
       if (spec.fields().isEmpty()) {
         return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+      } else if (partitionedFanoutEnabled) {
+        return new PartitionedFanout24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+            writeSchema, dsSchema);
       } else {
-        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(),
-            targetFileSize, writeSchema, dsSchema);
+        return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize,
+            writeSchema, dsSchema);
       }
     }
   }
@@ -307,4 +319,23 @@ public WriterCommitMessage commit() throws IOException {
       return new TaskCommit(complete());
     }
   }
+
+  private static class PartitionedFanout24Writer extends SparkPartitionedFanoutWriter
+      implements DataWriter<InternalRow> {
+
+    PartitionedFanout24Writer(PartitionSpec spec, FileFormat format,
+                              SparkAppenderFactory appenderFactory,
+                              OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize,
+                              Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema,
+          sparkSchema);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      return new TaskCommit(complete());
+    }
+  }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index efbc319197f0..2c6e019dbbd3 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -72,6 +72,8 @@
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED;
+import static org.apache.iceberg.TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
@@ -89,6 +91,7 @@ class SparkWrite {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
+  private final boolean partitionedFanoutEnabled;
 
   SparkWrite(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
              LogicalWriteInfo writeInfo, String applicationId, String wapId,
@@ -113,6 +116,11 @@ class SparkWrite {
     long tableTargetFileSize = PropertyUtil.propertyAsLong(
         table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
     this.targetFileSize = writeInfo.options().getLong("target-file-size-bytes", tableTargetFileSize);
+
+    boolean tablePartitionedFanoutEnabled = PropertyUtil.propertyAsBoolean(
+        table.properties(), WRITE_PARTITIONED_FANOUT_ENABLED, WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT);
+    this.partitionedFanoutEnabled = writeInfo.options()
+        .getBoolean("partitioned.fanout.enabled", tablePartitionedFanoutEnabled);
   }
 
   BatchWrite asBatchAppend() {
@@ -151,7 +159,7 @@ private boolean isWapTable() {
   private WriterFactory createWriterFactory() {
     return new WriterFactory(
         table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema);
+        writeSchema, dsSchema, partitionedFanoutEnabled);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -391,11 +399,12 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
+    private final boolean partitionedFanoutEnabled;
 
     protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
                             Map<String, String> properties, Broadcast<FileIO> io,
                             Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
-                            Schema writeSchema, StructType dsSchema) {
+                            Schema writeSchema, StructType dsSchema, boolean partitionedFanoutEnabled) {
       this.spec = spec;
       this.format = format;
       this.locations = locations;
@@ -405,6 +414,7 @@ protected WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
+      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
     }
 
     @Override
@@ -419,6 +429,9 @@ public DataWriter createWriter(int partitionId, long taskId, long e
       SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema);
       if (spec.fields().isEmpty()) {
         return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+      } else if (partitionedFanoutEnabled) {
+        return new PartitionedFanout3Writer(
+            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
       } else {
         return new Partitioned3Writer(
             spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema);
@@ -455,4 +468,20 @@ public WriterCommitMessage commit() throws IOException {
       return new TaskCommit(complete());
     }
   }
+
+  private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter
+      implements DataWriter<InternalRow> {
+    PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+                             Schema schema, StructType sparkSchema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      this.close();
+
+      return new TaskCommit(complete());
+    }
+  }
 }
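
Note (editorial addition, not part of the patch): a minimal usage sketch of the new setting, assuming an already-loaded Iceberg `Table table`, a Spark `Dataset<Row> df`, and a `String location` for the table path. The fanout writer keeps one open data file per partition key, so a partitioned write no longer needs its input sorted or clustered by partition; the trade-off is typically more open files and memory per task, which is why the property defaults to false.

    // Enable fanout for all writes to this table through the table property ...
    table.updateProperties()
        .set(TableProperties.WRITE_PARTITIONED_FANOUT_ENABLED, "true")  // "write.partitioned.fanout.enabled"
        .commit();

    // ... or override it for a single Spark job with the write option added by this patch.
    df.write()
        .format("iceberg")
        .option("partitioned.fanout.enabled", "true")
        .mode("append")
        .save(location);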