Skip to content

Commit ebec550

Browse files
committed
Fix reading parquet statistics
1 parent dac4152 commit ebec550

File tree

2 files changed

+69
-44
lines changed

2 files changed

+69
-44
lines changed

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
468468
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
469469
.await?;
470470

471+
let file_schema = builder.schema().clone();
472+
471473
let (schema_mapping, adapted_projections) =
472-
schema_adapter.map_schema(builder.schema())?;
474+
schema_adapter.map_schema(&file_schema)?;
473475
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
474476

475477
let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
481483
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
482484
let row_filter = row_filter::build_row_filter(
483485
&predicate,
484-
builder.schema().as_ref(),
485-
table_schema.as_ref(),
486+
&file_schema,
487+
&*table_schema,
486488
builder.metadata(),
487489
reorder_predicates,
488490
&file_metrics,
@@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
507509
let file_metadata = builder.metadata().clone();
508510
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
509511
let mut row_groups = row_groups::prune_row_groups_by_statistics(
512+
&file_schema,
510513
builder.parquet_schema(),
511514
file_metadata.row_groups(),
512515
file_range,

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 63 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
5555
/// Note: This method currently ignores ColumnOrder
5656
/// <https://github.com/apache/arrow-datafusion/issues/8335>
5757
pub(crate) fn prune_row_groups_by_statistics(
58+
arrow_schema: &Schema,
5859
parquet_schema: &SchemaDescriptor,
5960
groups: &[RowGroupMetaData],
6061
range: Option<FileRange>,
@@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
8081
let pruning_stats = RowGroupPruningStatistics {
8182
parquet_schema,
8283
row_group_metadata: metadata,
83-
arrow_schema: predicate.schema().as_ref(),
84+
arrow_schema,
8485
};
8586
match predicate.prune(&pruning_stats) {
8687
Ok(values) => {
@@ -416,11 +417,11 @@ mod tests {
416417
fn row_group_pruning_predicate_simple_expr() {
417418
use datafusion_expr::{col, lit};
418419
// int > 1 => c1_max > 1
419-
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
420+
let schema =
421+
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
420422
let expr = col("c1").gt(lit(15));
421423
let expr = logical2physical(&expr, &schema);
422-
let pruning_predicate =
423-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
424+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
424425

425426
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
426427
let schema_descr = get_test_schema_descr(vec![field]);
@@ -436,6 +437,7 @@ mod tests {
436437
let metrics = parquet_file_metrics();
437438
assert_eq!(
438439
prune_row_groups_by_statistics(
440+
&schema,
439441
&schema_descr,
440442
&[rgm1, rgm2],
441443
None,
@@ -447,37 +449,38 @@ mod tests {
447449
}
448450

449451
#[test]
450-
fn row_group_pruning_predicate_missing_stats() {
452+
fn row_group_pruning_predicate_schema_evolution() {
451453
use datafusion_expr::{col, lit};
454+
// schema has col "c0" before column c1
452455
// int > 1 => c1_max > 1
453-
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
456+
let schema = Arc::new(Schema::new(vec![
457+
Field::new("c0", DataType::Int32, false),
458+
Field::new("c1", DataType::Int32, false),
459+
]));
454460
let expr = col("c1").gt(lit(15));
455461
let expr = logical2physical(&expr, &schema);
456-
let pruning_predicate =
457-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
462+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
458463

459-
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
464+
// model a file that only has column c1
465+
// only have statistics for column c0, should not be able to prune based on c1
466+
let field = PrimitiveTypeField::new("c0", PhysicalType::INT32);
460467
let schema_descr = get_test_schema_descr(vec![field]);
461468
let rgm1 = get_row_group_meta_data(
462469
&schema_descr,
463-
vec![ParquetStatistics::int32(None, None, None, 0, false)],
464-
);
465-
let rgm2 = get_row_group_meta_data(
466-
&schema_descr,
467-
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
470+
vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
468471
);
472+
469473
let metrics = parquet_file_metrics();
470-
// missing statistics for first row group mean that the result from the predicate expression
471-
// is null / undefined so the first row group can't be filtered out
472474
assert_eq!(
473475
prune_row_groups_by_statistics(
476+
&schema,
474477
&schema_descr,
475-
&[rgm1, rgm2],
478+
&[rgm1],
476479
None,
477480
Some(&pruning_predicate),
478481
&metrics
479482
),
480-
vec![0, 1]
483+
vec![0]
481484
);
482485
}
483486

@@ -519,6 +522,7 @@ mod tests {
519522
// when conditions are joined using AND
520523
assert_eq!(
521524
prune_row_groups_by_statistics(
525+
&schema,
522526
&schema_descr,
523527
groups,
524528
None,
@@ -532,12 +536,13 @@ mod tests {
532536
// this bypasses the entire predicate expression and no row groups are filtered out
533537
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
534538
let expr = logical2physical(&expr, &schema);
535-
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
539+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
536540

537541
// if conditions in predicate are joined with OR and an unsupported expression is used
538542
// this bypasses the entire predicate expression and no row groups are filtered out
539543
assert_eq!(
540544
prune_row_groups_by_statistics(
545+
&schema,
541546
&schema_descr,
542547
groups,
543548
None,
@@ -581,13 +586,14 @@ mod tests {
581586
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
582587
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
583588
let expr = logical2physical(&expr, &schema);
584-
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
589+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
585590
let groups = gen_row_group_meta_data_for_pruning_predicate();
586591

587592
let metrics = parquet_file_metrics();
588593
// First row group was filtered out because it contains no null value on "c2".
589594
assert_eq!(
590595
prune_row_groups_by_statistics(
596+
&schema,
591597
&schema_descr,
592598
&groups,
593599
None,
@@ -613,14 +619,15 @@ mod tests {
613619
.gt(lit(15))
614620
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
615621
let expr = logical2physical(&expr, &schema);
616-
let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
622+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
617623
let groups = gen_row_group_meta_data_for_pruning_predicate();
618624

619625
let metrics = parquet_file_metrics();
620626
// bool = NULL always evaluates to NULL (and thus will not
621627
// pass predicates. Ideally these should both be false
622628
assert_eq!(
623629
prune_row_groups_by_statistics(
630+
&schema,
624631
&schema_descr,
625632
&groups,
626633
None,
@@ -639,8 +646,11 @@ mod tests {
639646

640647
// INT32: c1 > 5, the c1 is decimal(9,2)
641648
// The type of scalar value if decimal(9,2), don't need to do cast
642-
let schema =
643-
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
649+
let schema = Arc::new(Schema::new(vec![Field::new(
650+
"c1",
651+
DataType::Decimal128(9, 2),
652+
false,
653+
)]));
644654
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
645655
.with_logical_type(LogicalType::Decimal {
646656
scale: 2,
@@ -651,8 +661,7 @@ mod tests {
651661
let schema_descr = get_test_schema_descr(vec![field]);
652662
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
653663
let expr = logical2physical(&expr, &schema);
654-
let pruning_predicate =
655-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
664+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
656665
let rgm1 = get_row_group_meta_data(
657666
&schema_descr,
658667
// [1.00, 6.00]
@@ -680,6 +689,7 @@ mod tests {
680689
let metrics = parquet_file_metrics();
681690
assert_eq!(
682691
prune_row_groups_by_statistics(
692+
&schema,
683693
&schema_descr,
684694
&[rgm1, rgm2, rgm3],
685695
None,
@@ -693,8 +703,11 @@ mod tests {
693703
// The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
694704
// We should convert all type to the coercion type, which is decimal(11,2)
695705
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
696-
let schema =
697-
Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
706+
let schema = Arc::new(Schema::new(vec![Field::new(
707+
"c1",
708+
DataType::Decimal128(9, 0),
709+
false,
710+
)]));
698711

699712
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
700713
.with_logical_type(LogicalType::Decimal {
@@ -709,8 +722,7 @@ mod tests {
709722
Decimal128(11, 2),
710723
));
711724
let expr = logical2physical(&expr, &schema);
712-
let pruning_predicate =
713-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
725+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
714726
let rgm1 = get_row_group_meta_data(
715727
&schema_descr,
716728
// [100, 600]
@@ -744,6 +756,7 @@ mod tests {
744756
let metrics = parquet_file_metrics();
745757
assert_eq!(
746758
prune_row_groups_by_statistics(
759+
&schema,
747760
&schema_descr,
748761
&[rgm1, rgm2, rgm3, rgm4],
749762
None,
@@ -754,8 +767,11 @@ mod tests {
754767
);
755768

756769
// INT64: c1 < 5, the c1 is decimal(18,2)
757-
let schema =
758-
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
770+
let schema = Arc::new(Schema::new(vec![Field::new(
771+
"c1",
772+
DataType::Decimal128(18, 2),
773+
false,
774+
)]));
759775
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
760776
.with_logical_type(LogicalType::Decimal {
761777
scale: 2,
@@ -766,8 +782,7 @@ mod tests {
766782
let schema_descr = get_test_schema_descr(vec![field]);
767783
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
768784
let expr = logical2physical(&expr, &schema);
769-
let pruning_predicate =
770-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
785+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
771786
let rgm1 = get_row_group_meta_data(
772787
&schema_descr,
773788
// [6.00, 8.00]
@@ -792,6 +807,7 @@ mod tests {
792807
let metrics = parquet_file_metrics();
793808
assert_eq!(
794809
prune_row_groups_by_statistics(
810+
&schema,
795811
&schema_descr,
796812
&[rgm1, rgm2, rgm3],
797813
None,
@@ -803,8 +819,11 @@ mod tests {
803819

804820
// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
805821
// the type of parquet is decimal(18,2)
806-
let schema =
807-
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
822+
let schema = Arc::new(Schema::new(vec![Field::new(
823+
"c1",
824+
DataType::Decimal128(18, 2),
825+
false,
826+
)]));
808827
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
809828
.with_logical_type(LogicalType::Decimal {
810829
scale: 2,
@@ -818,8 +837,7 @@ mod tests {
818837
let left = cast(col("c1"), DataType::Decimal128(28, 3));
819838
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
820839
let expr = logical2physical(&expr, &schema);
821-
let pruning_predicate =
822-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
840+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
823841
// we must use the big-endian when encode the i128 to bytes or vec[u8].
824842
let rgm1 = get_row_group_meta_data(
825843
&schema_descr,
@@ -863,6 +881,7 @@ mod tests {
863881
let metrics = parquet_file_metrics();
864882
assert_eq!(
865883
prune_row_groups_by_statistics(
884+
&schema,
866885
&schema_descr,
867886
&[rgm1, rgm2, rgm3],
868887
None,
@@ -874,8 +893,11 @@ mod tests {
874893

875894
// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
876895
// the type of parquet is decimal(18,2)
877-
let schema =
878-
Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
896+
let schema = Arc::new(Schema::new(vec![Field::new(
897+
"c1",
898+
DataType::Decimal128(18, 2),
899+
false,
900+
)]));
879901
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
880902
.with_logical_type(LogicalType::Decimal {
881903
scale: 2,
@@ -889,8 +911,7 @@ mod tests {
889911
let left = cast(col("c1"), DataType::Decimal128(28, 3));
890912
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
891913
let expr = logical2physical(&expr, &schema);
892-
let pruning_predicate =
893-
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
914+
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
894915
// we must use the big-endian when encode the i128 to bytes or vec[u8].
895916
let rgm1 = get_row_group_meta_data(
896917
&schema_descr,
@@ -923,6 +944,7 @@ mod tests {
923944
let metrics = parquet_file_metrics();
924945
assert_eq!(
925946
prune_row_groups_by_statistics(
947+
&schema,
926948
&schema_descr,
927949
&[rgm1, rgm2, rgm3],
928950
None,

0 commit comments

Comments
 (0)