From 644bdf50eb0934a523c633ea8b796f54000e3254 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 26 Oct 2022 12:55:50 -0400
Subject: [PATCH] parquet filter pushdown correctness tests

---
 datafusion/core/Cargo.toml                 |   1 +
 .../core/tests/parquet_filter_pushdown.rs  | 415 ++++++++++++++++++
 test-utils/src/data_gen.rs                 |  19 +
 3 files changed, 435 insertions(+)
 create mode 100644 datafusion/core/tests/parquet_filter_pushdown.rs

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index f5fbf997412d..51a4e0001e0d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -105,6 +105,7 @@
 csv = "1.1.6"
 ctor = "0.1.22"
 doc-comment = "0.3"
 env_logger = "0.9"
+parquet-test-utils = { path = "../../parquet-test-utils" }
 rstest = "0.15.0"
 test-utils = { path = "../../test-utils" }
diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs
new file mode 100644
index 000000000000..ba957083491a
--- /dev/null
+++ b/datafusion/core/tests/parquet_filter_pushdown.rs
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Non-trivial integration tests for parquet predicate pushdown
+//!
+//! Testing hints: If you run this test with --nocapture it will tell you where
+//! the generated parquet file went. You can then explore that file and try out
+//! various queries with datafusion-cli like:
+//!
+//! ```sql
+//! create external table data stored as parquet location 'data.parquet';
+//! select * from data limit 10;
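+//! -- the test cases below push predicates like this one down into the scan;
+//! -- for example, the `selective` case filters on request_method = 'GET'
+//! select count(*) from data where request_method = 'GET';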
+//! ```
+
+use arrow::compute::concat_batches;
+use arrow::record_batch::RecordBatch;
+use datafusion::logical_expr::{lit, Expr};
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::prelude::{col, SessionContext};
+use lazy_static::lazy_static;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
+use tempfile::TempDir;
+use test_utils::AccessLogGenerator;
+
+/// how many rows of generated data to write to our parquet file (arbitrary)
+const NUM_ROWS: usize = 53819;
+
+// Only create the parquet file once as it is fairly large
+lazy_static! {
+    static ref TEMPDIR: TempDir = TempDir::new().unwrap();
+
+    /// Randomly (but consistently) generated test file. You can use
+    /// `datafusion-cli` to explore it more carefully.
+    static ref TESTFILE: TestParquetFile = {
+        let generator = AccessLogGenerator::new()
+            .with_row_limit(Some(NUM_ROWS));
+
+        // TODO: set the max rows per page to various arbitrary sizes (e.g. 8311)
+        // (using https://github.com/apache/arrow-rs/issues/2941) to ensure we
+        // get multiple pages
+        let page_size = None;
+        let row_group_size = None;
+        let file = TEMPDIR.path().join("data.parquet");
+
+        println!("Writing test data to {:?}", file);
+        match TestParquetFile::try_new(file, generator, page_size, row_group_size) {
+            Err(e) => {
+                panic!("Error writing data: {}", e);
+            }
+            Ok(f) => f,
+        }
+    };
+}
+
+#[tokio::test]
+async fn selective() {
+    TestCase::new()
+        // request_method = 'GET'
+        .with_filter(col("request_method").eq(lit("GET")))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(8886)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn non_selective() {
+    TestCase::new()
+        // request_method != 'GET'
+        .with_filter(col("request_method").not_eq(lit("GET")))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(44933)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn basic_conjunction() {
+    TestCase::new()
+        // request_method = 'POST' AND
+        // response_status = 503
+        .with_filter(
+            col("request_method")
+                .eq(lit("POST"))
+                .and(col("response_status").eq(lit(503_u16))),
+        )
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(1729)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn everything() {
+    TestCase::new()
+        // the filter eliminates all rows (no row has this status)
+        // response_status = 429
+        .with_filter(col("response_status").eq(lit(429_u16)))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(0)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn nothing() {
+    TestCase::new()
+        // No rows are filtered out -- all are returned
+        // response_status > 0
+        .with_filter(col("response_status").gt(lit(0_u16)))
+        .with_pushdown_expected(PushdownExpected::None)
+        .with_expected_rows(53819)
+        .run()
+        .await
+}
+
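+// The `dict_*` cases below filter on the low cardinality `container` and
+// `pod` columns, which are presumably stored dictionary encoded in the
+// parquet file (hence the names), so they exercise pushdown over
+// dictionary arrays.
+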
+#[tokio::test]
+async fn dict_selective() {
+    TestCase::new()
+        // container = 'backend_container_0'
+        .with_filter(col("container").eq(lit("backend_container_0")))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(37856)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn dict_non_selective() {
+    TestCase::new()
+        // container != 'backend_container_0'
+        .with_filter(col("container").not_eq(lit("backend_container_0")))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(15963)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn dict_conjunction() {
+    TestCase::new()
+        // container == 'backend_container_0' AND
+        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        .with_filter(
+            col("container")
+                .eq(lit("backend_container_0"))
+                .and(col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg"))),
+        )
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(3052)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn dict_very_selective() {
+    TestCase::new()
+        // request_bytes > 2B AND
+        // container == 'backend_container_0' AND
+        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        .with_filter(
+            col("request_bytes")
+                .gt(lit(2000000000))
+                .and(col("container").eq(lit("backend_container_0")))
+                .and(col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg"))),
+        )
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(88)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn dict_very_selective2() {
+    TestCase::new()
+        // picks only 2 rows
+        // client_addr = '204.47.29.82' AND
+        // container == 'backend_container_0' AND
+        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        .with_filter(
+            col("client_addr")
+                .eq(lit("204.47.29.82"))
+                .and(col("container").eq(lit("backend_container_0")))
+                .and(col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg"))),
+        )
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(2)
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn dict_disjunction() {
+    TestCase::new()
+        // container = 'backend_container_0' OR
+        // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+        .with_filter(
+            col("container")
+                .eq(lit("backend_container_0"))
+                .or(col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg"))),
+        )
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_expected_rows(39982)
+        .run()
+        .await
+}
+
+/// Expected pushdown behavior
+#[derive(Debug, Clone, Copy)]
+enum PushdownExpected {
+    /// Pushdown is not expected to filter any rows
+    None,
+    /// Pushdown is expected to filter some rows
+    Some,
+}
+
+/// parameters for running a test
+struct TestCase {
+    filter: Expr,
+    pushdown_expected: PushdownExpected,
+    /// How many rows are expected to pass the predicate overall?
+    expected_rows: usize,
+}
+
+impl TestCase {
+    fn new() -> Self {
+        Self {
+            // default to a filter that passes everything
+            filter: lit(true),
+            pushdown_expected: PushdownExpected::None,
+            expected_rows: 0,
+        }
+    }
+
+    /// Set the filter expression to use
+    fn with_filter(mut self, filter: Expr) -> Self {
+        self.filter = filter;
+        self
+    }
+
+    /// Set the expected predicate pushdown
+    fn with_pushdown_expected(mut self, pushdown_expected: PushdownExpected) -> Self {
+        self.pushdown_expected = pushdown_expected;
+        self
+    }
+
+    /// Set the number of expected rows (to ensure the predicates have
+    /// a good range of selectivity)
+    fn with_expected_rows(mut self, expected_rows: usize) -> Self {
+        self.expected_rows = expected_rows;
+        self
+    }
+
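+    /// Scans the test file with several `ParquetScanOptions` combinations:
+    /// no pushdown as a baseline, pushdown alone, and pushdown with filter
+    /// reordering, asserting that each configuration returns the same rows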
+    async fn run(&self) {
+        let no_pushdown = self
+            .read_with_options(
+                &TESTFILE,
+                ParquetScanOptions {
+                    pushdown_filters: false,
+                    reorder_filters: false,
+                    enable_page_index: false,
+                },
+                // always expect no pushdown
+                PushdownExpected::None,
+            )
+            .await;
+
+        let only_pushdown = self
+            .read_with_options(
+                &TESTFILE,
+                ParquetScanOptions {
+                    pushdown_filters: true,
+                    reorder_filters: false,
+                    enable_page_index: false,
+                },
+                self.pushdown_expected,
+            )
+            .await;
+
+        assert_eq!(no_pushdown, only_pushdown);
+
+        let pushdown_and_reordering = self
+            .read_with_options(
+                &TESTFILE,
+                ParquetScanOptions {
+                    pushdown_filters: true,
+                    reorder_filters: true,
+                    enable_page_index: false,
+                },
+                self.pushdown_expected,
+            )
+            .await;
+
+        assert_eq!(no_pushdown, pushdown_and_reordering);
+
+        // page index filtering is not yet correct:
+        // https://github.com/apache/arrow-datafusion/issues/4002
+        // when it is fixed we can add these tests too
+
+        // let page_index_only = self
+        //     .read_with_options(
+        //         &TESTFILE,
+        //         ParquetScanOptions {
+        //             pushdown_filters: false,
+        //             reorder_filters: false,
+        //             enable_page_index: true,
+        //         },
+        //         // no filter pushdown, so no pushdown rows expected
+        //         PushdownExpected::None,
+        //     )
+        //     .await;
+        // assert_eq!(no_pushdown, page_index_only);
+
+        // let pushdown_reordering_and_page_index = self
+        //     .read_with_options(
+        //         &TESTFILE,
+        //         ParquetScanOptions {
+        //             pushdown_filters: true,
+        //             reorder_filters: true,
+        //             enable_page_index: true,
+        //         },
+        //         self.pushdown_expected,
+        //     )
+        //     .await;
+        // assert_eq!(no_pushdown, pushdown_reordering_and_page_index);
+    }
+
+    /// Reads data from a test parquet file using the specified scan options
+    async fn read_with_options(
+        &self,
+        test_file: &TestParquetFile,
+        scan_options: ParquetScanOptions,
+        pushdown_expected: PushdownExpected,
+    ) -> RecordBatch {
+        println!("Querying {:?}", test_file.path());
+        println!("  scan options: {scan_options:?}");
+        println!("  reading with filter {:?}", self.filter);
+        let ctx = SessionContext::new();
+        let exec = test_file
+            .create_scan(self.filter.clone(), scan_options)
+            .await
+            .unwrap();
+        let result = collect(exec.clone(), ctx.task_ctx()).await.unwrap();
+
+        // Concatenate the results back together
+        let batch = concat_batches(&test_file.schema(), &result).unwrap();
+
+        let total_rows = batch.num_rows();
+
+        println!(
+            "Filter: {}, total records: {}, after filter: {}, selectivity: {}",
+            self.filter,
+            NUM_ROWS,
+            total_rows,
+            (total_rows as f64) / (NUM_ROWS as f64),
+        );
+        assert_eq!(total_rows, self.expected_rows);
+
+        // verify the expected pushdown behavior
+        let metrics = test_file
+            .parquet_metrics(exec)
+            .expect("no parquet metrics found");
+        let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered");
+        println!("  pushdown_rows_filtered: {}", pushdown_rows_filtered);
+
+        match pushdown_expected {
+            PushdownExpected::None => {
+                assert_eq!(pushdown_rows_filtered, 0);
+            }
+            PushdownExpected::Some => {
+                assert!(
+                    pushdown_rows_filtered > 0,
+                    "Expected to filter rows via pushdown, but none were filtered"
+                );
+            }
+        };
+
+        batch
+    }
+}
+
+// TODO: move this into MetricsSet itself
+/// Returns the sum of all metrics with the specified name
+/// in the given metric set.
+///
+/// Only `Count` metrics are summed; panics if no matching
+/// metric is found.
+fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+    let sum = metrics.sum(
+        |m| matches!(m.value(), MetricValue::Count { name, .. } if name == metric_name),
+    );
+
+    match sum {
+        Some(MetricValue::Count { count, .. }) => count.value(),
+        _ => {
+            panic!(
+                "Expected metric not found. Looking for '{}' in\n\n{:#?}",
+                metric_name, metrics
+            );
+        }
+    }
+}
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index dd516d5fe4c9..de32a57bd2f2 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -185,6 +185,25 @@ fn generate_sorted_strings(
 /// usecases.
 ///
 /// This is useful for writing tests queries on such data
+///
+/// Here are the columns with example data:
+///
+/// ```text
+/// service: 'backend'
+/// host: 'i-1ec3ca3151468928.ec2.internal'
+/// pod: 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+/// container: 'backend_container_0'
+/// image: 'backend_container_0@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9'
+/// time: '1970-01-01 00:00:00'
+/// client_addr: '127.216.178.64'
+/// request_duration_ns: -1261239112
+/// request_user_agent: 'kxttrfiiietlsaygzphhwlqcgngnumuphliejmxfdznuurswhdcicrlprbnocibvsbukiohjjbjdygwbfhxqvurm'
+/// request_method: 'PUT'
+/// request_host: 'https://backend.mydomain.com'
+/// request_bytes: -312099516
+/// response_bytes: 1448834362
+/// response_status: 200
+/// ```
 #[derive(Debug)]
 pub struct AccessLogGenerator {
     schema: SchemaRef,