Skip to content

Commit

Permalink
Parse only required row groups from parquet footer
Browse files Browse the repository at this point in the history
  • Loading branch information
jinyangli34 authored and raunaqmorarka committed Jan 3, 2025
1 parent 8bf43b5 commit 2ef6dc1
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public String toString()

/**
 * Returns the block metadata of this file without restricting it to a particular split.
 * <p>
 * Delegates to {@link #getBlocks(long, long)} with a range that starts at offset 0 and
 * spans {@code Long.MAX_VALUE} bytes.
 *
 * @return the blocks produced by {@link #getBlocks(long, long)} for that range
 * @throws ParquetCorruptionException propagated from {@link #getBlocks(long, long)}
 */
public List<BlockMetadata> getBlocks()
        throws ParquetCorruptionException
{
    long rangeStart = 0;
    long rangeLength = Long.MAX_VALUE;
    return getBlocks(rangeStart, rangeLength);
}

public List<BlockMetadata> getBlocks(long splitStart, long splitLength)
throws ParquetCorruptionException
{
List<SchemaElement> schema = parquetMetadata.getSchema();
validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty");
Expand All @@ -99,6 +105,14 @@ public List<BlockMetadata> getBlocks()
List<RowGroup> rowGroups = parquetMetadata.getRow_groups();
if (rowGroups != null) {
for (RowGroup rowGroup : rowGroups) {
if (rowGroup.isSetFile_offset()) {
long rowGroupStart = rowGroup.getFile_offset();
boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength;
if (!splitContainsRowGroup) {
continue;
}
}

List<ColumnChunk> columns = rowGroup.getColumns();
validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
String filePath = columns.get(0).getFile_path();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ColumnChunkMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.metadata.PrunedBlockMetadata;
import io.trino.parquet.reader.RowGroupInfo;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -183,7 +184,7 @@ public static List<RowGroupInfo> getFilteredRowGroups(
long splitStart,
long splitLength,
ParquetDataSource dataSource,
List<BlockMetadata> blocksMetaData,
ParquetMetadata parquetMetadata,
List<TupleDomain<ColumnDescriptor>> parquetTupleDomains,
List<TupleDomainParquetPredicate> parquetPredicates,
Map<List<String>, ColumnDescriptor> descriptorsByPath,
Expand All @@ -194,7 +195,7 @@ public static List<RowGroupInfo> getFilteredRowGroups(
{
long fileRowCount = 0;
ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
for (BlockMetadata block : blocksMetaData) {
for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) {
long blockStart = block.getStartingPos();
boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength;
if (splitContainsBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public static ParquetReader createParquetReader(
0,
input.getEstimatedSize(),
input,
parquetMetadata.getBlocks(),
parquetMetadata,
ImmutableList.of(parquetTupleDomain),
ImmutableList.of(parquetPredicate),
descriptorsByPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.spi.Page;
Expand Down Expand Up @@ -186,6 +187,44 @@ public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive()
.isInstanceOf(TrinoException.class);
}

/**
 * Verifies that {@code ParquetMetadata.getBlocks(splitStart, splitLength)} only returns the
 * row groups selected by the given split range, and that the no-argument overload returns
 * all of them.
 */
@Test
void testReadMetadataWithSplitOffset()
        throws IOException
{
    // Write a file with 100 rows per row-group
    // NOTE(review): relies on the 1000-byte max block size producing one row group per
    // 100-row input page; the row-count assertions below depend on that layout
    List<String> columnNames = ImmutableList.of("columna", "columnb");
    List<Type> types = ImmutableList.of(INTEGER, BIGINT);

    ParquetDataSource dataSource = new TestingParquetDataSource(
            writeParquetFile(
                    ParquetWriterOptions.builder()
                            .setMaxBlockSize(DataSize.ofBytes(1000))
                            .build(),
                    types,
                    columnNames,
                    generateInputPages(types, 100, 5)),
            new ParquetReaderOptions());

    // Split [0, 800): expect exactly one row group, carrying both columns
    ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
    List<BlockMetadata> columnBlocks = parquetMetadata.getBlocks(0, 800);
    assertThat(columnBlocks).hasSize(1);
    assertThat(columnBlocks.getFirst().columns()).hasSize(2);
    assertThat(columnBlocks.getFirst().rowCount()).isEqualTo(100);

    // Split [0, 2500): expect a subset of the row groups (300 of the 500 rows), each with both columns
    parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
    columnBlocks = parquetMetadata.getBlocks(0, 2500);
    assertThat(columnBlocks).allMatch(block -> block.columns().size() == 2);
    assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(300);

    // No split range: expect all row groups (all 500 rows), each with both columns
    parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
    columnBlocks = parquetMetadata.getBlocks();
    assertThat(columnBlocks).allMatch(block -> block.columns().size() == 2);
    assertThat(columnBlocks.stream().mapToLong(BlockMetadata::rowCount).sum()).isEqualTo(500);
}

private void testReadingOldParquetFiles(File file, List<String> columnNames, Type columnType, List<?> expectedValues)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public static ReaderPageSource createPageSource(
start,
length,
dataSource,
parquetMetadata.getBlocks(),
parquetMetadata,
parquetTupleDomains,
parquetPredicates,
descriptorsByPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private static ConnectorPageSource createPageSource(
start,
length,
dataSource,
parquetMetadata.getBlocks(),
parquetMetadata,
ImmutableList.of(parquetTupleDomain),
ImmutableList.of(parquetPredicate),
descriptorsByPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
start,
length,
dataSource,
parquetMetadata.getBlocks(),
parquetMetadata,
ImmutableList.of(parquetTupleDomain),
ImmutableList.of(parquetPredicate),
descriptorsByPath,
Expand Down

0 comments on commit 2ef6dc1

Please sign in to comment.