@@ -32,6 +32,7 @@
import parquet.Log;
import parquet.common.schema.ColumnPath;
import parquet.format.ColumnChunk;
import parquet.format.ColumnMetaData;
import parquet.format.ConvertedType;
import parquet.format.DataPageHeader;
import parquet.format.DictionaryPageHeader;
@@ -58,7 +59,7 @@
import parquet.schema.Type.Repetition;
import parquet.schema.TypeVisitor;
import parquet.schema.Types;

import static java.lang.Math.min;
import static parquet.format.Util.readFileMetaData;
import static parquet.format.Util.writePageHeader;

@@ -340,8 +341,124 @@ private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
fileMetaData.addToKey_value_metadata(keyValue);
}

private interface MetadataFilterVisitor<T, E extends Throwable> {
T visit(NoFilter filter) throws E;
T visit(SkipMetadataFilter filter) throws E;
T visit(RangeMetadataFilter filter) throws E;
}

public abstract static class MetadataFilter {
private MetadataFilter() {}
abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
}
public static final MetadataFilter NO_FILTER = new NoFilter();
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
/**
* Returns a filter that keeps only the row groups whose midpoint falls in the
* byte range [ startOffset, endOffset ).
* @param startOffset the start of the byte range, inclusive
* @param endOffset the end of the byte range, exclusive
* @return the filter
*/
public static MetadataFilter range(long startOffset, long endOffset) {
return new RangeMetadataFilter(startOffset, endOffset);
}
private static final class NoFilter extends MetadataFilter {
private NoFilter() {}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
@Override
public String toString() {
return "NO_FILTER";
}
}
private static final class SkipMetadataFilter extends MetadataFilter {
private SkipMetadataFilter() {}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
@Override
public String toString() {
return "SKIP_ROW_GROUPS";
}
}
/**
* [ startOffset, endOffset )
* @author Julien Le Dem
*/
static final class RangeMetadataFilter extends MetadataFilter {
final long startOffset;
final long endOffset;
RangeMetadataFilter(long startOffset, long endOffset) {
this.startOffset = startOffset;
this.endOffset = endOffset;
}
@Override
<T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
return visitor.visit(this);
}
boolean contains(long offset) {
return offset >= this.startOffset && offset < this.endOffset;
}
@Override
public String toString() {
return "range(s:" + startOffset + ", e:" + endOffset + ")";
}
}
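
The three filters trade footer-deserialization cost against completeness: NO_FILTER reads the whole footer, SKIP_ROW_GROUPS skips the row-group metadata when only the schema and key/value metadata are needed, and range(startOffset, endOffset) keeps just the row groups relevant to one split. A hypothetical usage sketch (assuming the enclosing converter class here is ParquetMetadataConverter; the footerStream and split variables are illustrative, not part of this patch):

// deserialize only the row groups whose midpoint falls inside this task's split
ParquetMetadataConverter converter = new ParquetMetadataConverter();
MetadataFilter filter = ParquetMetadataConverter.range(
    split.getStart(), split.getStart() + split.getLength());
ParquetMetadata footer = converter.readParquetMetadata(footerStream, filter);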

@Deprecated
public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
return readParquetMetadata(from, NO_FILTER);
}

static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
long startIndex = getOffset(rowGroup);
for (ColumnChunk col : rowGroup.getColumns()) {
totalSize += col.getMeta_data().getTotal_compressed_size();
}
// a row group is kept if its midpoint falls in the range, so a group that
// straddles a split boundary is assigned to exactly one split
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
newRowGroups.add(rowGroup);
}
}
metaData.setRow_groups(newRowGroups);
return metaData;
}
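
The midpoint rule above is what makes the range filter safe for file splitting: each row group has exactly one midpoint, so it is kept by exactly one [ startOffset, endOffset ) range even when its bytes straddle a split boundary. A minimal sketch with plain longs (all offsets and sizes hypothetical):

// three row groups and one split covering [0, 150)
long[] starts = { 0, 90, 210 };  // row group start offsets
long[] sizes = { 100, 120, 80 }; // total compressed sizes
RangeMetadataFilter split = new RangeMetadataFilter(0, 150);
for (int i = 0; i < starts.length; i++) {
  long midPoint = starts[i] + sizes[i] / 2;
  System.out.println("row group " + i + ": " + (split.contains(midPoint) ? "kept" : "skipped"));
}
// row group 0 (midpoint 50) is kept; row group 1 (midpoint 150) and
// row group 2 (midpoint 250) fall outside [0, 150) and belong to later splits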

static long getOffset(RowGroup rowGroup) {
return getOffset(rowGroup.getColumns().get(0));
}
static long getOffset(ColumnChunk columnChunk) {
ColumnMetaData md = columnChunk.getMeta_data();
long offset = md.getData_page_offset();
// when a dictionary page is present it is written before the first data page,
// so the chunk actually starts at the smaller of the two offsets
if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
offset = md.getDictionary_page_offset();
}
return offset;
}
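
getOffset returns where a column chunk's bytes actually begin in the file, which is what filterFileMetaData uses as the row group's start. A short sketch using the Thrift-generated metadata beans (offsets hypothetical, and assuming the usual Thrift-generated setters):

// a chunk whose dictionary page (at 4000) precedes its first data page (at 4096)
ColumnMetaData md = new ColumnMetaData();
md.setData_page_offset(4096);
md.setDictionary_page_offset(4000);
ColumnChunk chunk = new ColumnChunk(0); // 0 is the required file_offset field
chunk.setMeta_data(md);
long offset = getOffset(chunk); // 4000: the chunk starts at its dictionary page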

public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
return readFileMetaData(from);
}
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
return readFileMetaData(from, true);
}
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaData(readFileMetaData(from), filter);
}
});
if (Log.DEBUG) LOG.debug(fileMetaData);
ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
@@ -352,37 +469,39 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
if (row_groups != null) {
for (RowGroup rowGroup : row_groups) {
BlockMetaData blockMetaData = new BlockMetaData();
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
List<ColumnChunk> columns = rowGroup.getColumns();
String filePath = columns.get(0).getFile_path();
for (ColumnChunk columnChunk : columns) {
if ((filePath == null && columnChunk.getFile_path() != null)
|| (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
}
parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
ColumnPath path = getPath(metaData);
ColumnChunkMetaData column = ColumnChunkMetaData.get(
path,
messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
CompressionCodecName.fromParquet(metaData.codec),
fromFormatEncodings(metaData.encodings),
fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
metaData.total_compressed_size,
metaData.total_uncompressed_size);
// TODO
// index_page_offset
// key_value_metadata
blockMetaData.addColumn(column);
}
blockMetaData.setPath(filePath);
blocks.add(blockMetaData);
}
}
Map<String, String> keyValueMetaData = new HashMap<String, String>();
List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();