diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java new file mode 100644 index 000000000000..fd3f6326b485 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutEqualityDeleteWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * An equality delete writer capable of writing to multiple specs and partitions that keeps + * delete writers for each seen spec/partition pair open until this writer is closed. + */ +public class FanoutEqualityDeleteWriter extends FanoutWriter { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + + public FanoutEqualityDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + } + + @Override + protected FileWriter newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition); + } else { + return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files"); + deleteFiles.addAll(result.deleteFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java new file mode 100644 index 000000000000..5a85baf7051a --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionDeleteWriter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.util.List; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.CharSequenceSet; + +/** + * A position delete writer capable of writing to multiple specs and partitions that keeps + * delete writers for each seen spec/partition pair open until this writer is closed. + */ +public class FanoutPositionDeleteWriter extends FanoutWriter, DeleteWriteResult> { + + private final FileWriterFactory writerFactory; + private final OutputFileFactory fileFactory; + private final FileIO io; + private final FileFormat fileFormat; + private final long targetFileSizeInBytes; + private final List deleteFiles; + private final CharSequenceSet referencedDataFiles; + + public FanoutPositionDeleteWriter( + FileWriterFactory writerFactory, OutputFileFactory fileFactory, + FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) { + this.writerFactory = writerFactory; + this.fileFactory = fileFactory; + this.io = io; + this.fileFormat = fileFormat; + this.targetFileSizeInBytes = targetFileSizeInBytes; + this.deleteFiles = Lists.newArrayList(); + this.referencedDataFiles = CharSequenceSet.empty(); + } + + @Override + protected FileWriter, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) { + // TODO: support ORC rolling writers + if (fileFormat == FileFormat.ORC) { + EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition); + return writerFactory.newPositionDeleteWriter(outputFile, spec, partition); + } else { + return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); + } + } + + @Override + protected void addResult(DeleteWriteResult result) { + deleteFiles.addAll(result.deleteFiles()); + referencedDataFiles.addAll(result.referencedDataFiles()); + } + + @Override + protected DeleteWriteResult aggregatedResult() { + return new DeleteWriteResult(deleteFiles, referencedDataFiles); + } +}