From 9813d823ebd709b5cebf96b322bd78a9e7cc171b Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 14 Nov 2023 18:25:35 -0600 Subject: [PATCH 01/12] Initial commit --- .../parquet/base/ColumnChunkReader.java | 13 ++ .../parquet/base/ColumnChunkReaderImpl.java | 32 +++- .../parquet/base/ColumnPageReaderImpl.java | 9 + .../parquet/base/ColumnWriterImpl.java | 17 +- .../parquet/base/ParquetFileReader.java | 7 +- .../parquet/base/RowGroupReaderImpl.java | 10 +- .../table/location/ParquetColumnLocation.java | 14 +- .../table/location/ParquetTableLocation.java | 4 +- .../parquet/table/metadata/TableInfo.java | 4 +- .../table/pagestore/ColumnChunkPageStore.java | 71 ++++++-- .../FixedPageSizeColumnChunkPageStore.java | 9 +- .../OffsetIndexBasedColumnChunkPageStore.java | 154 ++++++++++++++++++ .../VariablePageSizeColumnChunkPageStore.java | 2 +- .../table/ParquetTableReadWriteTest.java | 91 ++++++++++- .../ReferenceParquetArrayData.parquet | 3 + .../resources/ReferenceParquetData.parquet | 3 + .../ReferenceParquetVectorData.parquet | 3 + 17 files changed, 403 insertions(+), 43 deletions(-) create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java create mode 100644 extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet create mode 100644 extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet create mode 100644 extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index b65c7502471..81f7d4e43c8 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -4,7 +4,9 @@ package io.deephaven.parquet.base; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.schema.PrimitiveType; +import org.jetbrains.annotations.Nullable; import java.io.IOException; import java.util.Iterator; @@ -32,9 +34,17 @@ public interface ColumnChunkReader { */ int getMaxRl(); + default @Nullable OffsetIndex getOffsetIndex() { + return null; + } + interface ColumnPageReaderIterator extends Iterator, AutoCloseable { @Override void close() throws IOException; + + default ColumnPageReader getPageReader(int pageNum) { + return null; + }; } /** @@ -69,4 +79,7 @@ public int getMaxId() { } PrimitiveType getType(); + + @Nullable + String getVersion(); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index f56216ebe86..918e2ecea0e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -46,10 +46,12 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { private final PageMaterializer.Factory nullMaterializerFactory; private Path filePath; + private final long numRows; + private final String version; - ColumnChunkReaderImpl( - ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, - Path rootPath, MessageType type, OffsetIndex offsetIndex, List fieldTypes) { + 
ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, Path rootPath, + MessageType type, OffsetIndex offsetIndex, List fieldTypes, final long numRows, + final String version) { this.channelsProvider = channelsProvider; this.columnChunk = columnChunk; this.rootPath = rootPath; @@ -65,6 +67,8 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { this.fieldTypes = fieldTypes; this.dictionarySupplier = new LazyCachingSupplier<>(this::getDictionary); this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName()); + this.numRows = numRows; + this.version = version; } @Override @@ -74,7 +78,7 @@ public int getPageFixedSize() { @Override public long numRows() { - return numValues(); + return numRows; } @Override @@ -87,6 +91,10 @@ public int getMaxRl() { return path.getMaxRepetitionLevel(); } + public final OffsetIndex getOffsetIndex() { + return offsetIndex; + } + @Override public ColumnPageReaderIterator getPageIterator() { final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); @@ -166,6 +174,11 @@ public PrimitiveType getType() { return path.getPrimitiveType(); } + @Override + public String getVersion() { + return version; + } + @NotNull private Dictionary readDictionary(ReadableByteChannel file) throws IOException { // explicitly not closing this, caller is responsible @@ -300,6 +313,17 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset( return columnPageReader; } + @Override + public ColumnPageReader getPageReader(final int pageNum) { + if (pageNum > offsetIndex.getPageCount()) { + throw new RuntimeException( + "pageNum=" + pageNum + " > offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); + } + final int numValues = -1; // Will be populated properly when we read the page header + return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, + path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues); + } + @Override public void close() throws IOException {} } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 142499f1fac..c57acbb4964 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -577,6 +577,15 @@ private void ensureNumValues() throws IOException { if (numValues >= 0) { return; } + if (pageHeader == null) { + try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { + readChannel.position(offset); + ensurePageHeader(readChannel); + // Above will automatically populate numValues + Assert.geq(numValues, "numValues", 0); + return; + } + } Assert.neqNull(pageHeader, "pageHeader"); switch (pageHeader.type) { case DATA_PAGE: diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index e17107a648b..ad8bced463f 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -102,7 +102,7 @@ public void addPageNoNulls(@NotNull final Object pageData, dlEncoder.writeInt(1); // TODO implement 
a bulk RLE writer } } - writePage(bulkWriter.getByteBufferView(), valuesCount); + writePage(bulkWriter.getByteBufferView(), valuesCount, valuesCount); bulkWriter.reset(); } @@ -210,7 +210,7 @@ public void addPage(@NotNull final Object pageData, initWriter(); // noinspection unchecked bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount, statistics); - writePage(bulkWriter.getByteBufferView(), valuesCount); + writePage(bulkWriter.getByteBufferView(), valuesCount, valuesCount); bulkWriter.reset(); } @@ -229,7 +229,7 @@ public void addVectorPage( // noinspection unchecked final int valueCount = bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics); - writePage(bulkWriter.getByteBufferView(), valueCount); + writePage(bulkWriter.getByteBufferView(), valueCount, repeatCount.limit()); bulkWriter.reset(); } @@ -313,7 +313,8 @@ public void writePageV2( compressedData.writeAllTo(bufferedOutput); } - private void writePage(final BytesInput bytes, final int valueCount, final Encoding valuesEncoding) + private void writePage(final BytesInput bytes, final int valueCount, final long rowCount, + final Encoding valuesEncoding) throws IOException { final long initialOffset = bufferedOutput.position(); if (firstDataPageOffset == -1) { @@ -354,7 +355,7 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod this.pageCount += 1; compressedBytes.writeAllTo(bufferedOutput); - offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), valueCount); + offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), rowCount); encodings.add(valuesEncoding); encodingStatsBuilder.addDataEncoding(valuesEncoding); } @@ -391,7 +392,7 @@ private static PageHeader newDataPageHeader( * * @param valueCount how many rows have been written so far */ - private void writePage(final ByteBuffer encodedData, final long valueCount) { + private void writePage(final ByteBuffer encodedData, final long valueCount, final long rowCount) { try { BytesInput bytes = BytesInput.from(encodedData); if (dlEncoder != null) { @@ -402,9 +403,7 @@ private void writePage(final ByteBuffer encodedData, final long valueCount) { final BytesInput rlBytesInput = rlEncoder.toBytes(); bytes = BytesInput.concat(BytesInput.fromInt((int) rlBytesInput.size()), rlBytesInput, bytes); } - writePage( - bytes, - (int) valueCount, hasDictionary ? Encoding.RLE_DICTIONARY : Encoding.PLAIN); + writePage(bytes, (int) valueCount, rowCount, hasDictionary ? 
Encoding.RLE_DICTIONARY : Encoding.PLAIN); } catch (IOException e) { throw new ParquetEncodingException("could not write page for " + column.getPath()[0], e); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 3621d69b5a0..9613b2bea7e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -174,12 +174,17 @@ private int readIntLittleEndian(SeekableByteChannel f) throws IOException { } public RowGroupReader getRowGroup(int groupNumber) { + return getRowGroup(groupNumber, null); + } + + public RowGroupReader getRowGroup(int groupNumber, final String version) { return new RowGroupReaderImpl( fileMetaData.getRow_groups().get(groupNumber), channelsProvider, rootPath, type, - getSchema()); + getSchema(), + version); } private static MessageType fromParquetSchema(List schema, List columnOrders) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java index 33007f26c7d..90d4e20add3 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java @@ -12,6 +12,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.BufferedInputStream; import java.io.IOException; @@ -34,13 +35,15 @@ public class RowGroupReaderImpl implements RowGroupReader { private final Map chunkMap = new HashMap<>(); private final Path rootPath; + private final String version; RowGroupReaderImpl( @NotNull final RowGroup rowGroup, @NotNull final SeekableChannelsProvider channelsProvider, @NotNull final Path rootPath, @NotNull final MessageType type, - @NotNull final MessageType schema) { + @NotNull final MessageType schema, + @Nullable final String version) { this.channelsProvider = channelsProvider; this.rowGroup = rowGroup; this.rootPath = rootPath; @@ -59,6 +62,7 @@ public class RowGroupReaderImpl implements RowGroupReader { } schemaMap.put(key, nonRequiredFields); } + this.version = version; } @Override @@ -80,8 +84,8 @@ public ColumnChunkReaderImpl getColumnChunk(@NotNull final List path) { throw new UncheckedIOException(e); } } - return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootPath, - type, offsetIndex, fieldTypes); + return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootPath, type, offsetIndex, fieldTypes, + numRows(), version); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 1adf7044ce7..3750f5a9e7a 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -225,17 +225,20 @@ public METADATA_TYPE getMetadata(@NotNull final ColumnDefinition localPageCache.castAttr(), groupingKeyReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(GROUPING_KEY), 
ParquetInstructions.EMPTY, - GROUPING_KEY, groupingKeyReader, columnDefinition)).pageStore, + GROUPING_KEY, groupingKeyReader, columnDefinition), + columnDefinition).pageStore, ColumnChunkPageStore.create( localPageCache.castAttr(), beginPosReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(BEGIN_POS), ParquetInstructions.EMPTY, BEGIN_POS, - beginPosReader, FIRST_KEY_COL_DEF)).pageStore, + beginPosReader, FIRST_KEY_COL_DEF), + columnDefinition).pageStore, ColumnChunkPageStore.create( localPageCache.castAttr(), endPosReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(END_POS), ParquetInstructions.EMPTY, END_POS, - endPosReader, LAST_KEY_COL_DEF)).pageStore) + endPosReader, LAST_KEY_COL_DEF), + columnDefinition).pageStore) .get(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -431,7 +434,8 @@ private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { tl().getRegionParameters().regionMask, makeToPage(tl().getColumnTypes().get(parquetColumnName), tl().getReadInstructions(), parquetColumnName, columnChunkReader, - columnDefinition)); + columnDefinition), + columnDefinition); pageStores[psi] = creatorResult.pageStore; dictionaryChunkSuppliers[psi] = creatorResult.dictionaryChunkSupplier; dictionaryKeysPageStores[psi] = creatorResult.dictionaryKeysPageStore; @@ -469,7 +473,7 @@ public Object get() { if (metaData != null) { return metaData; } - final int numRows = (int) keyColumn.size(); + final int numRows = (int) keyColumn.numRows(); try ( final ChunkBoxer.BoxerKernel boxerKernel = diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 78b8796e81c..88d2ae87acc 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -42,6 +42,7 @@ public class ParquetTableLocation extends AbstractTableLocation { private final Map parquetColumnNameToPath; private final Map groupingColumns; private final Map columnTypes; + private final String version; private volatile RowGroupReader[] rowGroupReaders; @@ -84,6 +85,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, ParquetSchemaReader.parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()); groupingColumns = tableInfo.map(TableInfo::groupingColumnMap).orElse(Collections.emptyMap()); columnTypes = tableInfo.map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + version = tableInfo.map(TableInfo::version).orElse(null); handleUpdate(computeIndex(), tableLocationKey.getFile().lastModified()); } @@ -130,7 +132,7 @@ private RowGroupReader[] getRowGroupReaders() { return local; } return rowGroupReaders = IntStream.of(rowGroupIndices) - .mapToObj(parquetFileReader::getRowGroup) + .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version)) .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) .toArray(RowGroupReader[]::new); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java index 125e112500d..844164fb2ad 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java +++ 
b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java @@ -61,11 +61,11 @@ public final Map columnTypeMap() { } /** - * @return The Deephaven release version when this metadata format was defined + * @return The Deephaven release version when this metadata format was updated */ @Value.Default public String version() { - return "0.4.0"; + return "0.31.0"; } /** diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index d98dd9c638e..fe52777791d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -4,6 +4,7 @@ package io.deephaven.parquet.table.pagestore; import io.deephaven.base.verify.Require; +import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.parquet.table.pagestore.topage.ToPage; import io.deephaven.engine.table.Releasable; import io.deephaven.chunk.attributes.Any; @@ -16,21 +17,26 @@ import io.deephaven.parquet.base.ColumnChunkReader; import io.deephaven.parquet.base.ColumnPageReader; import io.deephaven.util.SafeCloseable; +import io.deephaven.vector.Vector; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; import java.io.IOException; import java.io.UncheckedIOException; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public abstract class ColumnChunkPageStore implements PageStore>, Page, SafeCloseable, Releasable { - protected final PageCache pageCache; + final PageCache pageCache; private final ColumnChunkReader columnChunkReader; private final long mask; private final ToPage toPage; - private final long size; + private final long numRows; final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator; public static class CreatorResult { @@ -48,25 +54,62 @@ private CreatorResult(@NotNull final ColumnChunkPageStore pageStore, } } + private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChunkReader columnChunkReader, + @NotNull final ColumnDefinition columnDefinition) { + if (columnChunkReader.getOffsetIndex() == null) { + return false; + } + final String version = columnChunkReader.getVersion(); + if (version == null) { + // Parquet file not written by deephaven + return true; + } + // For vector and array column types, versions before 0.31.0 had a bug in offset index calculation + final Class columnType = columnDefinition.getDataType(); + if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) { + return satisfiesMinimumVersionRequirements(version); + } + return true; + } + + private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)"); + + /** + * Check if the version is greater than 0.31.0 + */ + @VisibleForTesting + public static boolean satisfiesMinimumVersionRequirements(@Nullable final String version) { + if (version == null) { + return false; + } + final Matcher matcher = VERSION_PATTERN.matcher(version); + if (!matcher.matches()) { + throw new IllegalArgumentException("Malformed version:" + version); + } + final int major = Integer.parseInt(matcher.group(1)); + final int minor = Integer.parseInt(matcher.group(2)); + return major > 0 || major == 0 && minor >= 31; + } + 
public static CreatorResult create( @NotNull final PageCache pageCache, @NotNull final ColumnChunkReader columnChunkReader, final long mask, - @NotNull final ToPage toPage) throws IOException { - final boolean fixedSizePages = columnChunkReader.getPageFixedSize() >= 1; - final ColumnChunkPageStore columnChunkPageStore = fixedSizePages - ? new FixedPageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage) + @NotNull final ToPage toPage, + @NotNull final ColumnDefinition columnDefinition) throws IOException { + final boolean canUseOffsetIndex = canUseOffsetIndexBasedPageStore(columnChunkReader, columnDefinition); + final ColumnChunkPageStore columnChunkPageStore = canUseOffsetIndex + ? new OffsetIndexBasedColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage) : new VariablePageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage); final ToPage dictionaryKeysToPage = toPage.getDictionaryKeysToPage(); final ColumnChunkPageStore dictionaryKeysColumnChunkPageStore = dictionaryKeysToPage == null ? null - : fixedSizePages - ? new FixedPageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, mask, - dictionaryKeysToPage) + : canUseOffsetIndex + ? new OffsetIndexBasedColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, + mask, dictionaryKeysToPage) : new VariablePageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, - mask, - dictionaryKeysToPage); + mask, dictionaryKeysToPage); return new CreatorResult<>(columnChunkPageStore, toPage::getDictionaryChunk, dictionaryKeysColumnChunkPageStore); } @@ -82,7 +125,7 @@ public static CreatorResult create( this.mask = mask; this.toPage = toPage; - this.size = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask"); + this.numRows = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask"); this.columnPageReaderIterator = columnChunkReader.getPageIterator(); } @@ -101,8 +144,8 @@ public long firstRowOffset() { return 0; } - public long size() { - return size; + public long numRows() { + return numRows; } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java index 3a66f93b3a3..89004ff8f20 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java @@ -17,6 +17,11 @@ import java.lang.ref.WeakReference; import java.util.Arrays; +/** + * Unused class, use {@link OffsetIndexBasedColumnChunkPageStore} instead because the only way we could find to check if + * the page sizes are fixed without actually reading the page headers was through offset index. Also, test this class + * before using it since it might not work as-is. 
+ */ class FixedPageSizeColumnChunkPageStore extends ColumnChunkPageStore { private final int pageFixedSize; @@ -34,7 +39,7 @@ class FixedPageSizeColumnChunkPageStore extends ColumnChunkPag Require.gtZero(pageFixedSize, "pageFixedSize"); - final int numPages = Math.toIntExact((size() - 1) / pageFixedSize + 1); + final int numPages = Math.toIntExact((numRows() - 1) / pageFixedSize + 1); this.columnPageReaders = new ColumnPageReader[numPages]; // noinspection unchecked @@ -84,7 +89,7 @@ private ChunkPage getPage(final int pageNum) { public @NotNull ChunkPage getPageContaining(FillContext fillContext, final long elementIndex) { final long row = elementIndex & mask(); - Require.inRange(row, "row", size(), "numRows"); + Require.inRange(row, "row", numRows(), "numRows"); // This is safe because of our check in the constructor, and we know the row is in range. final int pageNum = (int) (row / pageFixedSize); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java new file mode 100644 index 00000000000..6fd95617ce6 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -0,0 +1,154 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.table.pagestore; + +import io.deephaven.base.verify.Assert; +import io.deephaven.base.verify.Require; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.engine.page.ChunkPage; +import io.deephaven.parquet.table.pagestore.topage.ToPage; +import io.deephaven.parquet.base.ColumnChunkReader; +import io.deephaven.parquet.base.ColumnPageReader; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.ref.WeakReference; +import java.util.Arrays; + +final class OffsetIndexBasedColumnChunkPageStore extends ColumnChunkPageStore { + private final OffsetIndex offsetIndex; + private final int numPages; + /** + * Set if first ({@link #numPages}-1) pages have equal number of rows + */ + private boolean isPageSizeFixed; + /** + * Fixed number of rows per page, only valid if {@link #isPageSizeFixed} is true. Used to map from row index to page + * number. 
+ */ + private final long fixedPageSize; + private final Object[] objectsForSynchronizingPageAccess; + private final ColumnPageReader[] columnPageReaders; + private final WeakReference>[] pages; + + OffsetIndexBasedColumnChunkPageStore(@NotNull final PageCache pageCache, + @NotNull final ColumnChunkReader columnChunkReader, + final long mask, + @NotNull final ToPage toPage) throws IOException { + super(pageCache, columnChunkReader, mask, toPage); + offsetIndex = columnChunkReader.getOffsetIndex(); + Assert.assertion(offsetIndex != null, "offsetIndex != null"); + numPages = offsetIndex.getPageCount(); + + // noinspection unchecked + pages = (WeakReference>[]) new WeakReference[numPages]; + columnPageReaders = new ColumnPageReader[numPages]; + + isPageSizeFixed = true; + final long firstPageSize; + if (numPages > 1) { + firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0); + } else { + firstPageSize = numRows(); + } + objectsForSynchronizingPageAccess = new Object[numPages]; + for (int i = 0; i < numPages; ++i) { + objectsForSynchronizingPageAccess[i] = new Object(); + if (isPageSizeFixed && i > 0 + && offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) { + isPageSizeFixed = false; + } + } + if (isPageSizeFixed) { + fixedPageSize = firstPageSize; + } else { + fixedPageSize = -1; + } + } + + /** + * Binary search in offset index to find the page number that contains the row. Logic duplicated from + * {@link Arrays#binarySearch(long[], long)} to use the offset index. + */ + private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, final long row) { + int low = 0; + int high = offsetIndex.getPageCount() - 1; + + while (low <= high) { + final int mid = (low + high) >>> 1; + final long midVal = offsetIndex.getFirstRowIndex(mid); + + if (midVal < row) + low = mid + 1; + else if (midVal > row) + high = mid - 1; + else + return mid; // key found + } + return -(low + 1); // key not found. + } + + private ChunkPage getPage(final int pageNum) { + if (pageNum < 0 || pageNum >= numPages) { + throw new IllegalArgumentException("pageNum " + pageNum + " is out of range [0, " + numPages + ")"); + } + final PageCache.IntrusivePage page; + WeakReference> pageRef = pages[pageNum]; + if (pageRef == null || pageRef.get() == null) { + synchronized (objectsForSynchronizingPageAccess[pageNum]) { + // Make sure no one materialized this page as we waited for the lock + pageRef = pages[pageNum]; + if (pageRef == null || pageRef.get() == null) { + if (columnPageReaders[pageNum] == null) { + columnPageReaders[pageNum] = columnPageReaderIterator.getPageReader(pageNum); + } + try { + page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum), + columnPageReaders[pageNum])); + } catch (final IOException except) { + throw new UncheckedIOException(except); + } + pages[pageNum] = new WeakReference<>(page); + } else { + page = pageRef.get(); + } + } + } else { + page = pageRef.get(); + } + if (page == null) { + throw new IllegalStateException("Page should not be null"); + } + pageCache.touch(page); + return page.getPage(); + } + + @NotNull + @Override + public ChunkPage getPageContaining(@NotNull final FillContext fillContext, long row) { + row &= mask(); + Require.inRange(row, "row", numRows(), "numRows"); + + int pageNum; + if (isPageSizeFixed) { + pageNum = (int) (row / fixedPageSize); + if (pageNum >= numPages) { + // This can happen if the last page is of different size from rest of the pages, assert this condition. 
+ // We have already checked that row is less than numRows. + Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1), + "row >= offsetIndex.getFirstRowIndex(numPages - 1)"); + pageNum = (numPages - 1); + } + } else { + pageNum = findPageNumUsingOffsetIndex(offsetIndex, row); + if (pageNum < 0) { + pageNum = -2 - pageNum; + } + + } + return getPage(pageNum); + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java index 2356393ea6d..f89d541726d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java @@ -139,7 +139,7 @@ private ChunkPage getPage(final int pageNum) { @Override public ChunkPage getPageContaining(@NotNull final FillContext fillContext, long row) { row &= mask(); - Require.inRange(row - pageRowOffsets[0], "row", size(), "numRows"); + Require.inRange(row - pageRowOffsets[0], "row", numRows(), "numRows"); int localNumPages = numPages; int pageNum = Arrays.binarySearch(pageRowOffsets, 1, localNumPages + 1, row); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index c578137e336..b2404463603 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -28,6 +28,7 @@ import io.deephaven.parquet.base.NullStatistics; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import io.deephaven.parquet.table.pagestore.ColumnChunkPageStore; import io.deephaven.parquet.table.transfer.StringDictionary; import io.deephaven.stringset.ArrayStringSet; import io.deephaven.engine.table.Table; @@ -62,7 +63,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; -import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -614,6 +614,95 @@ private Table maybeFixBigDecimal(Table toFix) { .dropColumns("bdColE"); } + /** + * Test if the current code can read the parquet data written by the old code. There is logic in + * {@link ColumnChunkPageStore#create} that decides page store based on the version of the parquet file. The old + * data is generated using following logic: + * + *
+     * <pre>
+     *  // Enforce a smaller page size to write multiple pages
+     *  final ParquetInstructions writeInstructions = new ParquetInstructions.Builder()
+     *        .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE)
+     *        .build();
+     *
+     *  final Table table = getTableFlat(5000, true, false);
+     *  ParquetTools.writeTable(table, new File("ReferenceParquetData.parquet"), writeInstructions);
+     *
+     *  Table vectorTable = table.groupBy().select();
+     *  vectorTable = vectorTable.join(TableTools.emptyTable(100)).select();
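+     *  // groupBy() with no keys collapses each column into a single row of Vectors; the cross join with a
+     *  // 100-row empty table then replicates that row so every vector column spans multiple pages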
+     *  ParquetTools.writeTable(vectorTable, new File("ReferenceParquetVectorData.parquet"), writeInstructions);
+     *
+     *  final Table arrayTable = vectorTable.updateView(vectorTable.getColumnSourceMap().keySet().stream()
+     *         .map(name -> name + " = " + name + ".toArray()")
+     *         .toArray(String[]::new));
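+     *  // each formula "name = name.toArray()" materializes a Vector column as a native array column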
+     *  ParquetTools.writeTable(arrayTable, new File("ReferenceParquetArrayData.parquet"), writeInstructions);
+     * </pre>
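+     *
+     * The reference files are tracked with Git LFS; if only the un-hydrated pointer files are present on disk,
+     * the test below fails with a descriptive error pointing at 'git lfs install; git lfs pull'.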
+ */ + @Test + public void testReadOldParquetData() { + String path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetData.parquet").getFile(); + try { + ParquetTools.readTable(new File(path)).select(); + } catch (RuntimeException e) { + if (e.getCause() instanceof InvalidParquetFileException) { + final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + + "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + + "the repo to pull the files from LFS. Check cause of exception for more details."; + throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); + } + throw e; + } + final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path), 0, null).getMetadata(); + assertTrue(metadata.getFileMetaData().getKeyValueMetaData().get("deephaven").contains("\"version\":\"0.4.0\"")); + + path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetVectorData.parquet").getFile(); + ParquetTools.readTable(new File(path)).select(); + + path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetArrayData.parquet").getFile(); + ParquetTools.readTable(new File(path)).select(); + } + + @Test + public void testVersionChecks() { + assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements(null)); + assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.0.0")); + assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.4.0")); + try { + ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.3"); + TestCase.fail("Exception expected for invalid version string"); + } catch (IllegalArgumentException expected) { + } + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0")); + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.1")); + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.32.0")); + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("1.3.0")); + } + + @Test + public void testWritingDifferentPageSizes() { + // Make a table with arrays of decreasing sizes such that different pages will have different number of rows + Table arrayTable = TableTools.emptyTable(100).update( + "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse(); + final File dest = new File(rootFile + File.separator + "testWritingDifferentPageSizes.parquet"); + final ParquetInstructions writeInstructions = new ParquetInstructions.Builder() + .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE) + .build(); + ParquetTools.writeTable(arrayTable, dest, writeInstructions); + Table fromDisk = ParquetTools.readTable(dest).select(); + TstUtils.assertTableEquals(arrayTable, fromDisk); + + // Make a table such that only the last page has different number of rows, all else have equal number + final long NUM_ROWS = 1000; + arrayTable = TableTools.emptyTable(NUM_ROWS).update( + "intArrays = (i <= 900) ? 
java.util.stream.IntStream.range(i, i+50).toArray() : " + + "java.util.stream.IntStream.range(i, i+2).toArray()"); + ParquetTools.writeTable(arrayTable, dest, writeInstructions); + fromDisk = ParquetTools.readTable(dest); + // Access something on the last page to make sure we can read it + final int[] data = (int[]) fromDisk.getColumnSource("intArrays").get(998); + assertTrue(data.length == 2 && data[0] == 998 && data[1] == 999); + TstUtils.assertTableEquals(arrayTable, fromDisk.select()); + } // Following is used for testing both writing APIs for parquet tables private interface TestParquetTableWriter { diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet new file mode 100644 index 00000000000..16851b6b630 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a21ee158eb36ed817b85d28202215fe70e90aaad81bebfdfb44e9da049e17a18 +size 22075750 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet new file mode 100644 index 00000000000..6010fc66edf --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dbbb899c95833c163717f3be3984f1ea3efbc858f536b794d70f2f5dabcd6cd7 +size 320991 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet new file mode 100644 index 00000000000..58ee60ef1af --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8f1b80574b46b9508c2a0b35aa93b45062e52ff3c133eb0734fb3d493086b661 +size 22098162 From a6dfba2268d8337c460204e1cf43468047c537af Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 15 Nov 2023 16:22:04 -0600 Subject: [PATCH 02/12] Added comments --- .../parquet/base/ColumnChunkReader.java | 18 ++++++++++++++---- .../parquet/base/ColumnChunkReaderImpl.java | 6 ++++++ .../parquet/base/ColumnWriterImpl.java | 6 +++--- .../parquet/base/ParquetFileReader.java | 11 ++++++----- .../table/location/ParquetColumnLocation.java | 10 ++++++---- .../table/pagestore/ColumnChunkPageStore.java | 14 +++++++------- .../FixedPageSizeColumnChunkPageStore.java | 5 ++--- .../OffsetIndexBasedColumnChunkPageStore.java | 2 +- .../table/ParquetTableReadWriteTest.java | 1 - 9 files changed, 45 insertions(+), 28 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index 81f7d4e43c8..60ca30629a7 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -34,15 +34,22 @@ public interface ColumnChunkReader { */ int getMaxRl(); - default @Nullable OffsetIndex getOffsetIndex() { - return null; - } + /** + * @return The offset index for this column chunk, or null if it not found in the metadata. 
+ */ + @Nullable + OffsetIndex getOffsetIndex(); interface ColumnPageReaderIterator extends Iterator, AutoCloseable { @Override void close() throws IOException; - default ColumnPageReader getPageReader(int pageNum) { + /** + * Directly access a page reader for a given page number. This is an optional method that may not be + * implemented. Note that the user should either use {@link Iterator} methods or this method, but not both. + */ + @Nullable + default ColumnPageReader getPageReader(final int pageNum) { return null; }; } @@ -80,6 +87,9 @@ public int getMaxId() { PrimitiveType getType(); + /** + * @return The "version" string from deephaven specific parquet metadata, or null if it's not present. + */ @Nullable String getVersion(); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 918e2ecea0e..9de387e9600 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -46,7 +46,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { private final PageMaterializer.Factory nullMaterializerFactory; private Path filePath; + /** + * Number of rows in the row group of this column chunk. + */ private final long numRows; + /** + * Version string from deephaven specific parquet metadata, or null if it's not present. + */ private final String version; ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, Path rootPath, diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index ad8bced463f..1aea3081a3b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -314,8 +314,7 @@ public void writePageV2( } private void writePage(final BytesInput bytes, final int valueCount, final long rowCount, - final Encoding valuesEncoding) - throws IOException { + final Encoding valuesEncoding) throws IOException { final long initialOffset = bufferedOutput.position(); if (firstDataPageOffset == -1) { firstDataPageOffset = initialOffset; @@ -390,7 +389,8 @@ private static PageHeader newDataPageHeader( /** * writes the current data to a new page in the page store * - * @param valueCount how many rows have been written so far + * @param valueCount how many values have been written so far + * @param rowCount how many rows have been written so far, can be different from valueCount for vector/arrays */ private void writePage(final ByteBuffer encodedData, final long valueCount, final long rowCount) { try { diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 9613b2bea7e..cb6a8e0616e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -173,11 +173,12 @@ private int readIntLittleEndian(SeekableByteChannel f) throws IOException { return tempBuf.getInt(); } - public RowGroupReader getRowGroup(int groupNumber) { - return 
getRowGroup(groupNumber, null); - } - - public RowGroupReader getRowGroup(int groupNumber, final String version) { + /** + * Create a {@link RowGroupReader} object for provided row group number + * + * @param version The "version" string from deephaven specific parquet metadata, or null if it's not present. + */ + public RowGroupReader getRowGroup(final int groupNumber, final String version) { return new RowGroupReaderImpl( fileMetaData.getRow_groups().get(groupNumber), channelsProvider, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 3750f5a9e7a..ad49ea797f3 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -197,12 +197,14 @@ public METADATA_TYPE getMetadata(@NotNull final ColumnDefinition } } } - final Map columnTypes = ParquetSchemaReader.parseMetadata( + final Optional tableInfo = ParquetSchemaReader.parseMetadata( new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData) - .getFileMetaData().getKeyValueMetaData()) - .map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + .getFileMetaData().getKeyValueMetaData()); + final Map columnTypes = + tableInfo.map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + final String version = tableInfo.map(TableInfo::version).orElse(null); - final RowGroupReader rowGroupReader = parquetFileReader.getRowGroup(0); + final RowGroupReader rowGroupReader = parquetFileReader.getRowGroup(0, version); final ColumnChunkReader groupingKeyReader = rowGroupReader.getColumnChunk(Collections.singletonList(GROUPING_KEY)); final ColumnChunkReader beginPosReader = diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index fe52777791d..7a7039a294f 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -19,7 +19,6 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.vector.Vector; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.VisibleForTesting; import java.io.IOException; @@ -61,10 +60,11 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun } final String version = columnChunkReader.getVersion(); if (version == null) { - // Parquet file not written by deephaven + // Parquet file not written by deephaven, can use offset index return true; } - // For vector and array column types, versions before 0.31.0 had a bug in offset index calculation + // For vector and array column types, versions before 0.31.0 had a bug in offset index calculation, fixed as + // part of deephaven-core#4844 final Class columnType = columnDefinition.getDataType(); if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) { return satisfiesMinimumVersionRequirements(version); @@ -78,10 +78,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun * Check if the version is greater than 0.31.0 */ @VisibleForTesting - public 
static boolean satisfiesMinimumVersionRequirements(@Nullable final String version) { - if (version == null) { - return false; - } + public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) { final Matcher matcher = VERSION_PATTERN.matcher(version); if (!matcher.matches()) { throw new IllegalArgumentException("Malformed version:" + version); @@ -144,6 +141,9 @@ public long firstRowOffset() { return 0; } + /** + * @return The number of rows in this ColumnChunk + */ public long numRows() { return numRows; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java index 89004ff8f20..77703a4e6c2 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java @@ -18,9 +18,8 @@ import java.util.Arrays; /** - * Unused class, use {@link OffsetIndexBasedColumnChunkPageStore} instead because the only way we could find to check if - * the page sizes are fixed without actually reading the page headers was through offset index. Also, test this class - * before using it since it might not work as-is. + * Unused class, use {@link OffsetIndexBasedColumnChunkPageStore} instead because to check if the page sizes are fixed + * without actually reading the page headers, we need offset index. */ class FixedPageSizeColumnChunkPageStore extends ColumnChunkPageStore { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index 6fd95617ce6..a71b628f37f 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -136,7 +136,7 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, if (isPageSizeFixed) { pageNum = (int) (row / fixedPageSize); if (pageNum >= numPages) { - // This can happen if the last page is of different size from rest of the pages, assert this condition. + // This can happen if the last page is of different size from rest of the pages. // We have already checked that row is less than numRows. 
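+                // e.g. fixedPageSize=50 and numPages=3 with a 200-row last page gives numRows=300, so row 299
+                // maps to 299/50=5 >= numPages and is clamped to the last page after the check below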
Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1), "row >= offsetIndex.getFirstRowIndex(numPages - 1)"); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index b2404463603..f3b96aa7791 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -664,7 +664,6 @@ public void testReadOldParquetData() { @Test public void testVersionChecks() { - assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements(null)); assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.0.0")); assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.4.0")); try { From 8fefcc1b08d670ffc743a144d12aaa9775499af3 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 20 Nov 2023 11:33:45 -0600 Subject: [PATCH 03/12] Review comments part 1 --- .../parquet/base/ColumnChunkReader.java | 29 ++--- .../parquet/base/ColumnChunkReaderImpl.java | 44 ++++++-- .../table/pagestore/ColumnChunkPageStore.java | 13 +-- .../FixedPageSizeColumnChunkPageStore.java | 99 ----------------- .../OffsetIndexBasedColumnChunkPageStore.java | 103 ++++++++---------- .../VariablePageSizeColumnChunkPageStore.java | 15 ++- .../table/ParquetTableReadWriteTest.java | 71 +++++------- 7 files changed, 135 insertions(+), 239 deletions(-) delete mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index 60ca30629a7..e5727920612 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -13,11 +13,6 @@ import java.util.function.Supplier; public interface ColumnChunkReader { - /** - * @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size - */ - int getPageFixedSize(); - /** * @return The number of rows in this ColumnChunk, or -1 if it's unknown. */ @@ -42,22 +37,28 @@ public interface ColumnChunkReader { interface ColumnPageReaderIterator extends Iterator, AutoCloseable { @Override - void close() throws IOException; + void close(); + } + /** + * @return An iterator over individual parquet pages + */ + ColumnPageReaderIterator getPageIterator() throws IOException; + + interface ColumnPageDirectAccessor extends AutoCloseable { /** - * Directly access a page reader for a given page number. This is an optional method that may not be - * implemented. Note that the user should either use {@link Iterator} methods or this method, but not both. + * Directly access a page reader for a given page number. 
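+         * Page numbers are zero-based positions in this column chunk's offset index.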
*/ - @Nullable - default ColumnPageReader getPageReader(final int pageNum) { - return null; - }; + ColumnPageReader getPageReader(final int pageNum); + + @Override + void close(); } /** - * @return An iterator over individual parquet pages + * @return An accessor for individual parquet pages */ - ColumnPageReaderIterator getPageIterator() throws IOException; + ColumnPageDirectAccessor getPageAccessor(); /** * @return Whether this column chunk uses a dictionary-based encoding on every page diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 9de387e9600..69136cf08e0 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.List; +import java.util.NoSuchElementException; import java.util.function.Supplier; import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY; @@ -77,11 +78,6 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { this.version = version; } - @Override - public int getPageFixedSize() { - return -1; - } - @Override public long numRows() { return numRows; @@ -112,6 +108,14 @@ public ColumnPageReaderIterator getPageIterator() { } } + @Override + public final ColumnPageDirectAccessor getPageAccessor() { + if (offsetIndex == null) { + throw new UnsupportedOperationException("Cannot use direct accessor without offset index"); + } + return new ColumnPageDirectAccessorImpl(path, channelsProvider); + } + private Path getFilePath() { if (filePath != null) { return filePath; @@ -306,23 +310,39 @@ public boolean hasNext() { @Override public ColumnPageReader next() { if (!hasNext()) { - throw new RuntimeException("No next element"); + throw new NoSuchElementException("No next element"); } - int rowCount = - (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) - - offsetIndex.getFirstRowIndex(pos) + 1); + // Following logic assumes that offsetIndex will store the number of values for a page instead of number + // of rows (which can be different for array and vector columns). This behavior is because of a bug on + // parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading + // parquet files written before deephaven-core/pull/4844. 
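+            // e.g. a page holding 10 array rows with 50 total elements is recorded as 50 in such files, so the
+            // difference of adjacent offset-index entries computed below is a value count, not a row count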
+ final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) + - offsetIndex.getFirstRowIndex(pos) + 1); ColumnPageReaderImpl columnPageReader = new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null, - rowCount); + numValues); pos++; return columnPageReader; } + @Override + public void close() {} + } + + class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { + private final SeekableChannelsProvider channelsProvider; + private final ColumnDescriptor path; + + ColumnPageDirectAccessorImpl(final ColumnDescriptor path, final SeekableChannelsProvider channelsProvider) { + this.path = path; + this.channelsProvider = channelsProvider; + } + @Override public ColumnPageReader getPageReader(final int pageNum) { if (pageNum > offsetIndex.getPageCount()) { - throw new RuntimeException( + throw new NoSuchElementException( "pageNum=" + pageNum + " > offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } final int numValues = -1; // Will be populated properly when we read the page header @@ -331,6 +351,6 @@ public ColumnPageReader getPageReader(final int pageNum) { } @Override - public void close() throws IOException {} + public void close() {} } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index 7a7039a294f..15c0a56e7f3 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -22,7 +22,6 @@ import org.jetbrains.annotations.VisibleForTesting; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,7 +35,6 @@ public abstract class ColumnChunkPageStore private final ToPage toPage; private final long numRows; - final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator; public static class CreatorResult { @@ -75,7 +73,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)"); /** - * Check if the version is greater than 0.31.0 + * Check if the version is greater than or equal to 0.31.0 */ @VisibleForTesting public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) { @@ -123,7 +121,6 @@ public static CreatorResult create( this.toPage = toPage; this.numRows = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask"); - this.columnPageReaderIterator = columnChunkReader.getPageIterator(); } ChunkPage toPage(final long offset, @NotNull final ColumnPageReader columnPageReader) @@ -170,11 +167,5 @@ public boolean usesDictionaryOnEveryPage() { } @Override - public void close() { - try { - columnPageReaderIterator.close(); - } catch (IOException except) { - throw new UncheckedIOException(except); - } - } + public void close() {} } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java deleted file mode 100644 index 
77703a4e6c2..00000000000 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.parquet.table.pagestore; - -import io.deephaven.base.verify.Assert; -import io.deephaven.base.verify.Require; -import io.deephaven.chunk.attributes.Any; -import io.deephaven.engine.page.ChunkPage; -import io.deephaven.parquet.table.pagestore.topage.ToPage; -import io.deephaven.parquet.base.ColumnChunkReader; -import io.deephaven.parquet.base.ColumnPageReader; -import org.jetbrains.annotations.NotNull; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.lang.ref.WeakReference; -import java.util.Arrays; - -/** - * Unused class, use {@link OffsetIndexBasedColumnChunkPageStore} instead because to check if the page sizes are fixed - * without actually reading the page headers, we need offset index. - */ -class FixedPageSizeColumnChunkPageStore extends ColumnChunkPageStore { - - private final int pageFixedSize; - private volatile int numPages = 0; - private final ColumnPageReader[] columnPageReaders; - private final WeakReference>[] pages; - - FixedPageSizeColumnChunkPageStore(@NotNull final PageCache pageCache, - @NotNull final ColumnChunkReader columnChunkReader, - final long mask, - @NotNull final ToPage toPage) throws IOException { - super(pageCache, columnChunkReader, mask, toPage); - - this.pageFixedSize = columnChunkReader.getPageFixedSize(); - - Require.gtZero(pageFixedSize, "pageFixedSize"); - - final int numPages = Math.toIntExact((numRows() - 1) / pageFixedSize + 1); - this.columnPageReaders = new ColumnPageReader[numPages]; - - // noinspection unchecked - this.pages = (WeakReference>[]) new WeakReference[numPages]; - Arrays.fill(pages, PageCache.getNullPage()); - } - - private void fillToPage(final int pageNum) { - - while (numPages <= pageNum) { - synchronized (this) { - if (numPages <= pageNum) { - Assert.assertion(columnPageReaderIterator.hasNext(), - "columnPageReaderIterator.hasNext()", - "Parquet fixed page size and page iterator don't match, not enough pages."); - columnPageReaders[numPages++] = columnPageReaderIterator.next(); - } - } - } - } - - private ChunkPage getPage(final int pageNum) { - PageCache.IntrusivePage page = pages[pageNum].get(); - - if (page == null) { - synchronized (columnPageReaders[pageNum]) { - page = pages[pageNum].get(); - - if (page == null) { - try { - page = new PageCache.IntrusivePage<>( - toPage((long) pageNum * pageFixedSize, columnPageReaders[pageNum])); - } catch (IOException except) { - throw new UncheckedIOException(except); - } - - pages[pageNum] = new WeakReference<>(page); - } - } - } - - pageCache.touch(page); - return page.getPage(); - } - - @Override - public @NotNull ChunkPage getPageContaining(FillContext fillContext, - final long elementIndex) { - final long row = elementIndex & mask(); - Require.inRange(row, "row", numRows(), "numRows"); - - // This is safe because of our check in the constructor, and we know the row is in range. 
- final int pageNum = (int) (row / pageFixedSize); - - fillToPage(pageNum); - return getPage(pageNum); - } -} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index a71b628f37f..8a24fd72bbf 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -17,22 +17,33 @@ import java.io.UncheckedIOException; import java.lang.ref.WeakReference; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReferenceArray; final class OffsetIndexBasedColumnChunkPageStore extends ColumnChunkPageStore { + private static final long PAGE_SIZE_NOT_FIXED = -1; + private final OffsetIndex offsetIndex; private final int numPages; /** - * Set if first ({@link #numPages}-1) pages have equal number of rows - */ - private boolean isPageSizeFixed; - /** - * Fixed number of rows per page, only valid if {@link #isPageSizeFixed} is true. Used to map from row index to page - * number. + * Fixed number of rows per page. Set as positive value if first ({@link #numPages}-1) pages have equal number of + * rows, else equal to {@value #PAGE_SIZE_NOT_FIXED}. We don't care about the size of the last page, because we can + * assume all pages to be of the same size and calculate the page number as + * {@code row_index / fixed_page_size -> page_number}. If it is greater than {@link #numPages}, we can assume the + * row to be coming from last page. */ private final long fixedPageSize; - private final Object[] objectsForSynchronizingPageAccess; - private final ColumnPageReader[] columnPageReaders; - private final WeakReference>[] pages; + + private final class PageState { + WeakReference> pageRef; + + PageState() { + // Initialized when used for the first time + pageRef = null; + } + } + + private final AtomicReferenceArray pageStates; + private final ColumnChunkReader.ColumnPageDirectAccessor columnPageDirectAccessor; OffsetIndexBasedColumnChunkPageStore(@NotNull final PageCache pageCache, @NotNull final ColumnChunkReader columnChunkReader, @@ -42,31 +53,22 @@ final class OffsetIndexBasedColumnChunkPageStore extends Colum offsetIndex = columnChunkReader.getOffsetIndex(); Assert.assertion(offsetIndex != null, "offsetIndex != null"); numPages = offsetIndex.getPageCount(); + Assert.assertion(numPages > 0, "numPages > 0"); + pageStates = new AtomicReferenceArray<>(numPages); + columnPageDirectAccessor = columnChunkReader.getPageAccessor(); - // noinspection unchecked - pages = (WeakReference>[]) new WeakReference[numPages]; - columnPageReaders = new ColumnPageReader[numPages]; - - isPageSizeFixed = true; - final long firstPageSize; - if (numPages > 1) { - firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0); - } else { - firstPageSize = numRows(); + if (numPages == 1) { + fixedPageSize = numRows(); + return; } - objectsForSynchronizingPageAccess = new Object[numPages]; - for (int i = 0; i < numPages; ++i) { - objectsForSynchronizingPageAccess[i] = new Object(); - if (isPageSizeFixed && i > 0 - && offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) { + boolean isPageSizeFixed = true; + final long firstPageSize = offsetIndex.getFirstRowIndex(1) - 
offsetIndex.getFirstRowIndex(0);
+        for (int i = 2; i < numPages && isPageSizeFixed; ++i) {
+            if (offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) {
                 isPageSizeFixed = false;
             }
         }
-        if (isPageSizeFixed) {
-            fixedPageSize = firstPageSize;
-        } else {
-            fixedPageSize = -1;
-        }
+        fixedPageSize = isPageSizeFixed ? firstPageSize : PAGE_SIZE_NOT_FIXED;
     }

     /**
@@ -95,32 +97,24 @@ private ChunkPage<ATTR> getPage(final int pageNum) {
         if (pageNum < 0 || pageNum >= numPages) {
             throw new IllegalArgumentException("pageNum " + pageNum + " is out of range [0, " + numPages + ")");
         }
-        final PageCache.IntrusivePage<ATTR> page;
-        WeakReference<PageCache.IntrusivePage<ATTR>> pageRef = pages[pageNum];
-        if (pageRef == null || pageRef.get() == null) {
-            synchronized (objectsForSynchronizingPageAccess[pageNum]) {
+        PageCache.IntrusivePage<ATTR> page;
+        PageState pageState = pageStates.get(pageNum);
+        if (pageState == null) {
+            pageState = pageStates.updateAndGet(pageNum, p -> p == null ? new PageState() : p);
+        }
+        if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) {
+            synchronized (pageState) {
                 // Make sure no one materialized this page as we waited for the lock
-                pageRef = pages[pageNum];
-                if (pageRef == null || pageRef.get() == null) {
-                    if (columnPageReaders[pageNum] == null) {
-                        columnPageReaders[pageNum] = columnPageReaderIterator.getPageReader(pageNum);
-                    }
+                if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) {
+                    final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum);
                     try {
-                        page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum),
-                                columnPageReaders[pageNum]));
+                        page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum), reader));
                     } catch (final IOException except) {
                         throw new UncheckedIOException(except);
                     }
-                    pages[pageNum] = new WeakReference<>(page);
-                } else {
-                    page = pageRef.get();
+                    pageState.pageRef = new WeakReference<>(page);
                 }
             }
-        } else {
-            page = pageRef.get();
-        }
-        if (page == null) {
-            throw new IllegalStateException("Page should not be null");
         }
         pageCache.touch(page);
         return page.getPage();
@@ -133,7 +127,12 @@ public ChunkPage<ATTR> getPageContaining(@NotNull final FillContext fillContext,
         Require.inRange(row, "row", numRows(), "numRows");

         int pageNum;
-        if (isPageSizeFixed) {
+        if (fixedPageSize == PAGE_SIZE_NOT_FIXED) {
+            pageNum = findPageNumUsingOffsetIndex(offsetIndex, row);
+            if (pageNum < 0) {
+                pageNum = -2 - pageNum;
+            }
+        } else {
             pageNum = (int) (row / fixedPageSize);
             if (pageNum >= numPages) {
                 // This can happen if the last page is of different size from rest of the pages.
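Taken together, the constructor loop above reduces the offset-index metadata to a single long, and getPageContaining can then map a row to a page with one division. For illustration only, here is a minimal standalone sketch of that inference; this is not code from the patch, the class and method names are invented, and it assumes only the public getPageCount()/getFirstRowIndex() accessors of Parquet's org.apache.parquet.internal.column.columnindex.OffsetIndex:

    import org.apache.parquet.internal.column.columnindex.OffsetIndex;

    final class PageSizeInference {
        static final long PAGE_SIZE_NOT_FIXED = -1;

        // Returns the common row count of the first (pageCount - 1) pages, or PAGE_SIZE_NOT_FIXED
        // if any two consecutive pages differ. The last page is deliberately ignored: an offset
        // index records only each page's first row index, so the last page's length cannot be
        // derived without the chunk's total row count (passed in here as numRows).
        static long inferFixedPageSize(final OffsetIndex offsetIndex, final long numRows) {
            final int numPages = offsetIndex.getPageCount();
            if (numPages == 1) {
                return numRows;
            }
            final long firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0);
            for (int pageNum = 2; pageNum < numPages; ++pageNum) {
                final long pageSize =
                        offsetIndex.getFirstRowIndex(pageNum) - offsetIndex.getFirstRowIndex(pageNum - 1);
                if (pageSize != firstPageSize) {
                    return PAGE_SIZE_NOT_FIXED;
                }
            }
            return firstPageSize;
        }
    }

When this returns a positive size, row / fixedPageSize locates the page directly, clamped to the last page since the last page's size may differ; when it returns PAGE_SIZE_NOT_FIXED, the store falls back to findPageNumUsingOffsetIndex, a binary search over the pages' first row indexes.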
@@ -142,12 +141,6 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, "row >= offsetIndex.getFirstRowIndex(numPages - 1)"); pageNum = (numPages - 1); } - } else { - pageNum = findPageNumUsingOffsetIndex(offsetIndex, row); - if (pageNum < 0) { - pageNum = -2 - pageNum; - } - } return getPage(pageNum); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java index f89d541726d..e9568936d4c 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java @@ -17,18 +17,17 @@ import java.lang.ref.WeakReference; import java.util.Arrays; -class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore { +final class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore { // We will set numPages after changing all of these arrays in place and/or setting additional - // elements to the - // end of the array. Thus, for i < numPages, array[i] will always have the same value, and be - // valid to use, as - // long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used + // elements to the end of the array. Thus, for i < numPages, array[i] will always have the same value, and be + // valid to use, as long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used // throughout. private volatile int numPages = 0; private volatile long[] pageRowOffsets; private volatile ColumnPageReader[] columnPageReaders; + private final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator; private volatile WeakReference>[] pages; VariablePageSizeColumnChunkPageStore(@NotNull final PageCache pageCache, @@ -41,6 +40,7 @@ class VariablePageSizeColumnChunkPageStore extends ColumnChunk pageRowOffsets = new long[INIT_ARRAY_SIZE + 1]; pageRowOffsets[0] = 0; columnPageReaders = new ColumnPageReader[INIT_ARRAY_SIZE]; + columnPageReaderIterator = columnChunkReader.getPageIterator(); // noinspection unchecked pages = (WeakReference>[]) new WeakReference[INIT_ARRAY_SIZE]; @@ -160,4 +160,9 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, return getPage(pageNum); } + + @Override + public void close() { + columnPageReaderIterator.close(); + } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 4790f6384f5..97174b5863b 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -628,6 +628,20 @@ private Table maybeFixBigDecimal(Table toFix) { .dropColumns("bdColE"); } + private static Table readParquetFileFromGitLFS(final File dest) { + try { + return readSingleFileTable(dest, EMPTY); + } catch (final RuntimeException e) { + if (e.getCause() instanceof InvalidParquetFileException) { + final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + + "file is fetched properly from Git LFS. 
Run commands 'git lfs install; git lfs pull' inside " + + "the repo to pull the files from LFS. Check cause of exception for more details."; + throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); + } + throw e; + } + } + /** * Test if the current code can read the parquet data written by the old code. There is logic in * {@link ColumnChunkPageStore#create} that decides page store based on the version of the parquet file. The old @@ -655,25 +669,15 @@ private Table maybeFixBigDecimal(Table toFix) { @Test public void testReadOldParquetData() { String path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetData.parquet").getFile(); - try { - ParquetTools.readTable(new File(path)).select(); - } catch (RuntimeException e) { - if (e.getCause() instanceof InvalidParquetFileException) { - final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + - "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + - "the repo to pull the files from LFS. Check cause of exception for more details."; - throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); - } - throw e; - } + readParquetFileFromGitLFS(new File(path)).select(); final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path), 0, null).getMetadata(); assertTrue(metadata.getFileMetaData().getKeyValueMetaData().get("deephaven").contains("\"version\":\"0.4.0\"")); path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetVectorData.parquet").getFile(); - ParquetTools.readTable(new File(path)).select(); + readParquetFileFromGitLFS(new File(path)).select(); path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetArrayData.parquet").getFile(); - ParquetTools.readTable(new File(path)).select(); + readParquetFileFromGitLFS(new File(path)).select(); } @Test @@ -696,13 +700,12 @@ public void testWritingDifferentPageSizes() { // Make a table with arrays of decreasing sizes such that different pages will have different number of rows Table arrayTable = TableTools.emptyTable(100).update( "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse(); - final File dest = new File(rootFile + File.separator + "testWritingDifferentPageSizes.parquet"); + final File dest = new File(rootFile, "testWritingDifferentPageSizes.parquet"); final ParquetInstructions writeInstructions = new ParquetInstructions.Builder() .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE) .build(); ParquetTools.writeTable(arrayTable, dest, writeInstructions); - Table fromDisk = ParquetTools.readTable(dest).select(); - TstUtils.assertTableEquals(arrayTable, fromDisk); + checkSingleTable(arrayTable, dest); // Make a table such that only the last page has different number of rows, all else have equal number final long NUM_ROWS = 1000; @@ -710,11 +713,14 @@ public void testWritingDifferentPageSizes() { "intArrays = (i <= 900) ? 
java.util.stream.IntStream.range(i, i+50).toArray() : " + "java.util.stream.IntStream.range(i, i+2).toArray()"); ParquetTools.writeTable(arrayTable, dest, writeInstructions); - fromDisk = ParquetTools.readTable(dest); + final Table fromDisk = readSingleFileTable(dest, EMPTY); // Access something on the last page to make sure we can read it final int[] data = (int[]) fromDisk.getColumnSource("intArrays").get(998); - assertTrue(data.length == 2 && data[0] == 998 && data[1] == 999); - TstUtils.assertTableEquals(arrayTable, fromDisk.select()); + assertNotNull(data); + assertEquals(2, data.length); + assertEquals(998, data[0]); + assertEquals(999, data[1]); + assertTableEquals(arrayTable, fromDisk); } // Following is used for testing both writing APIs for parquet tables @@ -931,18 +937,7 @@ public void legacyGroupingFileReadTest() { final File destFile = new File(path); // Read the legacy file and verify that grouping column is read correctly - final Table fromDisk; - try { - fromDisk = readSingleFileTable(destFile, EMPTY); - } catch (RuntimeException e) { - if (e.getCause() instanceof InvalidParquetFileException) { - final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + - "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + - "the repo to pull the files from LFS. Check cause of exception for more details."; - throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); - } - throw e; - } + final Table fromDisk = readParquetFileFromGitLFS(destFile); final String groupingColName = "gcol"; assertTrue(fromDisk.getDefinition().getColumn(groupingColName).isGrouping()); @@ -1435,18 +1430,8 @@ public void readWriteDateTimeTest() { public void verifyPyArrowStatistics() { final String path = ParquetTableReadWriteTest.class.getResource("/e0/pyarrow_stats.parquet").getFile(); final File pyarrowDest = new File(path); - final Table pyarrowFromDisk; - try { - pyarrowFromDisk = readSingleFileTable(pyarrowDest, EMPTY); - } catch (RuntimeException e) { - if (e.getCause() instanceof InvalidParquetFileException) { - final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + - "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + - "the repo to pull the files from LFS. Check cause of exception for more details."; - throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); - } - throw e; - } + final Table pyarrowFromDisk = readParquetFileFromGitLFS(pyarrowDest); + // Verify that our verification code works for a pyarrow generated table. 
assertTableStatistics(pyarrowFromDisk, pyarrowDest); From c244a97931b4a00df201c3d7ac107356d8217055 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 20 Nov 2023 17:36:50 -0600 Subject: [PATCH 04/12] Added comments to explain offset field better --- .../parquet/base/ColumnPageReaderImpl.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index c57acbb4964..d9c864900ea 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -54,11 +54,19 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private final Path filePath; private final List fieldTypes; + /** + * Stores the offset from where the next byte should be read. Can be the offset of page header if + * {@link #pageHeader} is {@code null}, else will be the offset of data. + */ private long offset; private PageHeader pageHeader; private int numValues; private int rowCount = -1; + /** + * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data + * in the page. + */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, Supplier dictionarySupplier, @@ -108,9 +116,17 @@ public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IO } } + /** + * If {@link #pageHeader} is {@code null}, read it from the file and increment the {@link #offset} and file position + * by the length of page header. This method assumes that file position is set to {@link #offset} before calling. + * This method also read the number of values in the page from the header. + */ private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException { if (pageHeader == null) { - offset = file.position(); + if (file.position() != offset) { + throw new IllegalStateException("File position = " + file.position() + " not equal to expected offset =" + + offset); + } int maxHeader = START_HEADER; boolean success; do { From 838da52621621d33305f7160db55907ea70ee393 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 21 Nov 2023 15:07:51 -0600 Subject: [PATCH 05/12] Review part 2 --- .../parquet/base/ColumnChunkReader.java | 5 +- .../parquet/base/ColumnChunkReaderImpl.java | 46 +++++-------------- .../parquet/base/ColumnPageReaderImpl.java | 43 +++++++++-------- .../OffsetIndexBasedColumnChunkPageStore.java | 12 ++--- 4 files changed, 43 insertions(+), 63 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index e5727920612..7c647ae7e5e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -45,14 +45,11 @@ interface ColumnPageReaderIterator extends Iterator, AutoClose */ ColumnPageReaderIterator getPageIterator() throws IOException; - interface ColumnPageDirectAccessor extends AutoCloseable { + interface ColumnPageDirectAccessor { /** * Directly access a page reader for a given page number. 
*/ ColumnPageReader getPageReader(final int pageNum); - - @Override - void close(); } /** diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 69136cf08e0..8ea8b7940fd 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -101,10 +101,9 @@ public final OffsetIndex getOffsetIndex() { public ColumnPageReaderIterator getPageIterator() { final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); if (offsetIndex == null) { - return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values(), - path, channelsProvider); + return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); } else { - return new ColumnPageReaderIteratorIndexImpl(path, channelsProvider); + return new ColumnPageReaderIteratorIndexImpl(); } } @@ -113,7 +112,7 @@ public final ColumnPageDirectAccessor getPageAccessor() { if (offsetIndex == null) { throw new UnsupportedOperationException("Cannot use direct accessor without offset index"); } - return new ColumnPageDirectAccessorImpl(path, channelsProvider); + return new ColumnPageDirectAccessorImpl(); } private Path getFilePath() { @@ -215,21 +214,13 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException { return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage); } - class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { - private final SeekableChannelsProvider channelsProvider; + private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { private long currentOffset; - private final ColumnDescriptor path; - private long remainingValues; - ColumnPageReaderIteratorImpl(final long startOffset, - final long numValues, - @NotNull final ColumnDescriptor path, - @NotNull final SeekableChannelsProvider channelsProvider) { + ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) { this.remainingValues = numValues; this.currentOffset = startOffset; - this.path = path; - this.channelsProvider = channelsProvider; } @Override @@ -290,15 +281,10 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position() public void close() {} } - class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { - private final SeekableChannelsProvider channelsProvider; + private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { private int pos; - private final ColumnDescriptor path; - ColumnPageReaderIteratorIndexImpl(ColumnDescriptor path, - SeekableChannelsProvider channelsProvider) { - this.path = path; - this.channelsProvider = channelsProvider; + ColumnPageReaderIteratorIndexImpl() { pos = 0; } @@ -330,27 +316,19 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset( public void close() {} } - class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { - private final SeekableChannelsProvider channelsProvider; - private final ColumnDescriptor path; + private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { - ColumnPageDirectAccessorImpl(final ColumnDescriptor path, final SeekableChannelsProvider channelsProvider) { - this.path = path; - this.channelsProvider = channelsProvider; - 
} + ColumnPageDirectAccessorImpl() {} @Override public ColumnPageReader getPageReader(final int pageNum) { - if (pageNum > offsetIndex.getPageCount()) { - throw new NoSuchElementException( - "pageNum=" + pageNum + " > offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); + if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) { + throw new IndexOutOfBoundsException( + "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } final int numValues = -1; // Will be populated properly when we read the page header return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues); } - - @Override - public void close() {} } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index d9c864900ea..75e177726f5 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -148,6 +148,13 @@ private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOEx } } while (!success); file.position(offset); + if (numValues >= 0) { + // Make sure the number of values are same as those in the header + if (numValues != readNumValuesFromPageHeader(pageHeader)) { + throw new IllegalStateException("numValues = " + numValues + " different from number of values " + + "read from the page header"); + } + } } ensureNumValues(); } @@ -585,34 +592,34 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val @Override public int numValues() throws IOException { - ensureNumValues(); - return numValues; + if (numValues > 0) { + return numValues; + } + try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { + readChannel.position(offset); + ensurePageHeader(readChannel); + // Above will automatically populate numValues + Assert.geq(numValues, "numValues", 0); + return numValues; + } } private void ensureNumValues() throws IOException { if (numValues >= 0) { return; } - if (pageHeader == null) { - try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); - ensurePageHeader(readChannel); - // Above will automatically populate numValues - Assert.geq(numValues, "numValues", 0); - return; - } - } Assert.neqNull(pageHeader, "pageHeader"); - switch (pageHeader.type) { + numValues = readNumValuesFromPageHeader(pageHeader); + } + + private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { + switch (header.type) { case DATA_PAGE: - numValues = pageHeader.getData_page_header().getNum_values(); - break; + return header.getData_page_header().getNum_values(); case DATA_PAGE_V2: - numValues = pageHeader.getData_page_header_v2().getNum_values(); - break; + return header.getData_page_header_v2().getNum_values(); default: - throw new IOException( - String.format("Unexpected page of type {%s}", pageHeader.getType())); + throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java 
b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index 8a24fd72bbf..94d620a4102 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -63,9 +63,10 @@ private final class PageState { } boolean isPageSizeFixed = true; final long firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0); - for (int i = 2; i < numPages && isPageSizeFixed; ++i) { + for (int i = 2; i < numPages; ++i) { if (offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) { isPageSizeFixed = false; + break; } } fixedPageSize = isPageSizeFixed ? firstPageSize : PAGE_SIZE_NOT_FIXED; @@ -88,9 +89,9 @@ private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, fi else if (midVal > row) high = mid - 1; else - return mid; // key found + return mid; // 'row' is the first row of page } - return -(low + 1); // key not found. + return (low - 1); // 'row' is somewhere in the middle of page } private ChunkPage getPage(final int pageNum) { @@ -129,13 +130,10 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext, int pageNum; if (fixedPageSize == PAGE_SIZE_NOT_FIXED) { pageNum = findPageNumUsingOffsetIndex(offsetIndex, row); - if (pageNum < 0) { - pageNum = -2 - pageNum; - } } else { pageNum = (int) (row / fixedPageSize); if (pageNum >= numPages) { - // This can happen if the last page is of different size from rest of the pages. + // This can happen if the last page is larger than rest of the pages, which are all the same size. // We have already checked that row is less than numRows. 
Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1),
                        "row >= offsetIndex.getFirstRowIndex(numPages - 1)");

From 86449ddcc243de1a07bac068993d1cfdf6924085 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Tue, 21 Nov 2023 16:05:18 -0600
Subject: [PATCH 06/12] Review part 3

---
 .../java/io/deephaven/parquet/base/ColumnPageReaderImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java
index 75e177726f5..8393a8d6146 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java
@@ -592,7 +592,7 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val

     @Override
     public int numValues() throws IOException {
-        if (numValues > 0) {
+        if (numValues >= 0) {
             return numValues;
         }
         try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {

From fb74f9791be12cc884761423a25ef49fe9bdcf82 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Tue, 21 Nov 2023 17:47:59 -0600
Subject: [PATCH 07/12] Review part 4

---
 .../OffsetIndexBasedColumnChunkPageStore.java | 48 +++++++++++--------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
index 94d620a4102..a9c7b37f57f 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
@@ -19,30 +19,32 @@
 import java.io.UncheckedIOException;
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicReferenceArray;

+/**
+ * A {@link ColumnChunkPageStore} that uses {@link OffsetIndex} to find the page containing a row.
+ */
 final class OffsetIndexBasedColumnChunkPageStore<ATTR extends Any> extends ColumnChunkPageStore<ATTR> {
     private static final long PAGE_SIZE_NOT_FIXED = -1;

+    private static final class PageState<ATTR extends Any> {
+        private volatile WeakReference<PageCache.IntrusivePage<ATTR>> pageRef;
+
+        PageState() {
+            pageRef = null; // Initialized when used for the first time
+        }
+    }
+
     private final OffsetIndex offsetIndex;
     private final int numPages;
     /**
      * Fixed number of rows per page. Set as positive value if first ({@link #numPages}-1) pages have equal number of
-     * rows, else equal to {@value #PAGE_SIZE_NOT_FIXED}. We don't care about the size of the last page, because we can
-     * assume all pages to be of the same size and calculate the page number as
-     * {@code row_index / fixed_page_size -> page_number}. If it is greater than {@link #numPages}, we can assume the
-     * row to be coming from last page.
+     * rows, else equal to {@value #PAGE_SIZE_NOT_FIXED}. We cannot determine the number of rows in the last page from
+     * the offset index, because it only stores the first row index of each page, and we don't want to materialize any
+     * pages. So as a workaround, when the first ({@link #numPages}-1) pages have equal size, we assume all pages are
+     * of the same size and calculate the page number as {@code row_index / fixed_page_size -> page_number}.
If it is + * greater than {@link #numPages}, we will infer that the row is coming from last page. */ private final long fixedPageSize; - - private final class PageState { - WeakReference> pageRef; - - PageState() { - // Initialized when used for the first time - pageRef = null; - } - } - - private final AtomicReferenceArray pageStates; + private final AtomicReferenceArray> pageStates; private final ColumnChunkReader.ColumnPageDirectAccessor columnPageDirectAccessor; OffsetIndexBasedColumnChunkPageStore(@NotNull final PageCache pageCache, @@ -98,15 +100,19 @@ private ChunkPage getPage(final int pageNum) { if (pageNum < 0 || pageNum >= numPages) { throw new IllegalArgumentException("pageNum " + pageNum + " is out of range [0, " + numPages + ")"); } - PageCache.IntrusivePage page; - PageState pageState = pageStates.get(pageNum); - if (pageState == null) { - pageState = pageStates.updateAndGet(pageNum, p -> p == null ? new PageState() : p); + PageState pageState; + while ((pageState = pageStates.get(pageNum)) == null) { + pageState = new PageState<>(); + if (pageStates.weakCompareAndSetVolatile(pageNum, null, pageState)) { + break; + } } - if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) { + PageCache.IntrusivePage page; + WeakReference> localRef; + if ((localRef = pageState.pageRef) == null || (page = localRef.get()) == null) { synchronized (pageState) { // Make sure no one materialized this page as we waited for the lock - if (pageState.pageRef == null || (page = pageState.pageRef.get()) == null) { + if ((localRef = pageState.pageRef) == null || (page = localRef.get()) == null) { final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum); try { page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum), reader)); From 36bdb458639eb3a415d37dc02355de352877f8a1 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 21 Nov 2023 18:14:27 -0600 Subject: [PATCH 08/12] Rewrote some tests --- .../table/ParquetTableReadWriteTest.java | 53 +++++++++++++------ ...ParquetFileWithDifferentPageSizes1.parquet | 3 ++ ...ParquetFileWithDifferentPageSizes2.parquet | 3 ++ 3 files changed, 42 insertions(+), 17 deletions(-) create mode 100644 extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet create mode 100644 extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 97174b5863b..9592eb21848 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -695,32 +695,51 @@ public void testVersionChecks() { assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("1.3.0")); } + + /** + * Test if the parquet reading code can read pre-generated parquet files which have different number of rows in each + * page. Following is how these files are generated. + * + *
+     * Table arrayTable = TableTools.emptyTable(100).update(
+     *         "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse();
+     * File dest = new File(rootFile, "ReferenceParquetFileWithDifferentPageSizes1.parquet");
+     * final ParquetInstructions writeInstructions = new ParquetInstructions.Builder()
+     *         .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE)
+     *         .build();
+     * ParquetTools.writeTable(arrayTable, dest, writeInstructions);
+     *
+     * arrayTable = TableTools.emptyTable(1000).update(
+     *         "intArrays = (i <= 900) ? java.util.stream.IntStream.range(i, i+50).toArray() : " +
+     *                 "java.util.stream.IntStream.range(i, i+2).toArray()");
+     * dest = new File(rootFile, "ReferenceParquetFileWithDifferentPageSizes2.parquet");
+     * ParquetTools.writeTable(arrayTable, dest, writeInstructions);
+     * 
+ */ @Test - public void testWritingDifferentPageSizes() { - // Make a table with arrays of decreasing sizes such that different pages will have different number of rows - Table arrayTable = TableTools.emptyTable(100).update( + public void testReadingParquetFilesWithDifferentPageSizes() { + Table expected = TableTools.emptyTable(100).update( "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse(); - final File dest = new File(rootFile, "testWritingDifferentPageSizes.parquet"); - final ParquetInstructions writeInstructions = new ParquetInstructions.Builder() - .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE) - .build(); - ParquetTools.writeTable(arrayTable, dest, writeInstructions); - checkSingleTable(arrayTable, dest); + String path = ParquetTableReadWriteTest.class + .getResource("/ReferenceParquetFileWithDifferentPageSizes1.parquet").getFile(); + Table fromDisk = readParquetFileFromGitLFS(new File(path)); + assertTableEquals(expected, fromDisk); + + path = ParquetTableReadWriteTest.class + .getResource("/ReferenceParquetFileWithDifferentPageSizes2.parquet").getFile(); + fromDisk = readParquetFileFromGitLFS(new File(path)); - // Make a table such that only the last page has different number of rows, all else have equal number - final long NUM_ROWS = 1000; - arrayTable = TableTools.emptyTable(NUM_ROWS).update( - "intArrays = (i <= 900) ? java.util.stream.IntStream.range(i, i+50).toArray() : " + - "java.util.stream.IntStream.range(i, i+2).toArray()"); - ParquetTools.writeTable(arrayTable, dest, writeInstructions); - final Table fromDisk = readSingleFileTable(dest, EMPTY); // Access something on the last page to make sure we can read it final int[] data = (int[]) fromDisk.getColumnSource("intArrays").get(998); assertNotNull(data); assertEquals(2, data.length); assertEquals(998, data[0]); assertEquals(999, data[1]); - assertTableEquals(arrayTable, fromDisk); + + expected = TableTools.emptyTable(1000).update( + "intArrays = (i <= 900) ? 
java.util.stream.IntStream.range(i, i+50).toArray() : " + + "java.util.stream.IntStream.range(i, i+2).toArray()"); + assertTableEquals(expected, fromDisk); } // Following is used for testing both writing APIs for parquet tables diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet new file mode 100644 index 00000000000..c864f329d45 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e180f3133320f8fc5174d2cfbab89b30ae735c0a6485b6d13a98a5aace5f8740 +size 5259 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet new file mode 100644 index 00000000000..4bad668401a --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e5e70548a903371acfc038629c4e5155a3ed5ed79cd4d59ee622750e7285b4cf +size 35751 From edc5312916c41170cdb0da077dc00d0de595ddc2 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Nov 2023 11:17:40 -0600 Subject: [PATCH 09/12] Updated version format --- .../io/deephaven/parquet/table/metadata/TableInfo.java | 9 +++++++-- .../parquet/table/pagestore/ColumnChunkPageStore.java | 7 +++++-- .../parquet/table/ParquetTableReadWriteTest.java | 2 ++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java index 844164fb2ad..23f6bb7bffe 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java @@ -61,11 +61,16 @@ public final Map columnTypeMap() { } /** - * @return The Deephaven release version when this metadata format was updated + * @return The Deephaven release version used to write the parquet file */ @Value.Default public String version() { - return "0.31.0"; + final String version = TableInfo.class.getPackage().getImplementationVersion(); + if (version == null) { + // For unit tests + return "0.dev.0"; + } + return version; } /** diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index 15c0a56e7f3..f36c93cbc64 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -73,13 +73,14 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)"); /** - * Check if the version is greater than or equal to 0.31.0 + * Check if the version is greater than or equal to 0.31.0, or it doesn't follow the versioning schema X.Y.Z */ @VisibleForTesting public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) { final Matcher matcher = 
VERSION_PATTERN.matcher(version); if (!matcher.matches()) { - throw new IllegalArgumentException("Malformed version:" + version); + // Could be unit tests or some other versioning scheme + return true; } final int major = Integer.parseInt(matcher.group(1)); final int minor = Integer.parseInt(matcher.group(2)); @@ -93,6 +94,8 @@ public static CreatorResult create( @NotNull final ToPage toPage, @NotNull final ColumnDefinition columnDefinition) throws IOException { final boolean canUseOffsetIndex = canUseOffsetIndexBasedPageStore(columnChunkReader, columnDefinition); + // TODO(deephaven-core#4879): Rather than this fall back logic for supporting incorrect offset index, we should + // instead log an error and explain to user how to fix the parquet file final ColumnChunkPageStore columnChunkPageStore = canUseOffsetIndex ? new OffsetIndexBasedColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage) : new VariablePageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 9592eb21848..671a4ed3a15 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -693,6 +693,8 @@ public void testVersionChecks() { assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.1")); assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.32.0")); assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("1.3.0")); + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.dev.0")); + assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0-SNAPSHOT")); } From 46c77d584bbc0b791a69d260bb8bf5851f15fa99 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Nov 2023 12:42:18 -0600 Subject: [PATCH 10/12] Remaining review comments --- .../parquet/base/ColumnChunkReader.java | 7 +- .../parquet/base/ColumnChunkReaderImpl.java | 37 +++-- .../parquet/base/ColumnPageReaderImpl.java | 132 +++++++++--------- .../parquet/table/metadata/TableInfo.java | 2 +- .../table/pagestore/ColumnChunkPageStore.java | 4 +- .../OffsetIndexBasedColumnChunkPageStore.java | 16 +-- .../VariablePageSizeColumnChunkPageStore.java | 8 +- .../table/ParquetTableReadWriteTest.java | 22 ++- 8 files changed, 108 insertions(+), 120 deletions(-) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index 7c647ae7e5e..b9290e96407 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -35,15 +35,10 @@ public interface ColumnChunkReader { @Nullable OffsetIndex getOffsetIndex(); - interface ColumnPageReaderIterator extends Iterator, AutoCloseable { - @Override - void close(); - } - /** * @return An iterator over individual parquet pages */ - ColumnPageReaderIterator getPageIterator() throws IOException; + Iterator getPageIterator() throws IOException; interface ColumnPageDirectAccessor { /** diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java 
b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index 8ea8b7940fd..ffddffadab1 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -15,6 +15,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.format.*; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -27,6 +28,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.function.Supplier; @@ -98,7 +100,7 @@ public final OffsetIndex getOffsetIndex() { } @Override - public ColumnPageReaderIterator getPageIterator() { + public Iterator getPageIterator() { final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); if (offsetIndex == null) { return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); @@ -214,7 +216,7 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException { return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage); } - private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { + private final class ColumnPageReaderIteratorImpl implements Iterator { private long currentOffset; private long remainingValues; @@ -231,7 +233,7 @@ public boolean hasNext() { @Override public ColumnPageReader next() { if (!hasNext()) { - throw new RuntimeException("No next element"); + throw new NoSuchElementException("No next element"); } // NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(getFilePath())) { @@ -264,24 +266,21 @@ public ColumnPageReader next() { throw new UncheckedDeephavenException( "Unknown parquet data page header type " + pageHeader.type); } - final Supplier pageDictionarySupplier = - (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) - ? 
dictionarySupplier - : () -> NULL_DICTIONARY; - return new ColumnPageReaderImpl( - channelsProvider, decompressor, pageDictionarySupplier, + if ((encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) + && dictionarySupplier.get() == NULL_DICTIONARY) { + throw new ParquetDecodingException("Error in decoding page because dictionary data not found for " + + " column " + path + " with encoding " + encoding); + } + return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(), pageHeader, - -1); + ColumnPageReaderImpl.NULL_NUM_VALUES); } catch (IOException e) { - throw new RuntimeException("Error reading page header", e); + throw new UncheckedDeephavenException("Error reading page header", e); } } - - @Override - public void close() {} } - private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { + private final class ColumnPageReaderIteratorIndexImpl implements Iterator { private int pos; ColumnPageReaderIteratorIndexImpl() { @@ -311,9 +310,6 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset( pos++; return columnPageReader; } - - @Override - public void close() {} } private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { @@ -326,9 +322,10 @@ public ColumnPageReader getPageReader(final int pageNum) { throw new IndexOutOfBoundsException( "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } - final int numValues = -1; // Will be populated properly when we read the page header + // Page header and number of values will be populated later when we read the page header from the file return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, - path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues); + path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, + ColumnPageReaderImpl.NULL_NUM_VALUES); } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 8393a8d6146..1e367b42758 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -45,6 +45,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private static final int MAX_HEADER = 8192; private static final int START_HEADER = 128; public static final int NULL_OFFSET = -1; + static final int NULL_NUM_VALUES = -1; private final SeekableChannelsProvider channelsProvider; private final CompressorAdapter compressorAdapter; @@ -64,8 +65,21 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private int rowCount = -1; /** + * Returns a {@link ColumnPageReader} object for reading the column page data from the file. + * + * @param channelsProvider The provider for {@link SeekableByteChannel} for reading the file. + * @param compressorAdapter The adapter for decompressing the data. + * @param dictionarySupplier The supplier for dictionary data, set as {@link ColumnChunkReader#NULL_DICTIONARY} if + * page isn't dictionary encoded + * @param materializerFactory The factory for creating {@link PageMaterializer}. + * @param path The path of the column. + * @param filePath The path of the file. 
+ * @param fieldTypes The types of the fields in the column. * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data - * in the page. + * following the header in the page. + * @param pageHeader The page header if it is already read from the file. Else, {@code null}. + * @param numValues The number of values in the page if it is already read from the file. Else, + * {@value #NULL_NUM_VALUES} */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, @@ -92,7 +106,6 @@ public class ColumnPageReaderImpl implements ColumnPageReader { @Override public Object materialize(Object nullValue) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readDataPage(nullValue, readChannel); } @@ -100,7 +113,6 @@ public Object materialize(Object nullValue) throws IOException { public int readRowCount() throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readRowCountFromDataPage(readChannel); } @@ -110,53 +122,66 @@ public int readRowCount() throws IOException { @Override public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readKeyFromDataPage(keyDest, nullPlaceholder, readChannel); } } /** - * If {@link #pageHeader} is {@code null}, read it from the file and increment the {@link #offset} and file position - * by the length of page header. This method assumes that file position is set to {@link #offset} before calling. - * This method also read the number of values in the page from the header. + * If {@link #pageHeader} is {@code null}, read it from the file, and increment the {@link #offset} by the length of + * page header. Channel position would be set to the end of page header or beginning of data before returning. */ - private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException { - if (pageHeader == null) { - if (file.position() != offset) { - throw new IllegalStateException("File position = " + file.position() + " not equal to expected offset =" - + offset); - } - int maxHeader = START_HEADER; - boolean success; - do { - ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); - file.read(headerBuffer); - headerBuffer.flip(); - ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); - try { - pageHeader = Util.readPageHeader(bufferedIS); - offset += bufferedIS.position(); - success = true; - } catch (IOException e) { - success = false; - if (maxHeader > MAX_HEADER) { - throw e; + private void ensurePageHeader(final SeekableByteChannel file) throws IOException { + // Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be + // the offset of page header, else it would be the offset of data. 
+ file.position(offset); + synchronized (this) { + if (pageHeader == null) { + int maxHeader = START_HEADER; + boolean success; + do { + final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); + file.read(headerBuffer); + headerBuffer.flip(); + final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); + try { + pageHeader = Util.readPageHeader(bufferedIS); + offset += bufferedIS.position(); + success = true; + } catch (IOException e) { + success = false; + if (maxHeader > MAX_HEADER) { + throw e; + } + maxHeader <<= 1; + file.position(offset); + } + } while (!success); + file.position(offset); + if (numValues >= 0) { + final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader); + if (numValues != numValuesFromHeader) { + throw new IllegalStateException( + "numValues = " + numValues + " different from number of values " + + "read from the page header = " + numValuesFromHeader + " for column " + path); } - maxHeader *= 2; - file.position(offset); - } - } while (!success); - file.position(offset); - if (numValues >= 0) { - // Make sure the number of values are same as those in the header - if (numValues != readNumValuesFromPageHeader(pageHeader)) { - throw new IllegalStateException("numValues = " + numValues + " different from number of values " + - "read from the page header"); } } + if (numValues == NULL_NUM_VALUES) { + numValues = readNumValuesFromPageHeader(pageHeader); + } + } + } + + private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { + switch (header.type) { + case DATA_PAGE: + return header.getData_page_header().getNum_values(); + case DATA_PAGE_V2: + return header.getData_page_header_v2().getNum_values(); + default: + throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); } - ensureNumValues(); } private int readRowCountFromDataPage(ReadableByteChannel file) throws IOException { @@ -573,9 +598,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val if (dataEncoding.usesDictionary()) { final Dictionary dictionary = dictionarySupplier.get(); if (dictionary == ColumnChunkReader.NULL_DICTIONARY) { - throw new ParquetDecodingException( - "Could not read page in col " + path - + " as the dictionary was missing for encoding " + dataEncoding); + throw new ParquetDecodingException("Could not read page in col " + path + " as the dictionary was " + + "missing for encoding " + dataEncoding); } dataReader = new DictionaryValuesReader(dictionary); } else { @@ -596,33 +620,13 @@ public int numValues() throws IOException { return numValues; } try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); - // Above will automatically populate numValues - Assert.geq(numValues, "numValues", 0); + // Above will block till it populates numValues + Assert.geqZero(numValues, "numValues"); return numValues; } } - private void ensureNumValues() throws IOException { - if (numValues >= 0) { - return; - } - Assert.neqNull(pageHeader, "pageHeader"); - numValues = readNumValuesFromPageHeader(pageHeader); - } - - private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { - switch (header.type) { - case DATA_PAGE: - return header.getData_page_header().getNum_values(); - case DATA_PAGE_V2: - return header.getData_page_header_v2().getNum_values(); - default: - throw new IOException(String.format("Unexpected page of type {%s}", 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
index f36c93cbc64..dcd7677ccf5 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java
@@ -65,7 +65,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
         // part of deephaven-core#4844
         final Class columnType = columnDefinition.getDataType();
         if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) {
-            return satisfiesMinimumVersionRequirements(version);
+            return hasCorrectVectorOffsetIndexes(version);
         }
         return true;
     }
@@ -76,7 +76,7 @@ private static boolean canUseOffsetIndexBasedPageStore(@NotNull final ColumnChun
     /**
      * Check if the version is greater than or equal to 0.31.0, or it doesn't follow the versioning schema X.Y.Z
      */
     @VisibleForTesting
-    public static boolean satisfiesMinimumVersionRequirements(@NotNull final String version) {
+    public static boolean hasCorrectVectorOffsetIndexes(@NotNull final String version) {
         final Matcher matcher = VERSION_PATTERN.matcher(version);
         if (!matcher.matches()) {
             // Could be unit tests or some other versioning scheme
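The hunk cuts off before the comparison itself, but the updated unit tests further below pin the contract: versions matching X.Y.Z must be at least 0.31.0, and any other scheme ("unknown", "0.3", snapshot suffixes) is assumed current. A sketch consistent with those tests; the actual VERSION_PATTERN and comparison in ColumnChunkPageStore may differ in detail:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    final class VersionGate {
        // Assumed shape of VERSION_PATTERN; the real pattern is defined elsewhere in the class.
        private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)");

        static boolean hasCorrectVectorOffsetIndexes(final String version) {
            final Matcher matcher = VERSION_PATTERN.matcher(version);
            if (!matcher.matches()) {
                // "unknown", "0.3", "0.31.0-SNAPSHOT": some other scheme, assume new enough
                return true;
            }
            final int major = Integer.parseInt(matcher.group(1));
            final int minor = Integer.parseInt(matcher.group(2));
            return major > 0 || minor >= 31; // at least 0.31.0
        }
    }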
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
index a9c7b37f57f..62f0f22cfa0 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java
@@ -53,9 +53,9 @@ private static final class PageState {
             @NotNull final ToPage toPage) throws IOException {
         super(pageCache, columnChunkReader, mask, toPage);
         offsetIndex = columnChunkReader.getOffsetIndex();
-        Assert.assertion(offsetIndex != null, "offsetIndex != null");
+        Assert.neqNull(offsetIndex, "offsetIndex");
         numPages = offsetIndex.getPageCount();
-        Assert.assertion(numPages > 0, "numPages > 0");
+        Assert.gtZero(numPages, "numPages");
         pageStates = new AtomicReferenceArray<>(numPages);
         columnPageDirectAccessor = columnChunkReader.getPageAccessor();

@@ -85,13 +85,13 @@ private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, fi
         while (low <= high) {
             final int mid = (low + high) >>> 1;
             final long midVal = offsetIndex.getFirstRowIndex(mid);
-
-            if (midVal < row)
+            if (midVal < row) {
                 low = mid + 1;
-            else if (midVal > row)
+            } else if (midVal > row) {
                 high = mid - 1;
-            else
+            } else {
                 return mid; // 'row' is the first row of page
+            }
         }
         return (low - 1); // 'row' is somewhere in the middle of page
     }
@@ -141,8 +141,8 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext,
         if (pageNum >= numPages) {
             // This can happen if the last page is larger than rest of the pages, which are all the same size.
             // We have already checked that row is less than numRows.
-            Assert.assertion(row >= offsetIndex.getFirstRowIndex(numPages - 1),
-                    "row >= offsetIndex.getFirstRowIndex(numPages - 1)");
+            Assert.geq(row, "row", offsetIndex.getFirstRowIndex(numPages - 1),
+                    "offsetIndex.getFirstRowIndex(numPages - 1)");
             pageNum = (numPages - 1);
         }
     }
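findPageNumUsingOffsetIndex above is a floor binary search: it returns the last page whose first row index is at most the requested row. The same logic restated over a plain long[] for illustration (hypothetical helper, not part of the patch):

    // firstRowIndex[p] is the row index at which page p starts, strictly increasing.
    static int findPageNum(final long[] firstRowIndex, final long row) {
        int low = 0;
        int high = firstRowIndex.length - 1;
        while (low <= high) {
            final int mid = (low + high) >>> 1;
            final long midVal = firstRowIndex[mid];
            if (midVal < row) {
                low = mid + 1;
            } else if (midVal > row) {
                high = mid - 1;
            } else {
                return mid; // 'row' is the first row of page 'mid'
            }
        }
        return low - 1; // 'row' falls inside the page before the insertion point
    }

    // e.g. findPageNum(new long[] {0, 100, 200}, 150) == 1: row 150 lives in page 1.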
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
index e9568936d4c..9975ebdbb5d 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java
@@ -16,6 +16,7 @@
 import java.io.UncheckedIOException;
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
+import java.util.Iterator;

 final class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore {

@@ -27,7 +28,7 @@ final class VariablePageSizeColumnChunkPageStore extends Colum
     private volatile int numPages = 0;
     private volatile long[] pageRowOffsets;
     private volatile ColumnPageReader[] columnPageReaders;
-    private final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator;
+    private final Iterator<ColumnPageReader> columnPageReaderIterator;
     private volatile WeakReference>[] pages;

     VariablePageSizeColumnChunkPageStore(@NotNull final PageCache pageCache,
@@ -160,9 +161,4 @@ public ChunkPage getPageContaining(@NotNull final FillContext fillContext,
         return getPage(pageNum);
     }
-
-    @Override
-    public void close() {
-        columnPageReaderIterator.close();
-    }
 }
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index 671a4ed3a15..1ea62f13652 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -682,19 +682,15 @@ public void testReadOldParquetData() {

     @Test
     public void testVersionChecks() {
-        assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.0.0"));
-        assertFalse(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.4.0"));
-        try {
-            ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.3");
-            TestCase.fail("Exception expected for invalid version string");
-        } catch (IllegalArgumentException expected) {
-        }
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0"));
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.1"));
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.32.0"));
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("1.3.0"));
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.dev.0"));
-        assertTrue(ColumnChunkPageStore.satisfiesMinimumVersionRequirements("0.31.0-SNAPSHOT"));
+        assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.0.0"));
+        assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.4.0"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.3"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.1"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.32.0"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("1.3.0"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("unknown"));
+        assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0-SNAPSHOT"));
     }

From 405a6791142b0b58869b634e4f9550863445ebdd Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Wed, 22 Nov 2023 16:11:58 -0600
Subject: [PATCH 11/12] Updated comments

---
 .../java/io/deephaven/parquet/table/metadata/TableInfo.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
index 5e228456fdd..fb6b8da2002 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java
@@ -67,7 +67,7 @@ public final Map columnTypeMap() {
     public String version() {
         final String version = TableInfo.class.getPackage().getImplementationVersion();
         if (version == null) {
-            // For unit tests
+            // When the code is run from class files as opposed to jars, as in unit tests
             return "unknown";
         }
         return version;

From 6b4aa24c5b86e3a8339bbcbc237daf3f3b26c5bf Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Wed, 22 Nov 2023 16:58:14 -0600
Subject: [PATCH 12/12] Remaining comments

---
 .../deephaven/parquet/base/ColumnChunkReaderImpl.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
index ffddffadab1..8de3d8b9281 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -266,12 +266,11 @@ public ColumnPageReader next() {
                     throw new UncheckedDeephavenException(
                             "Unknown parquet data page header type " + pageHeader.type);
                 }
-                if ((encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
-                        && dictionarySupplier.get() == NULL_DICTIONARY) {
-                    throw new ParquetDecodingException("Error in decoding page because dictionary data not found for " +
-                            " column " + path + " with encoding " + encoding);
-                }
-                return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
+                final Supplier<Dictionary> pageDictionarySupplier =
+                        (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
+                                ? dictionarySupplier
+                                : () -> NULL_DICTIONARY;
+                return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
                         nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(),
                         pageHeader, ColumnPageReaderImpl.NULL_NUM_VALUES);
             } catch (IOException e) {