From d172c8d3598a9a969bc5430f54c100308a913bac Mon Sep 17 00:00:00 2001 From: huzheng Date: Tue, 1 Dec 2020 16:31:32 +0800 Subject: [PATCH 1/5] Core: Add SortedPosDeleteWriter --- .../iceberg/io/SortedPosDeleteWriter.java | 190 +++++++++++ .../io/TestGenericSortedPosDeleteWriter.java | 315 ++++++++++++++++++ 2 files changed, 505 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java create mode 100644 data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java new file mode 100644 index 000000000000..e9ec893da5e1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.CharSequenceWrapper; + +class SortedPosDeleteWriter implements Closeable { + private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; + + private final Map>> posDeletes = Maps.newHashMap(); + private final List completedFiles = Lists.newArrayList(); + private final Set referencedDataFiles = CharSequenceSet.empty(); + private final PosValueComparator posValueComparator = new PosValueComparator<>(); + private final CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(null); + + private final FileAppenderFactory appenderFactory; + private final OutputFileFactory fileFactory; + private final FileFormat format; + private final PartitionKey partition; + private final long recordsNumThreshold; + + private int records = 0; + + SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + PartitionKey partition, + long recordsNumThreshold) { + this.appenderFactory = appenderFactory; + this.fileFactory = fileFactory; + this.format = format; + this.partition = partition; + this.recordsNumThreshold = recordsNumThreshold; + } + + SortedPosDeleteWriter(FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileFormat format, + PartitionKey partition) { + this(appenderFactory, fileFactory, format, partition, DEFAULT_RECORDS_NUM_THRESHOLD); + } + + public void delete(CharSequence path, long pos) { + delete(path, pos, null); + } + + public void delete(CharSequence path, long pos, T row) { + posDeletes.compute(CharSequenceWrapper.wrap(path), (k, v) -> { + if (v == null) { + return Lists.newArrayList(PosValue.of(pos, row)); + } else { + v.add(PosValue.of(pos, row)); + return v; + } + }); + + records += 1; + + // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the threshold. + if (records >= recordsNumThreshold) { + flushBuffer(); + } + } + + public List complete() throws IOException { + close(); + + return completedFiles; + } + + public Set referencedDataFiles() { + return referencedDataFiles; + } + + @Override + public void close() throws IOException { + flushBuffer(); + } + + private void flushBuffer() { + if (posDeletes.isEmpty()) { + return; + } + + // Create a new output file. + EncryptedOutputFile outputFile; + if (partition == null) { + outputFile = fileFactory.newOutputFile(); + } else { + outputFile = fileFactory.newOutputFile(partition); + } + + PositionDeleteWriter writer = appenderFactory.newPosDeleteWriter(outputFile, format, partition); + try (PositionDeleteWriter closeableWriter = writer) { + // Sort all the paths. + CharSequence[] paths = new CharSequence[posDeletes.size()]; + int index = 0; + for (CharSequenceWrapper charSequenceWrapper : posDeletes.keySet()) { + paths[index] = charSequenceWrapper.get(); + index += 1; + } + Arrays.sort(paths, Comparators.charSequences()); + + // Write all the sorted triples. + for (CharSequence path : paths) { + List> positions = posDeletes.get(wrapper.set(path)); + positions.sort(posValueComparator); + + positions.forEach(posValue -> closeableWriter.delete(path, posValue.pos(), posValue.row())); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to write the sorted path/pos pairs to pos-delete file: " + + outputFile.encryptingOutputFile().location(), e); + } + + // Clear the buffered pos-deletions. + posDeletes.clear(); + records = 0; + + // Add the referenced data files. + referencedDataFiles.addAll(writer.referencedDataFiles()); + + // Add the completed delete files. + completedFiles.add(writer.toDeleteFile()); + } + + private static class PosValue { + private final long pos; + private final R row; + + static PosValue of(long pos, R row) { + return new PosValue<>(pos, row); + } + + private PosValue(long pos, R row) { + this.pos = pos; + this.row = row; + } + + long pos() { + return pos; + } + + R row() { + return row; + } + } + + private static class PosValueComparator implements Comparator> { + + @Override + public int compare(PosValue o1, PosValue o2) { + return Long.compare(o1.pos, o2.pos); + } + } +} diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java new file mode 100644 index 000000000000..92b40d3e7e18 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableTestBase; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestGenericSortedPosDeleteWriter extends TableTestBase { + private static final int FORMAT_V2 = 2; + + private final FileFormat format; + + private OutputFileFactory fileFactory; + private Record gRecord; + + @Parameterized.Parameters(name = "FileFormat={0}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {"avro"}, + new Object[] {"parquet"}, + }; + } + + public TestGenericSortedPosDeleteWriter(String fileFormat) { + super(FORMAT_V2); + this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void setupTable() throws IOException { + this.tableDir = temp.newFolder(); + Assert.assertTrue(tableDir.delete()); + + this.metadataDir = new File(tableDir, "metadata"); + this.table = create(SCHEMA, PartitionSpec.unpartitioned()); + this.gRecord = GenericRecord.create(SCHEMA); + + this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(), + table.encryption(), 1, 1); + + table.updateProperties() + .defaultFormat(format) + .commit(); + } + + private EncryptedOutputFile createEncryptedOutputFile() { + return fileFactory.newOutputFile(); + } + + private DataFile prepareDataFile(FileAppenderFactory appenderFactory, List rowSet) + throws IOException { + DataWriter writer = appenderFactory.newDataWriter(createEncryptedOutputFile(), format, null); + try (DataWriter closeableWriter = writer) { + for (Record record : rowSet) { + closeableWriter.add(record); + } + } + + return writer.toDataFile(); + } + + private Record createRow(Integer id, String data) { + Record row = gRecord.copy(); + row.setField("id", id); + row.setField("data", data); + return row; + } + + private StructLikeSet expectedRowSet(Iterable records) { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + records.forEach(set::add); + return set; + } + + private StructLikeSet actualRowSet(String... columns) throws IOException { + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { + reader.forEach(set::add); + } + return set; + } + + @Test + public void testSortedPosDelete() throws IOException { + List rowSet = Lists.newArrayList( + createRow(0, "aaa"), + createRow(1, "bbb"), + createRow(2, "ccc"), + createRow(3, "ddd"), + createRow(4, "eee") + ); + + FileAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + null, null, null); + DataFile dataFile = prepareDataFile(appenderFactory, rowSet); + + SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 100); + try (SortedPosDeleteWriter closeableWriter = writer) { + for (int index = 0; index < rowSet.size(); index += 2) { + closeableWriter.delete(dataFile.path(), index); + } + } + + List deleteFiles = writer.complete(); + Assert.assertEquals(1, deleteFiles.size()); + DeleteFile deleteFile = deleteFiles.get(0); + + // Check whether the path-pos pairs are sorted as expected. + Schema pathPosSchema = DeleteSchemaUtil.pathPosSchema(); + Record record = GenericRecord.create(pathPosSchema); + List expectedDeletes = Lists.newArrayList( + record.copy("file_path", dataFile.path(), "pos", 0L), + record.copy("file_path", dataFile.path(), "pos", 2L), + record.copy("file_path", dataFile.path(), "pos", 4L) + ); + Assert.assertEquals(expectedDeletes, readRecordsAsList(pathPosSchema, deleteFile.path())); + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFiles.get(0)) + .validateDataFilesExist(writer.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expectedData = Lists.newArrayList( + createRow(1, "bbb"), + createRow(3, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expectedData), actualRowSet("*")); + } + + @Test + public void testSortedPosDeleteWithRow() throws IOException { + List rowSet = Lists.newArrayList( + createRow(0, "aaa"), + createRow(1, "bbb"), + createRow(2, "ccc"), + createRow(3, "ddd"), + createRow(4, "eee") + ); + + FileAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + null, null, table.schema()); + DataFile dataFile = prepareDataFile(appenderFactory, rowSet); + + SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 100); + try (SortedPosDeleteWriter closeableWriter = writer) { + for (int index = 0; index < rowSet.size(); index += 2) { + closeableWriter.delete(dataFile.path(), index, rowSet.get(index)); // Write deletes with row. + } + } + + List deleteFiles = writer.complete(); + Assert.assertEquals(1, deleteFiles.size()); + DeleteFile deleteFile = deleteFiles.get(0); + + // Check whether the path-pos pairs are sorted as expected. + Schema pathPosSchema = DeleteSchemaUtil.posDeleteSchema(table.schema()); + Record record = GenericRecord.create(pathPosSchema); + List expectedDeletes = Lists.newArrayList( + record.copy("file_path", dataFile.path(), "pos", 0L, "row", createRow(0, "aaa")), + record.copy("file_path", dataFile.path(), "pos", 2L, "row", createRow(2, "ccc")), + record.copy("file_path", dataFile.path(), "pos", 4L, "row", createRow(4, "eee")) + ); + Assert.assertEquals(expectedDeletes, readRecordsAsList(pathPosSchema, deleteFile.path())); + + table.newRowDelta() + .addRows(dataFile) + .addDeletes(deleteFiles.get(0)) + .validateDataFilesExist(writer.referencedDataFiles()) + .validateDeletedFiles() + .commit(); + + List expectedData = Lists.newArrayList( + createRow(1, "bbb"), + createRow(3, "ddd") + ); + Assert.assertEquals("Should have the expected records", expectedRowSet(expectedData), actualRowSet("*")); + } + + @Test + public void testMultipleFlush() throws IOException { + FileAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + null, null, null); + + // It will produce 5 record lists, each list will write into a separate data file: + // The 1th file has: <0 , val-0> , <1 , val-1> , ... , <99 , val-99> + // The 2th file has: <100, val-100> , <101, val-101> , ... , <199, val-199> + // The 3th file has: <200, val-200> , <201, val-201> , ... , <299, val-299> + // The 4th file has: <300, val-300> , <301, val-301> , ... , <399, val-399> + // The 5th file has: <400, val-400> , <401, val-401> , ... , <499, val-499> + List dataFiles = Lists.newArrayList(); + for (int fileIndex = 0; fileIndex < 5; fileIndex++) { + List recordList = Lists.newLinkedList(); + for (int recordIndex = 0; recordIndex < 100; recordIndex++) { + int id = fileIndex * 100 + recordIndex; + recordList.add(createRow(id, String.format("val-%s", id))); + } + + // Write the records and generate the data file. + dataFiles.add(prepareDataFile(appenderFactory, recordList)); + } + + // Commit those data files to iceberg table. + RowDelta rowDelta = table.newRowDelta(); + dataFiles.forEach(rowDelta::addRows); + rowDelta.commit(); + + SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 50); + try (SortedPosDeleteWriter closeableWriter = writer) { + for (int pos = 0; pos < 100; pos++) { + for (int fileIndex = 0; fileIndex < 5; fileIndex++) { + closeableWriter.delete(dataFiles.get(fileIndex).path(), pos); + } + } + } + + List deleteFiles = writer.complete(); + Assert.assertEquals(10, deleteFiles.size()); + + Schema pathPosSchema = DeleteSchemaUtil.pathPosSchema(); + Record record = GenericRecord.create(pathPosSchema); + for (int deleteFileIndex = 0; deleteFileIndex < 10; deleteFileIndex++) { + List expectedDeletes = Lists.newArrayList(); + for (int dataFileIndex = 0; dataFileIndex < 5; dataFileIndex++) { + DataFile dataFile = dataFiles.get(dataFileIndex); + for (long pos = deleteFileIndex * 10; pos < deleteFileIndex * 10 + 10; pos++) { + expectedDeletes.add(record.copy("file_path", dataFile.path(), "pos", pos)); + } + } + + DeleteFile deleteFile = deleteFiles.get(deleteFileIndex); + Assert.assertEquals(expectedDeletes, readRecordsAsList(pathPosSchema, deleteFile.path())); + } + + rowDelta = table.newRowDelta(); + deleteFiles.forEach(rowDelta::addDeletes); + rowDelta.commit(); + + Assert.assertEquals("Should have no record.", expectedRowSet(ImmutableList.of()), actualRowSet("*")); + } + + private List readRecordsAsList(Schema schema, CharSequence path) throws IOException { + CloseableIterable iterable; + + InputFile inputFile = Files.localInput(path.toString()); + switch (format) { + case PARQUET: + iterable = Parquet.read(inputFile) + .project(schema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build(); + break; + + case AVRO: + iterable = Avro.read(inputFile) + .project(schema) + .createReaderFunc(DataReader::create) + .build(); + break; + + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + + try (CloseableIterable closeableIterable = iterable) { + return Lists.newArrayList(closeableIterable); + } + } +} From d90e3ca5c1fce787297bcace51457ec5635ec51d Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 4 Dec 2020 12:44:35 +0800 Subject: [PATCH 2/5] Minor fixes --- .../iceberg/io/SortedPosDeleteWriter.java | 29 +++++-------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index e9ec893da5e1..50872a97d762 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -34,6 +33,8 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Multimap; +import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; @@ -41,10 +42,10 @@ class SortedPosDeleteWriter implements Closeable { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; - private final Map>> posDeletes = Maps.newHashMap(); + private final Multimap> posDeletes = + Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); private final List completedFiles = Lists.newArrayList(); private final Set referencedDataFiles = CharSequenceSet.empty(); - private final PosValueComparator posValueComparator = new PosValueComparator<>(); private final CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(null); private final FileAppenderFactory appenderFactory; @@ -79,15 +80,7 @@ public void delete(CharSequence path, long pos) { } public void delete(CharSequence path, long pos, T row) { - posDeletes.compute(CharSequenceWrapper.wrap(path), (k, v) -> { - if (v == null) { - return Lists.newArrayList(PosValue.of(pos, row)); - } else { - v.add(PosValue.of(pos, row)); - return v; - } - }); - + posDeletes.put(CharSequenceWrapper.wrap(path), PosValue.of(pos, row)); records += 1; // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the threshold. @@ -137,8 +130,8 @@ private void flushBuffer() { // Write all the sorted triples. for (CharSequence path : paths) { - List> positions = posDeletes.get(wrapper.set(path)); - positions.sort(posValueComparator); + List> positions = (List>) posDeletes.get(wrapper.set(path)); + positions.sort(Comparator.comparingLong(PosValue::pos)); positions.forEach(posValue -> closeableWriter.delete(path, posValue.pos(), posValue.row())); } @@ -179,12 +172,4 @@ R row() { return row; } } - - private static class PosValueComparator implements Comparator> { - - @Override - public int compare(PosValue o1, PosValue o2) { - return Long.compare(o1.pos, o2.pos); - } - } } From e35955ff1b0eca24aa345f71d4cc9fed3bfa64e6 Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 4 Dec 2020 13:01:31 +0800 Subject: [PATCH 3/5] Improve unit tests. --- .../org/apache/iceberg/io/SortedPosDeleteWriter.java | 9 +++------ .../iceberg/io/TestGenericSortedPosDeleteWriter.java | 8 ++++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 50872a97d762..180f2d3de366 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -22,7 +22,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -120,13 +119,11 @@ private void flushBuffer() { PositionDeleteWriter writer = appenderFactory.newPosDeleteWriter(outputFile, format, partition); try (PositionDeleteWriter closeableWriter = writer) { // Sort all the paths. - CharSequence[] paths = new CharSequence[posDeletes.size()]; - int index = 0; + List paths = Lists.newArrayListWithCapacity(posDeletes.keySet().size()); for (CharSequenceWrapper charSequenceWrapper : posDeletes.keySet()) { - paths[index] = charSequenceWrapper.get(); - index += 1; + paths.add(charSequenceWrapper.get()); } - Arrays.sort(paths, Comparators.charSequences()); + paths.sort(Comparators.charSequences()); // Write all the sorted triples. for (CharSequence path : paths) { diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java index 92b40d3e7e18..a455778d8c4f 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java @@ -141,7 +141,7 @@ public void testSortedPosDelete() throws IOException { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 100); try (SortedPosDeleteWriter closeableWriter = writer) { - for (int index = 0; index < rowSet.size(); index += 2) { + for (int index = rowSet.size() - 1; index >= 0; index -= 2) { closeableWriter.delete(dataFile.path(), index); } } @@ -190,7 +190,7 @@ public void testSortedPosDeleteWithRow() throws IOException { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 100); try (SortedPosDeleteWriter closeableWriter = writer) { - for (int index = 0; index < rowSet.size(); index += 2) { + for (int index = rowSet.size() - 1; index >= 0; index -= 2) { closeableWriter.delete(dataFile.path(), index, rowSet.get(index)); // Write deletes with row. } } @@ -253,8 +253,8 @@ public void testMultipleFlush() throws IOException { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 50); try (SortedPosDeleteWriter closeableWriter = writer) { - for (int pos = 0; pos < 100; pos++) { - for (int fileIndex = 0; fileIndex < 5; fileIndex++) { + for (int pos = 0; pos < 100; pos++) { // Delete in DESC order. + for (int fileIndex = 4; fileIndex >= 0; fileIndex--) { closeableWriter.delete(dataFiles.get(fileIndex).path(), pos); } } From f104c335ea2be4034e76539e0240d67cfa36bb1e Mon Sep 17 00:00:00 2001 From: huzheng Date: Sun, 6 Dec 2020 20:17:10 +0800 Subject: [PATCH 4/5] Addressing comments. --- .../java/org/apache/iceberg/avro/Avro.java | 14 ++------ .../iceberg/io/SortedPosDeleteWriter.java | 34 +++++++++++-------- .../io/TestGenericSortedPosDeleteWriter.java | 27 ++++++++++++++- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 584c28070c9b..5a34eedc80d5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -38,7 +38,6 @@ import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.StructLike; @@ -47,13 +46,13 @@ import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.ArrayUtil; import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION; @@ -302,20 +301,13 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { if (rowSchema != null && createWriterFunc != null) { // the appender uses the row schema wrapped with position fields - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS, - NestedField.optional( - MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(), - MetadataColumns.DELETE_FILE_ROW_DOC))); + appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema)); appenderBuilder.createWriterFunc( avroSchema -> new PositionAndRowDatumWriter<>(createWriterFunc.apply(avroSchema))); } else { - appenderBuilder.schema(new org.apache.iceberg.Schema( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS)); + appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema()); appenderBuilder.createWriterFunc(ignored -> new PositionDatumWriter()); } diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java index 180f2d3de366..c9f8c5a155f5 100644 --- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java @@ -24,6 +24,7 @@ import java.io.UncheckedIOException; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -32,8 +33,6 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Multimap; -import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; @@ -41,8 +40,7 @@ class SortedPosDeleteWriter implements Closeable { private static final long DEFAULT_RECORDS_NUM_THRESHOLD = 100_000L; - private final Multimap> posDeletes = - Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); + private final Map>> posDeletes = Maps.newHashMap(); private final List completedFiles = Lists.newArrayList(); private final Set referencedDataFiles = CharSequenceSet.empty(); private final CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(null); @@ -79,12 +77,18 @@ public void delete(CharSequence path, long pos) { } public void delete(CharSequence path, long pos, T row) { - posDeletes.put(CharSequenceWrapper.wrap(path), PosValue.of(pos, row)); + List> posRows = posDeletes.get(wrapper.set(path)); + if (posRows != null) { + posRows.add(PosRow.of(pos, row)); + } else { + posDeletes.put(CharSequenceWrapper.wrap(path), Lists.newArrayList(PosRow.of(pos, row))); + } + records += 1; // TODO Flush buffer based on the policy that checking whether whole heap memory size exceed the threshold. if (records >= recordsNumThreshold) { - flushBuffer(); + flushDeletes(); } } @@ -100,10 +104,10 @@ public Set referencedDataFiles() { @Override public void close() throws IOException { - flushBuffer(); + flushDeletes(); } - private void flushBuffer() { + private void flushDeletes() { if (posDeletes.isEmpty()) { return; } @@ -127,10 +131,10 @@ private void flushBuffer() { // Write all the sorted triples. for (CharSequence path : paths) { - List> positions = (List>) posDeletes.get(wrapper.set(path)); - positions.sort(Comparator.comparingLong(PosValue::pos)); + List> positions = posDeletes.get(wrapper.set(path)); + positions.sort(Comparator.comparingLong(PosRow::pos)); - positions.forEach(posValue -> closeableWriter.delete(path, posValue.pos(), posValue.row())); + positions.forEach(posRow -> closeableWriter.delete(path, posRow.pos(), posRow.row())); } } catch (IOException e) { throw new UncheckedIOException("Failed to write the sorted path/pos pairs to pos-delete file: " + @@ -148,15 +152,15 @@ private void flushBuffer() { completedFiles.add(writer.toDeleteFile()); } - private static class PosValue { + private static class PosRow { private final long pos; private final R row; - static PosValue of(long pos, R row) { - return new PosValue<>(pos, row); + static PosRow of(long pos, R row) { + return new PosRow<>(pos, row); } - private PosValue(long pos, R row) { + private PosRow(long pos, R row) { this.pos = pos; this.row = row; } diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java index a455778d8c4f..0ebea57f365e 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java @@ -174,6 +174,31 @@ public void testSortedPosDelete() throws IOException { Assert.assertEquals("Should have the expected records", expectedRowSet(expectedData), actualRowSet("*")); } + @Test + public void testSortedPosDeleteWithSchemaAndNullRow() throws IOException { + List rowSet = Lists.newArrayList( + createRow(0, "aaa"), + createRow(1, "bbb"), + createRow(2, "ccc") + ); + + // Create a FileAppenderFactory which requires pos-delete row schema. + FileAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec(), + null, null, table.schema()); + DataFile dataFile = prepareDataFile(appenderFactory, rowSet); + + try (SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, + null, 100)) { + boolean caughtError = false; + try { + writer.delete(dataFile.path(), 0L); + } catch (Exception e) { + caughtError = true; + } + Assert.assertTrue("Should fail because the appender are required non-null rows to write", caughtError); + } + } + @Test public void testSortedPosDeleteWithRow() throws IOException { List rowSet = Lists.newArrayList( @@ -253,7 +278,7 @@ public void testMultipleFlush() throws IOException { SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 50); try (SortedPosDeleteWriter closeableWriter = writer) { - for (int pos = 0; pos < 100; pos++) { // Delete in DESC order. + for (int pos = 0; pos < 100; pos++) { for (int fileIndex = 4; fileIndex >= 0; fileIndex--) { closeableWriter.delete(dataFiles.get(fileIndex).path(), pos); } From e3b1f2a29668e777780395995c5fd7435e3587b6 Mon Sep 17 00:00:00 2001 From: huzheng Date: Mon, 7 Dec 2020 11:51:16 +0800 Subject: [PATCH 5/5] Fix broken unit tests. --- .../io/TestGenericSortedPosDeleteWriter.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java index 0ebea57f365e..ac2d937a3457 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java +++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java @@ -187,16 +187,14 @@ public void testSortedPosDeleteWithSchemaAndNullRow() throws IOException { null, null, table.schema()); DataFile dataFile = prepareDataFile(appenderFactory, rowSet); - try (SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, - null, 100)) { - boolean caughtError = false; - try { - writer.delete(dataFile.path(), 0L); - } catch (Exception e) { - caughtError = true; - } - Assert.assertTrue("Should fail because the appender are required non-null rows to write", caughtError); + SortedPosDeleteWriter writer = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, null, 1); + boolean caughtError = false; + try { + writer.delete(dataFile.path(), 0L); + } catch (Exception e) { + caughtError = true; } + Assert.assertTrue("Should fail because the appender are required non-null rows to write", caughtError); } @Test