From acac9ec68fd8b5ed7b38e8bb00ea5b8155fed903 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sat, 16 Mar 2024 12:49:42 +0100 Subject: [PATCH] Add support for row group pruning on FixedSizeBinary --- .../physical_plan/parquet/row_groups.rs | 1 + .../physical_plan/parquet/statistics.rs | 8 +- .../core/tests/parquet/row_group_pruning.rs | 102 ++++++++++++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 95d4c93a60079..ce80f7f4a4da8 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -222,6 +222,7 @@ impl PruningStatistics for BloomFilterStatistics { match value { ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()), ScalarValue::Binary(Some(v)) => sbbf.check(v), + ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v), ScalarValue::Boolean(Some(v)) => sbbf.check(v), ScalarValue::Float64(Some(v)) => sbbf.check(v), ScalarValue::Float32(Some(v)) => sbbf.check(v), diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 4e472606da515..628745893df87 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -109,10 +109,11 @@ macro_rules! get_statistic { } } } - // type not supported yet + // type not fully supported yet ParquetStatistics::FixedLenByteArray(s) => { match $target_arrow_type { - // just support the decimal data type + // just support specific logical data type, there are others each + // with their own ordering Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(from_bytes_to_i128(s.$bytes_func())), @@ -120,6 +121,9 @@ macro_rules! get_statistic { *scale, )) } + Some(DataType::FixedSizeBinary(size)) => { + Some(ScalarValue::FixedSizeBinary(*size, Some(s.$bytes_func().to_vec()))) + } _ => None, } } diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 55112193502df..399e3cd1123ad 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -948,6 +948,108 @@ async fn prune_binary_lt() { .await; } +#[tokio::test] +async fn prune_fixedsizebinary_eq_match() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7' + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(1)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(1) + .test_row_group_prune() + .await; + + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('fe6' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + // false positive on 'all frontends' batch: 'fe1' < 'fe6' < 'fe7' + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(1)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(1) + .test_row_group_prune() + .await; + +} + +#[tokio::test] +async fn prune_fixedsizebinary_eq_no_match() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize = ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + // false positive on 'mixed' batch: 'be1' < 'be9' < 'fe4' + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) + .test_row_group_prune() + .await; +} + +#[tokio::test] +async fn prune_fixedsizebinary_neq() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize != ARROW_CAST(CAST('be1' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(3)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(14) + .test_row_group_prune() + .await; +} + +#[tokio::test] +async fn prune_fixedsizebinary_lt() { + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be3' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + // matches 'all backends' only + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(2)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + + RowGroupPruningTest::new() + .with_scenario(Scenario::ByteArray) + .with_query( + "SELECT name, service_fixedsize FROM t WHERE service_fixedsize < ARROW_CAST(CAST('be9' AS bytea), 'FixedSizeBinary(3)')", + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(2)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + // all backends from 'mixed' and 'all backends' + .with_expected_rows(8) + .test_row_group_prune() + .await; +} + #[tokio::test] async fn prune_periods_in_column_names() { // There are three row groups for "service.name", each with 5 rows = 15 rows total