Optimize Datasource Node for Iceberg Tables #10

Overview

Optimize the DataFusion datasource node (table scan) for reading Iceberg tables efficiently, implementing partition pruning, predicate pushdown, and projection pushdown to minimize data read from S3.

Context

DataFusion's execution model allows pushing down filters, projections, and limits to the data source layer. For Iceberg tables stored in S3, effective pushdown can reduce:

  • Data transferred from S3 (network bandwidth)
  • Data deserialized from Parquet (CPU)
  • Memory used during query execution

Iceberg metadata provides partition information and column statistics that enable aggressive pruning before reading any data files.

Requirements

Partition Pruning

Implement partition elimination based on filters:

  • Parse Iceberg partition specs (year, month, day, hour)
  • Map query filters to partition predicates
  • Skip partitions that don't match query predicates
  • Use Iceberg's manifest files to identify relevant partitions

Example:

SELECT * FROM logs 
WHERE timestamp >= '2025-01-07' AND timestamp < '2025-01-08'

Should read only files from the 2025-01-07 partition, skipping all others.
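
For the day(timestamp) partition spec above, mapping the filter bound to a partition predicate means converting the timestamp into Iceberg's day ordinal (days since 1970-01-01). A minimal sketch, assuming the chrono crate; the function name is illustrative:

use chrono::{DateTime, Utc};

// Iceberg's day transform stores the partition value as days since
// 1970-01-01, so a timestamp bound becomes an integer day ordinal.
fn day_partition_value(ts: DateTime<Utc>) -> i32 {
    ts.timestamp().div_euclid(86_400) as i32
}

// timestamp >= '2025-01-07T00:00:00Z' is then evaluated as
// partition_day >= day_partition_value(bound) against manifest entries.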

Predicate Pushdown

Push WHERE clause filters down to Parquet reader:

  • Extract filter predicates from logical plan
  • Convert to Parquet row group filters
  • Use Parquet column statistics (min/max, null count)
  • Skip row groups that don't match predicates
  • Implement Bloom filter checks when available

Supported predicates (see the classification sketch after this list):

  • Equality: column = value
  • Comparison: column < value, column > value
  • Range: column BETWEEN a AND b
  • IN list: column IN (v1, v2, v3)
  • IS NULL / IS NOT NULL
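
DataFusion asks the provider which of these filters it can evaluate through TableProvider::supports_filters_pushdown. A minimal standalone sketch of that classification (a hypothetical helper, not the trait impl itself; pruning is reported as Inexact so DataFusion still re-applies the filters to the rows that come back):

use datafusion::common::Result;
use datafusion::logical_expr::{Expr, Operator, TableProviderFilterPushDown};

// Classify each filter: shapes we can prune with are reported as Inexact,
// everything else as Unsupported.
fn classify_filters(filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|f| match f {
            // =, <, >, <=, >= (BETWEEN is planned as two comparisons)
            Expr::BinaryExpr(b)
                if matches!(
                    b.op,
                    Operator::Eq | Operator::Lt | Operator::Gt | Operator::LtEq | Operator::GtEq
                ) =>
            {
                TableProviderFilterPushDown::Inexact
            }
            // IN lists and NULL checks can also be checked against statistics
            Expr::InList(_) | Expr::IsNull(_) | Expr::IsNotNull(_) => {
                TableProviderFilterPushDown::Inexact
            }
            _ => TableProviderFilterPushDown::Unsupported,
        })
        .collect())
}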

Projection Pushdown

Read only required columns from Parquet:

  • Extract column list from SELECT clause
  • Pass to Parquet reader
  • Skip reading unreferenced columns
  • Minimize memory and I/O

Example:

SELECT timestamp, message FROM logs

Should read only the timestamp and message columns, not labels, attributes, etc.

Limit Pushdown

Stop reading once the limit is reached (see the sketch after this list):

  • For queries with LIMIT clause and no aggregation
  • Read files in appropriate order (newest first for DESC)
  • Stop as soon as limit is satisfied
  • Particularly effective with sort optimization (Optimize LogicalPlan Sort Operations #9)
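
A minimal planning-time sketch, assuming each candidate data file carries its Iceberg record_count and the files are already ordered appropriately (newest first for DESC); names are illustrative:

// Pick files until the LIMIT can be satisfied; remaining files are
// never planned or read.
fn files_for_limit(files: Vec<(String, u64)>, limit: Option<usize>) -> Vec<String> {
    let Some(limit) = limit else {
        return files.into_iter().map(|(path, _)| path).collect();
    };
    let mut selected = Vec::new();
    let mut rows: u64 = 0;
    for (path, record_count) in files {
        selected.push(path);
        rows += record_count;
        if rows >= limit as u64 {
            break; // enough rows guaranteed, stop planning further files
        }
    }
    selected
}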

Metadata Optimization

Use Iceberg metadata to minimize S3 reads (a manifest-cache sketch follows this list):

  • Cache manifest files in memory
  • Use manifest file statistics for pruning
  • Implement metadata-only queries where possible
  • Batch S3 requests for multiple files
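
A minimal in-memory cache sketch; it relies on Iceberg metadata files being immutable once written, so entries never need invalidation. The type and the fetch closure are illustrative, not part of the iceberg crate:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Cache manifest bytes by object-store path; safe because Iceberg
// metadata files are written once and never modified in place.
struct ManifestCache {
    entries: Mutex<HashMap<String, Arc<Vec<u8>>>>,
}

impl ManifestCache {
    fn new() -> Self {
        Self { entries: Mutex::new(HashMap::new()) }
    }

    // Return cached bytes, or fetch (e.g. from S3) and cache them.
    fn get_or_fetch(&self, path: &str, fetch: impl FnOnce() -> Vec<u8>) -> Arc<Vec<u8>> {
        let mut entries = self.entries.lock().unwrap();
        entries
            .entry(path.to_string())
            .or_insert_with(|| Arc::new(fetch()))
            .clone()
    }
}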

Parallel Scanning

Implement parallel file reading (see the sketch after this list):

  • Read multiple Parquet files concurrently
  • Configure parallelism based on query and system resources
  • Use DataFusion's partition-aware execution
  • Balance parallelism vs memory usage
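
A minimal sketch of bounded concurrency using the futures crate; fetch stands in for whatever async call reads one Parquet file from S3, and max_concurrency is where the parallelism/memory trade-off is made:

use futures::stream::{self, StreamExt};

// Run at most `max_concurrency` fetches at once; results are collected
// as they complete, overlapping S3 latency across files.
async fn scan_concurrently<T, Fut>(
    paths: Vec<String>,
    max_concurrency: usize,
    fetch: impl Fn(String) -> Fut,
) -> Vec<T>
where
    Fut: std::future::Future<Output = T>,
{
    stream::iter(paths)
        .map(fetch)
        .buffer_unordered(max_concurrency)
        .collect()
        .await
}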

Acceptance Criteria

  • Partition pruning eliminates irrelevant partitions
  • Predicate pushdown skips non-matching row groups
  • Projection pushdown reads only required columns
  • Limit pushdown stops reading once the limit is reached
  • Parallel scanning uses multiple concurrent readers
  • S3 reads are minimized for filtered queries
  • EXPLAIN shows pushdown information
  • Unit tests verify each optimization
  • Integration tests show end-to-end improvements
  • Benchmarks demonstrate >10x improvement for selective queries

Dependencies

Crates already present:

  • datafusion (50.3.0) - TableProvider datasource trait
  • iceberg-datafusion (0.7.0) - Iceberg table provider
  • parquet - Parquet reader (transitive)
  • arrow - Data representation

Depends on:

Implementation Notes

Partition Pruning Example

// Iceberg table partitioned by day(timestamp).
// Illustrative sketch: `PartitionFilter` is a hypothetical helper, not an
// existing iceberg-rust type, and manifest access in the real crate is
// async; the shape of the pruning logic is what matters here.
let partition_spec = table.metadata().current_partition_spec();

// Query filter: timestamp >= '2025-01-07 00:00:00'
let filter = col("timestamp").gt_eq(lit("2025-01-07T00:00:00Z"));

// Convert the row-level filter into a predicate on partition values
let partition_filter = PartitionFilter::from_expr(&filter, &partition_spec);

// Walk the current snapshot's manifest entries and keep only data files
// whose partition values can satisfy the predicate
let files: Vec<_> = table
    .current_snapshot()
    .manifest_list()
    .entries()
    .filter(|entry| partition_filter.matches(&entry.partition))
    .map(|entry| entry.data_file)
    .collect();

Predicate Pushdown to Parquet

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::statistics::Statistics;

// Row-group pruning: use footer min/max statistics to select only the row
// groups that can overlap the filter range. Statistics-based skipping goes
// through `with_row_groups`; `RowFilter` is the separate row-level mechanism.
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let keep: Vec<usize> = builder
    .metadata()
    .row_groups()
    .iter()
    .enumerate()
    .filter(|(_, rg)| match rg.column(0).statistics() {
        // Column 0 holds the i64 timestamp in this sketch
        Some(Statistics::Int64(s)) => match (s.min_opt(), s.max_opt()) {
            (Some(min), Some(max)) => *max >= filter_min && *min <= filter_max,
            _ => true, // incomplete stats: keep the row group
        },
        _ => true, // no stats: keep the row group
    })
    .map(|(i, _)| i)
    .collect();
let reader = builder.with_row_groups(keep).build()?;

Projection Pushdown

use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Only read the required columns; `with_projection` takes a ProjectionMask
// built from column indices, not a plain Vec of indices.
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let projection = ProjectionMask::roots(builder.parquet_schema(), [0, 2, 5]); // timestamp, message, level
let reader = builder.with_projection(projection).build()?;

Performance Targets

Baseline (no optimization):

  • Query: filter by timestamp (1-hour range) plus a specific label
  • Data: 1TB total, 10GB in the time range, 100MB matching the filter
  • Read: 10GB (every file in the time range, all columns), Time: ~30 seconds

Optimized:

  • Partition pruning: keeps the scan to the 10GB in the time range (instead of the full 1TB)
  • Predicate pushdown: skips non-matching row groups, reading ~150MB
  • Projection pushdown: reads 2 columns instead of 10
  • Result: ~30MB read, Time: <1 second

Improvement: ~30x faster, ~300x less data read

Monitoring

Expose metrics for the following (a counter sketch follows this list):

  • Partitions scanned vs skipped
  • Row groups scanned vs skipped
  • Bytes read from S3
  • Scan parallelism
  • Pushdown effectiveness
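
A minimal sketch of the counters behind these metrics, assuming they are updated from concurrent scan tasks (hence atomics); field and method names are illustrative:

use std::sync::atomic::{AtomicU64, Ordering};

// Counters updated by the scan; derived ratios (skip rates, pushdown
// effectiveness) can be computed at export time.
#[derive(Default)]
struct ScanMetrics {
    partitions_scanned: AtomicU64,
    partitions_skipped: AtomicU64,
    row_groups_scanned: AtomicU64,
    row_groups_skipped: AtomicU64,
    bytes_read_s3: AtomicU64,
    concurrent_readers: AtomicU64,
}

impl ScanMetrics {
    fn record_row_group(&self, skipped: bool) {
        let counter = if skipped {
            &self.row_groups_skipped
        } else {
            &self.row_groups_scanned
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }
}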

Technical References
