Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added offset index based parquet reading support #4844

Merged
merged 13 commits into from
Nov 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@
package io.deephaven.parquet.base;

import org.apache.parquet.column.Dictionary;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Supplier;

public interface ColumnChunkReader {
/**
* @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size
*/
int getPageFixedSize();

/**
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
*/
Expand All @@ -32,16 +29,34 @@ public interface ColumnChunkReader {
*/
int getMaxRl();

/**
* @return The offset index for this column chunk, or null if it not found in the metadata.
*/
@Nullable
OffsetIndex getOffsetIndex();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

interface ColumnPageReaderIterator extends Iterator<ColumnPageReader>, AutoCloseable {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@Override
void close() throws IOException;
void close();
}

/**
* @return An iterator over individual parquet pages
*/
ColumnPageReaderIterator getPageIterator() throws IOException;

interface ColumnPageDirectAccessor {
/**
* Directly access a page reader for a given page number.
*/
ColumnPageReader getPageReader(final int pageNum);
}

/**
* @return An accessor for individual parquet pages
*/
ColumnPageDirectAccessor getPageAccessor();

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page
*/
Expand Down Expand Up @@ -69,4 +84,10 @@ public int getMaxId() {
}

PrimitiveType getType();

/**
* @return The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
@Nullable
String getVersion();
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
Expand All @@ -46,10 +47,18 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
private final PageMaterializer.Factory nullMaterializerFactory;

private Path filePath;

ColumnChunkReaderImpl(
ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider,
Path rootPath, MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes) {
/**
* Number of rows in the row group of this column chunk.
*/
private final long numRows;
/**
* Version string from deephaven specific parquet metadata, or null if it's not present.
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, Path rootPath,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootPath = rootPath;
Expand All @@ -65,16 +74,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new LazyCachingSupplier<>(this::getDictionary);
this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
}

@Override
public int getPageFixedSize() {
return -1;
this.numRows = numRows;
this.version = version;
}

@Override
public long numRows() {
return numValues();
return numRows;
}

@Override
Expand All @@ -87,17 +93,28 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public final OffsetIndex getOffsetIndex() {
return offsetIndex;
}

@Override
public ColumnPageReaderIterator getPageIterator() {
final long dataPageOffset = columnChunk.meta_data.getData_page_offset();
if (offsetIndex == null) {
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values(),
path, channelsProvider);
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values());
} else {
return new ColumnPageReaderIteratorIndexImpl(path, channelsProvider);
return new ColumnPageReaderIteratorIndexImpl();
}
}

@Override
public final ColumnPageDirectAccessor getPageAccessor() {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl();
}

private Path getFilePath() {
if (filePath != null) {
return filePath;
Expand Down Expand Up @@ -166,6 +183,11 @@ public PrimitiveType getType() {
return path.getPrimitiveType();
}

@Override
public String getVersion() {
return version;
}

@NotNull
private Dictionary readDictionary(ReadableByteChannel file) throws IOException {
// explicitly not closing this, caller is responsible
Expand All @@ -192,21 +214,13 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException {
return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
}

class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private final SeekableChannelsProvider channelsProvider;
private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private long currentOffset;
private final ColumnDescriptor path;

private long remainingValues;

ColumnPageReaderIteratorImpl(final long startOffset,
final long numValues,
@NotNull final ColumnDescriptor path,
@NotNull final SeekableChannelsProvider channelsProvider) {
ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) {
this.remainingValues = numValues;
this.currentOffset = startOffset;
this.path = path;
this.channelsProvider = channelsProvider;
}

@Override
Expand Down Expand Up @@ -267,15 +281,10 @@ nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position()
public void close() {}
}

class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
private final SeekableChannelsProvider channelsProvider;
private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
private int pos;
private final ColumnDescriptor path;

ColumnPageReaderIteratorIndexImpl(ColumnDescriptor path,
SeekableChannelsProvider channelsProvider) {
this.path = path;
this.channelsProvider = channelsProvider;
ColumnPageReaderIteratorIndexImpl() {
pos = 0;
}

Expand All @@ -287,20 +296,39 @@ public boolean hasNext() {
@Override
public ColumnPageReader next() {
if (!hasNext()) {
throw new RuntimeException("No next element");
throw new NoSuchElementException("No next element");
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}
int rowCount =
(int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
// Following logic assumes that offsetIndex will store the number of values for a page instead of number
// of rows (which can be different for array and vector columns). This behavior is because of a bug on
// parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading
// parquet files written before deephaven-core/pull/4844.
final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
ColumnPageReaderImpl columnPageReader =
new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null,
rowCount);
numValues);
pos++;
return columnPageReader;
}

@Override
public void close() throws IOException {}
public void close() {}
}

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}

@Override
public ColumnPageReader getPageReader(final int pageNum) {
if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
final int numValues = -1; // Will be populated properly when we read the page header
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, numValues);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,19 @@ public class ColumnPageReaderImpl implements ColumnPageReader {
private final Path filePath;
private final List<Type> fieldTypes;

/**
* Stores the offset from where the next byte should be read. Can be the offset of page header if
* {@link #pageHeader} is {@code null}, else will be the offset of data.
*/
private long offset;
private PageHeader pageHeader;
private int numValues;
private int rowCount = -1;

/**
* @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* in the page.
*/
ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider,
CompressorAdapter compressorAdapter,
Supplier<Dictionary> dictionarySupplier,
Expand Down Expand Up @@ -108,9 +116,17 @@ public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IO
}
}

/**
* If {@link #pageHeader} is {@code null}, read it from the file and increment the {@link #offset} and file position
* by the length of page header. This method assumes that file position is set to {@link #offset} before calling.
* This method also read the number of values in the page from the header.
*/
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException {
if (pageHeader == null) {
offset = file.position();
if (file.position() != offset) {
throw new IllegalStateException("File position = " + file.position() + " not equal to expected offset ="
+ offset);
}
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
int maxHeader = START_HEADER;
boolean success;
do {
Expand All @@ -132,6 +148,13 @@ private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOEx
}
} while (!success);
file.position(offset);
if (numValues >= 0) {
// Make sure the number of values are same as those in the header
if (numValues != readNumValuesFromPageHeader(pageHeader)) {
throw new IllegalStateException("numValues = " + numValues + " different from number of values " +
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
"read from the page header");
}
}
}
ensureNumValues();
}
Expand Down Expand Up @@ -569,25 +592,34 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val

@Override
public int numValues() throws IOException {
ensureNumValues();
return numValues;
if (numValues > 0) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return numValues;
}
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) {
readChannel.position(offset);
ensurePageHeader(readChannel);
// Above will automatically populate numValues
Assert.geq(numValues, "numValues", 0);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return numValues;
}
}

private void ensureNumValues() throws IOException {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (numValues >= 0) {
return;
}
Assert.neqNull(pageHeader, "pageHeader");
switch (pageHeader.type) {
numValues = readNumValuesFromPageHeader(pageHeader);
}

private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException {
switch (header.type) {
case DATA_PAGE:
numValues = pageHeader.getData_page_header().getNum_values();
break;
return header.getData_page_header().getNum_values();
case DATA_PAGE_V2:
numValues = pageHeader.getData_page_header_v2().getNum_values();
break;
return header.getData_page_header_v2().getNum_values();
default:
throw new IOException(
String.format("Unexpected page of type {%s}", pageHeader.getType()));
throw new IOException(String.format("Unexpected page of type {%s}", header.getType()));
}
}

Expand Down
Loading
Loading