Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;

let file_schema = builder.schema().clone();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought giving builder.schema() a name made the code clearer.


let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(builder.schema())?;
schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;

let mask = ProjectionMask::roots(
Expand All @@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
builder.schema().as_ref(),
table_schema.as_ref(),
&file_schema,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by cleanup -- no functional change

&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
Expand All @@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
&file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
Expand Down
140 changes: 110 additions & 30 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
/// Note: This method currently ignores ColumnOrder
/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
Expand All @@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
arrow_schema: predicate.schema().as_ref(),
arrow_schema,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix -- to use the file schema rather than the table schema to lookup columns

};
match predicate.prune(&pruning_stats) {
Ok(values) => {
Expand Down Expand Up @@ -416,11 +417,11 @@ mod tests {
fn row_group_pruning_predicate_simple_expr() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I updated the signature of prune_row_groups_by_statistics I also had to update the tests appropriately

use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
Expand All @@ -436,6 +437,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
None,
Expand All @@ -450,11 +452,11 @@ mod tests {
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
Expand All @@ -471,6 +473,7 @@ mod tests {
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2],
None,
Expand Down Expand Up @@ -519,6 +522,7 @@ mod tests {
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand All @@ -532,12 +536,13 @@ mod tests {
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();

// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
groups,
None,
Expand All @@ -548,6 +553,64 @@ mod tests {
);
}

#[test]
fn row_group_pruning_predicate_file_schema() {
use datafusion_expr::{col, lit};
// test row group predicate when file schema is different than table schema
// c1 > 0
let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Int32, false),
]));
let expr = col("c1").gt(lit(0));
let expr = logical2physical(&expr, &table_schema);
let pruning_predicate =
PruningPredicate::try_new(expr, table_schema.clone()).unwrap();

// Model a file schema's column order c2 then c1, which is the opposite
// of the table schema
let file_schema = Arc::new(Schema::new(vec![
Field::new("c2", DataType::Int32, false),
Field::new("c1", DataType::Int32, false),
]));
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c2", PhysicalType::INT32),
PrimitiveTypeField::new("c1", PhysicalType::INT32),
]);
// rg1 has c2 less than zero, c1 greater than zero
let rgm1 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
],
);
// rg2 has c2 greater than zero, c1 less than zero
let rgm2 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false),
],
);

let metrics = parquet_file_metrics();
let groups = &[rgm1, rgm2];
// the first row group should be left because c1 is greater than zero
// the second should be filtered out because c1 is less than zero
assert_eq!(
prune_row_groups_by_statistics(
&file_schema, // NB must be file schema, not table_schema
&schema_descr,
groups,
None,
Some(&pruning_predicate),
&metrics
),
vec![0]
);
}

fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c1", PhysicalType::INT32),
Expand Down Expand Up @@ -581,13 +644,14 @@ mod tests {
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -613,14 +677,15 @@ mod tests {
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Expand All @@ -639,8 +704,11 @@ mod tests {

// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of scalar value if decimal(9,2), don't need to do cast
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -651,8 +719,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [1.00, 6.00]
Expand Down Expand Up @@ -680,6 +747,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -693,8 +761,11 @@ mod tests {
// The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
// We should convert all type to the coercion type, which is decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(9, 0),
false,
)]));

let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
Expand All @@ -709,8 +780,7 @@ mod tests {
Decimal128(11, 2),
));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [100, 600]
Expand Down Expand Up @@ -744,6 +814,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
Expand All @@ -754,8 +825,11 @@ mod tests {
);

// INT64: c1 < 5, the c1 is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -766,8 +840,7 @@ mod tests {
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [6.00, 8.00]
Expand All @@ -792,6 +865,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -803,8 +877,11 @@ mod tests {

// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -818,8 +895,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -863,6 +939,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand All @@ -874,8 +951,11 @@ mod tests {

// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
let schema =
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
let schema = Arc::new(Schema::new(vec![Field::new(
"c1",
DataType::Decimal128(18, 2),
false,
)]));
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
Expand All @@ -889,8 +969,7 @@ mod tests {
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
Expand Down Expand Up @@ -923,6 +1002,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these are existing tests? Should we add a test of prune_row_groups_by_statistics in which the schema (the file schema) differs from the predicate's schema?

Copy link
Contributor Author

@alamb alamb Dec 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good idea -- I will do so

The reason I think an end to end test in .slt is also required is that the bug is related to passing the wrong schema into prune_row_groups_by_statistics -- so I could write a test in terms of prune_row_groups_by_statistics that passes (passes in the right schema) but actual answers would still be wrong because the callsite of prune_row_groups_by_statistics would pass the wrong one

&schema_descr,
&[rgm1, rgm2, rgm3],
None,
Expand Down
Loading