Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ private static void addKeyValue(FileMetaData fileMetaData, String key, String va
private static interface MetadataFilterVisitor<T, E extends Throwable> {
T visit(NoFilter filter) throws E;
T visit(SkipMetadataFilter filter) throws E;
T visit(RangeMetadataFilter filter) throws E;
T visit(OffsetMetadataFilter filter) throws E;
}

Expand All @@ -452,7 +453,7 @@ public static MetadataFilter offsets(long... offsets) {
for (long offset : offsets) {
set.add(offset);
}
return new OffsetListMetadataFilter(set);
return new OffsetMetadataFilter(set);
}

private static final class NoFilter extends MetadataFilter {
Expand All @@ -478,16 +479,12 @@ public String toString() {
}
}

interface OffsetMetadataFilter {
boolean contains(long offset);
}

/**
* [ startOffset, endOffset )
* @author Julien Le Dem
*/
// Visible for testing
static final class RangeMetadataFilter extends MetadataFilter implements OffsetMetadataFilter {
static final class RangeMetadataFilter extends MetadataFilter {
final long startOffset;
final long endOffset;

Expand All @@ -502,7 +499,6 @@ <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E
return visitor.visit(this);
}

@Override
public boolean contains(long offset) {
return offset >= this.startOffset && offset < this.endOffset;
}
Expand All @@ -513,10 +509,10 @@ public String toString() {
}
}

static final class OffsetListMetadataFilter extends MetadataFilter implements OffsetMetadataFilter {
static final class OffsetMetadataFilter extends MetadataFilter {
private final Set<Long> offsets;

public OffsetListMetadataFilter(Set<Long> offsets) {
public OffsetMetadataFilter(Set<Long> offsets) {
this.offsets = offsets;
}

Expand All @@ -536,7 +532,7 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException
}

// Visible for testing
static FileMetaData filterFileMetaData(FileMetaData metaData, OffsetMetadataFilter filter) {
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
Expand All @@ -555,6 +551,19 @@ static FileMetaData filterFileMetaData(FileMetaData metaData, OffsetMetadataFilt
}

// Visible for testing
static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long startIndex = getOffset(rowGroup.getColumns().get(0));
if (filter.contains(startIndex)) {
newRowGroups.add(rowGroup);
}
}
metaData.setRow_groups(newRowGroups);
return metaData;
}

static long getOffset(RowGroup rowGroup) {
return getOffset(rowGroup.getColumns().get(0));
}
Expand Down Expand Up @@ -582,7 +591,12 @@ public FileMetaData visit(SkipMetadataFilter filter) throws IOException {

@Override
public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
return filterFileMetaData(readFileMetaData(from), filter);
return filterFileMetaDataByStart(readFileMetaData(from), filter);
}

@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}
});
if (Log.DEBUG) LOG.debug(fileMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.format.converter;

import static java.util.Collections.emptyList;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.filterFileMetaDataByStart;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
Expand All @@ -27,7 +28,7 @@
import static org.apache.parquet.format.Type.INT32;
import static org.apache.parquet.format.Util.readPageHeader;
import static org.apache.parquet.format.Util.writePageHeader;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.filterFileMetaDataByMidpoint;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.getOffset;

import java.io.ByteArrayInputStream;
Expand All @@ -43,6 +44,7 @@
import java.util.Set;
import java.util.TreeSet;

import com.google.common.collect.Sets;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
Expand Down Expand Up @@ -170,7 +172,20 @@ private FileMetaData metadata(long... sizes) {
}

private FileMetaData filter(FileMetaData md, long start, long end) {
return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
return filterFileMetaDataByMidpoint(new FileMetaData(md),
new ParquetMetadataConverter.RangeMetadataFilter(start, end));
}

private FileMetaData find(FileMetaData md, Long... blockStart) {
return filterFileMetaDataByStart(new FileMetaData(md),
new ParquetMetadataConverter.OffsetMetadataFilter(
Sets.newHashSet((Long[]) blockStart)));
}

private FileMetaData find(FileMetaData md, long blockStart) {
return filterFileMetaDataByStart(new FileMetaData(md),
new ParquetMetadataConverter.OffsetMetadataFilter(
Sets.newHashSet(blockStart)));
}

private void verifyMD(FileMetaData md, long... offsets) {
Expand Down Expand Up @@ -242,6 +257,18 @@ public void testFilterMetaData() {
verifyAllFilters(metadata(11, 9, 10), 8);
}

@Test
public void testFindRowGroups() {
verifyMD(find(metadata(50, 50, 50), 0), 0);
verifyMD(find(metadata(50, 50, 50), 50), 50);
verifyMD(find(metadata(50, 50, 50), 100), 100);
verifyMD(find(metadata(50, 50, 50), 0L, 50L), 0, 50);
verifyMD(find(metadata(50, 50, 50), 0L, 50L, 100L), 0, 50, 100);
verifyMD(find(metadata(50, 50, 50), 50L, 100L), 50, 100);
// doesn't find an offset that isn't the start of a row group.
verifyMD(find(metadata(50, 50, 50), 10));
}

@Test
public void randomTestFilterMetaData() {
// randomized property based testing
Expand Down