diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 72baa0db00a2..97d028897b0b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -71,3 +71,7 @@ name = "case_when" [[bench]] harness = false name = "is_null" + +[[bench]] +harness = false +name = "binary_op" diff --git a/datafusion/physical-expr/benches/binary_op.rs b/datafusion/physical-expr/benches/binary_op.rs new file mode 100644 index 000000000000..7ac5c0485203 --- /dev/null +++ b/datafusion/physical-expr/benches/binary_op.rs @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::{ + array::BooleanArray, + compute::{bool_and, bool_or}, + datatypes::{DataType, Field, Schema}, +}; +use arrow::{array::StringArray, record_batch::RecordBatch}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_expr::{and, binary_expr, col, lit, or, Operator}; +use datafusion_physical_expr::{ + expressions::{BinaryExpr, Column}, + planner::logical2physical, + PhysicalExpr, +}; +use std::sync::{Arc, LazyLock}; + +/// Generates BooleanArrays with different true/false distributions for benchmarking. 
+///
+/// Returns a vector of tuples containing scenario name and corresponding BooleanArray.
+///
+/// # Arguments
+/// - `TEST_ALL_FALSE` - When `true`, scenarios are all-`false` arrays with at most one `true`; when `false`, all-`true` arrays with at most one `false`
+/// - `len` - Length of each generated BooleanArray; must be at least 1 because positions `0` and `len - 1` are indexed directly
+fn generate_boolean_cases<const TEST_ALL_FALSE: bool>(
+    len: usize,
+) -> Vec<(String, BooleanArray)> {
+    let mut cases = Vec::with_capacity(6);
+
+    // Scenario 1: All elements false or all elements true
+    if TEST_ALL_FALSE {
+        let all_false = BooleanArray::from(vec![false; len]);
+        cases.push(("all_false".to_string(), all_false));
+    } else {
+        let all_true = BooleanArray::from(vec![true; len]);
+        cases.push(("all_true".to_string(), all_true));
+    }
+
+    // Scenario 2: Single true at first position or single false at first position
+    if TEST_ALL_FALSE {
+        let mut first_true = vec![false; len];
+        first_true[0] = true;
+        cases.push(("one_true_first".to_string(), BooleanArray::from(first_true)));
+    } else {
+        let mut first_false = vec![true; len];
+        first_false[0] = false;
+        cases.push((
+            "one_false_first".to_string(),
+            BooleanArray::from(first_false),
+        ));
+    }
+
+    // Scenario 3: Single true at last position or single false at last position
+    if TEST_ALL_FALSE {
+        let mut last_true = vec![false; len];
+        last_true[len - 1] = true;
+        cases.push(("one_true_last".to_string(), BooleanArray::from(last_true)));
+    } else {
+        let mut last_false = vec![true; len];
+        last_false[len - 1] = false;
+        cases.push(("one_false_last".to_string(), BooleanArray::from(last_false)));
+    }
+
+    // Scenario 4: Single true at exact middle or single false at exact middle
+    let mid = len / 2;
+    if TEST_ALL_FALSE {
+        let mut mid_true = vec![false; len];
+        mid_true[mid] = true;
+        cases.push(("one_true_middle".to_string(), BooleanArray::from(mid_true)));
+    } else {
+        let mut mid_false = vec![true; len];
+        mid_false[mid] = false;
+        cases.push((
+            "one_false_middle".to_string(),
+            BooleanArray::from(mid_false),
+        ));
+    }
+
+    // Scenario 5: Single true at 25% position or single false at 25% position
+    let mid_left = len / 4;
+    if TEST_ALL_FALSE {
+        let mut mid_left_true = vec![false; len];
+        mid_left_true[mid_left] = true;
+        cases.push((
+            "one_true_middle_left".to_string(),
+            BooleanArray::from(mid_left_true),
+        ));
+    } else {
+        let mut mid_left_false = vec![true; len];
+        mid_left_false[mid_left] = false;
+        cases.push((
+            "one_false_middle_left".to_string(),
+            BooleanArray::from(mid_left_false),
+        ));
+    }
+
+    // Scenario 6: Single true at 75% position or single false at 75% position
+    let mid_right = (3 * len) / 4;
+    if TEST_ALL_FALSE {
+        let mut mid_right_true = vec![false; len];
+        mid_right_true[mid_right] = true;
+        cases.push((
+            "one_true_middle_right".to_string(),
+            BooleanArray::from(mid_right_true),
+        ));
+    } else {
+        let mut mid_right_false = vec![true; len];
+        mid_right_false[mid_right] = false;
+        cases.push((
+            "one_false_middle_right".to_string(),
+            BooleanArray::from(mid_right_false),
+        ));
+    }
+
+    cases
+}
+
+/// Benchmarks boolean operations `false_count/bool_or` and `true_count/bool_and` on [`BooleanArray`].
+/// You can run this benchmark with:
+/// ```sh
+/// # test true_count/false_count
+/// TEST_BOOL_COUNT=1 cargo bench --bench binary_op -- boolean_ops
+/// # test bool_or/bool_and
+/// cargo bench --bench binary_op -- boolean_ops
+/// ```
+fn benchmark_boolean_ops(c: &mut Criterion) {
+    let len = 1_000_000; // Use one million elements for clear performance differentiation
+    static TEST_BOOL_COUNT: LazyLock<bool> =
+        LazyLock::new(|| match std::env::var("TEST_BOOL_COUNT") {
+            Ok(_) => {
+                println!("TEST_BOOL_COUNT=ON");
+                true
+            }
+            Err(_) => {
+                println!("TEST_BOOL_COUNT=OFF");
+                false
+            }
+        });
+
+    // Pick the measured kernel from the ENV `TEST_BOOL_COUNT` and the scenario family `TEST_ALL_FALSE`
+    fn test_func<const TEST_ALL_FALSE: bool>(array: &BooleanArray) -> bool {
+        // Use false_count for all false and true_count for all true
+        if *TEST_BOOL_COUNT {
+            if TEST_ALL_FALSE {
+                array.false_count() == array.len()
+            } else {
+                array.true_count() == array.len()
+            }
+        }
+        // Use bool_or for all false and bool_and for all true
+        else if TEST_ALL_FALSE {
+            match bool_or(array) {
+                Some(v) => !v,
+                None => false,
+            }
+        } else {
+            bool_and(array).unwrap_or(false)
+        }
+    }
+
+    // Test cases for false_count and bool_or
+    {
+        let test_cases = generate_boolean_cases::<true>(len);
+        for (scenario, array) in test_cases {
+            let arr_ref = Arc::new(array);
+
+            // Benchmark test_func across different scenarios
+            c.bench_function(&format!("boolean_ops/or/{}", scenario), |b| {
+                b.iter(|| test_func::<true>(black_box(&arr_ref)))
+            });
+        }
+    }
+    // Test cases for true_count and bool_and
+    {
+        let test_cases = generate_boolean_cases::<false>(len);
+        for (scenario, array) in test_cases {
+            let arr_ref = Arc::new(array);
+
+            // Benchmark test_func across different scenarios
+            c.bench_function(&format!("boolean_ops/and/{}", scenario), |b| {
+                b.iter(|| test_func::<false>(black_box(&arr_ref)))
+            });
+        }
+    }
+}
+
+/// Benchmarks AND/OR operator short-circuiting by evaluating complex regex conditions.
+///
+/// Creates 6 test scenarios per operator:
+/// 1. All values enable short-circuit (all_true/all_false)
+/// 2. Scenarios 2-6: a single true/false value at different positions to measure early exit
+///
+/// You can run this benchmark with:
+/// ```sh
+/// cargo bench --bench binary_op -- short_circuit
+/// ```
+fn benchmark_binary_op_in_short_circuit(c: &mut Criterion) {
+    // Create schema with three columns
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Boolean, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Utf8, false),
+    ]));
+
+    // Generate test data with extended content
+    let (b_values, c_values) = generate_test_strings(8192);
+
+    let batches_and =
+        create_record_batch::<true>(schema.clone(), &b_values, &c_values).unwrap();
+    let batches_or =
+        create_record_batch::<false>(schema.clone(), &b_values, &c_values).unwrap();
+
+    // Build complex string matching conditions
+    let right_condition_and = and(
+        // Check for API endpoint pattern in URLs
+        binary_expr(
+            col("b"),
+            Operator::RegexMatch,
+            lit(r#"^https://(\w+\.)?example\.(com|org)/"#),
+        ),
+        // Check for markdown code blocks and summary section
+        binary_expr(
+            col("c"),
+            Operator::RegexMatch,
+            lit("```(rust|python|go)\nfn? main$$"),
+        ),
+    );
+
+    let right_condition_or = or(
+        // Check for secure HTTPS protocol
+        binary_expr(
+            col("b"),
+            Operator::RegexMatch,
+            lit(r#"^https://(\w+\.)?example\.(com|org)/"#),
+        ),
+        // Check for Rust code examples
+        binary_expr(
+            col("c"),
+            Operator::RegexMatch,
+            lit("```(rust|python|go)\nfn? main$$"),
+        ),
+    );
+
+    // Create physical binary expressions
+    let expr_and = BinaryExpr::new(
+        Arc::new(Column::new("a", 0)),
+        Operator::And,
+        logical2physical(&right_condition_and, &schema),
+    );
+
+    let expr_or = BinaryExpr::new(
+        Arc::new(Column::new("a", 0)),
+        Operator::Or,
+        logical2physical(&right_condition_or, &schema),
+    );
+
+    // Each scenario when the test operator is `and`
+    {
+        for (name, batch) in batches_and {
+            c.bench_function(&format!("short_circuit/and/{}", name), |b| {
+                b.iter(|| expr_and.evaluate(black_box(&batch)).unwrap())
+            });
+        }
+    }
+    // Each scenario when the test operator is `or`
+    {
+        for (name, batch) in batches_or {
+            c.bench_function(&format!("short_circuit/or/{}", name), |b| {
+                b.iter(|| expr_or.evaluate(black_box(&batch)).unwrap())
+            });
+        }
+    }
+}
+
+/// Generates `num_rows` URL strings and `num_rows` markdown strings by cycling through fixed samples
+fn generate_test_strings(num_rows: usize) -> (Vec<String>, Vec<String>) {
+    // Extended URL patterns with query parameters and paths
+    let base_urls = [
+        "https://api.example.com/v2/users/12345/posts?category=tech&sort=date&lang=en-US",
+        "https://cdn.example.net/assets/images/2023/08/15/sample-image-highres.jpg?width=1920&quality=85",
+        "http://service.demo.org:8080/api/data/transactions/20230815123456.csv",
+        "ftp://legacy.archive.example/backups/2023/Q3/database-dump.sql.gz",
+        "https://docs.example.co.uk/reference/advanced-topics/concurrency/parallel-processing.md#implementation-details",
+    ];
+
+    // Extended markdown content with code blocks and structure
+    let base_markdowns = [
+        concat!(
+            "# Advanced Topics in Computer Science\n\n",
+            "## Summary\nThis article explores complex system design patterns and...\n\n",
+            "```rust\nfn process_data(data: &mut [i32]) {\n // Parallel processing example\n data.par_iter_mut().for_each(|x| *x *= 2);\n}\n```\n\n",
+            "## Performance Considerations\nWhen implementing concurrent systems...\n"
+        ),
+        concat!(
+            "## API Documentation\n\n",
+            "```json\n{\n \"endpoint\": \"/api/v2/users\",\n \"methods\": [\"GET\", \"POST\"],\n \"parameters\": {\n \"page\": \"number\"\n }\n}\n```\n\n",
+            "# Authentication Guide\nSecure your API access using OAuth 2.0...\n"
+        ),
+        concat!(
+            "# Data Processing Pipeline\n\n",
+            "```python\nfrom multiprocessing import Pool\n\ndef main():\n with Pool(8) as p:\n results = p.map(process_item, data)\n```\n\n",
+            "## Summary of Optimizations\n1. Batch processing\n2. Memory pooling\n3. Concurrent I/O operations\n"
+        ),
+        concat!(
+            "# System Architecture Overview\n\n",
+            "## Components\n- Load Balancer\n- Database Cluster\n- Cache Service\n\n",
+            "```go\nfunc main() {\n router := gin.Default()\n router.GET(\"/api/health\", healthCheck)\n router.Run(\":8080\")\n}\n```\n"
+        ),
+        concat!(
+            "## Configuration Reference\n\n",
+            "```yaml\nserver:\n port: 8080\n max_threads: 32\n\ndatabase:\n url: postgres://user@prod-db:5432/main\n```\n\n",
+            "# Deployment Strategies\nBlue-green deployment patterns with...\n"
+        ),
+    ];
+
+    let mut urls = Vec::with_capacity(num_rows);
+    let mut markdowns = Vec::with_capacity(num_rows);
+
+    for i in 0..num_rows {
+        urls.push(base_urls[i % base_urls.len()].to_string());
+        markdowns.push(base_markdowns[i % base_markdowns.len()].to_string());
+    }
+
+    (urls, markdowns)
+}
+
+/// Creates record batches with boolean arrays that test different short-circuit scenarios.
+/// When TEST_ALL_FALSE = true: creates data for AND operator benchmarks (needs early false exit)
+/// When TEST_ALL_FALSE = false: creates data for OR operator benchmarks (needs early true exit)
+fn create_record_batch<const TEST_ALL_FALSE: bool>(
+    schema: Arc<Schema>,
+    b_values: &[String],
+    c_values: &[String],
+) -> arrow::error::Result<Vec<(String, RecordBatch)>> {
+    // Generate data for six scenarios, but only the data for the "all_false" and "all_true" cases can be optimized through short-circuiting
+    let boolean_array = generate_boolean_cases::<TEST_ALL_FALSE>(b_values.len());
+    let mut rbs = Vec::with_capacity(boolean_array.len());
+    for (name, a_array) in boolean_array {
+        let b_array = StringArray::from(b_values.to_vec());
+        let c_array = StringArray::from(c_values.to_vec());
+        rbs.push((
+            name,
+            RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(a_array), Arc::new(b_array), Arc::new(c_array)],
+            )?,
+        ));
+    }
+    Ok(rbs)
+}
+
+criterion_group!(
+    benches,
+    benchmark_boolean_ops,
+    benchmark_binary_op_in_short_circuit
+);
+
+criterion_main!(benches);
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index f21d3e7652cd..84374f4a2970 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -359,6 +359,12 @@ impl PhysicalExpr for BinaryExpr {
         use arrow::compute::kernels::numeric::*;
 
         let lhs = self.left.evaluate(batch)?;
+
+        // Optimize for short-circuiting `Operator::And` or `Operator::Or` operations and return early.
+        if check_short_circuit(&lhs, &self.op) {
+            return Ok(lhs);
+        }
+
         let rhs = self.right.evaluate(batch)?;
         let left_data_type = lhs.data_type();
         let right_data_type = rhs.data_type();
@@ -805,6 +811,45 @@ impl BinaryExpr {
     }
 }
 
+/// Reports whether a logical `AND`/`OR` can skip evaluating its right-hand side (rhs).
+///
+/// The already evaluated left-hand side (lhs) decides the result on its own when:
+/// - For `AND`: ALL values in `lhs` are `false`, so the expression is `false` regardless of rhs.
+/// - For `OR`: ALL values in `lhs` are `true`, so the expression is `true` regardless of rhs.
+///
+/// Returns `true` if short-circuiting is possible, `false` otherwise.
+///
+/// # Arguments
+/// * `arg` - The left-hand side (lhs) columnar value (array or scalar)
+/// * `op` - The operator being evaluated; anything other than `AND`/`OR` returns `false`
+///
+/// # Implementation Notes
+/// 1. Only Boolean arguments can short-circuit; other types and null scalars return `false`.
+/// 2. Handles both scalar values and array values.
+/// 3. For arrays, uses the `true_count()`/`false_count()` methods from arrow-rs.
+///    `bool_or`/`bool_and` may be a better choice; for a detailed discussion see
+///    <https://github.com/apache/datafusion/pull/15462#discussion_r2020558418>.
+fn check_short_circuit(arg: &ColumnarValue, op: &Operator) -> bool {
+    // The lhs value that fixes the result: `false` for `AND`, `true` for `OR`.
+    // Checking the operator first keeps every other operator on a trivial path.
+    let dominant = match op {
+        Operator::And => false,
+        Operator::Or => true,
+        _ => return false,
+    };
+
+    match arg {
+        // A failed downcast means the array is not Boolean, so nothing can be skipped.
+        ColumnarValue::Array(array) => match as_boolean_array(&array) {
+            Ok(array) if dominant => array.true_count() == array.len(),
+            Ok(array) => array.false_count() == array.len(),
+            Err(_) => false,
+        },
+        ColumnarValue::Scalar(ScalarValue::Boolean(Some(value))) => *value == dominant,
+        ColumnarValue::Scalar(_) => false,
+    }
+}
+
 fn concat_elements(left: Arc<dyn Array>, right: Arc<dyn Array>) -> Result<ArrayRef> {
     Ok(match left.data_type() {
         DataType::Utf8 => Arc::new(concat_elements_utf8(
@@ -4832,4 +4877,48 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_check_short_circuit() {
+        use crate::planner::logical2physical;
+        use datafusion_expr::col as logical_col;
+        use datafusion_expr::lit;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        let a_array = Int32Array::from(vec![1, 3, 4, 5, 6]);
+        let b_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(a_array), Arc::new(b_array)],
+        )
+        .unwrap();
+
+        // op: AND left: all false
+        let left_expr = logical2physical(&logical_col("a").eq(lit(2)), &schema);
+        let left_value = left_expr.evaluate(&batch).unwrap();
+        assert!(check_short_circuit(&left_value, &Operator::And));
+        // op: AND left: not all false
+        let left_expr = logical2physical(&logical_col("a").eq(lit(3)), &schema);
+        let left_value = left_expr.evaluate(&batch).unwrap();
+        assert!(!check_short_circuit(&left_value, &Operator::And));
+        // op: OR left: all true
+        let left_expr = logical2physical(&logical_col("a").gt(lit(0)), &schema);
+        let left_value = left_expr.evaluate(&batch).unwrap();
+        assert!(check_short_circuit(&left_value, &Operator::Or));
+        // op: OR left: not all true
+        let left_expr = logical2physical(&logical_col("a").gt(lit(2)), &schema);
+        let left_value = left_expr.evaluate(&batch).unwrap();
+        assert!(!check_short_circuit(&left_value, &Operator::Or));
+
+        // scalar lhs: only the operator's dominant value short-circuits
+        let scalar_false = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
+        assert!(check_short_circuit(&scalar_false, &Operator::And));
+        assert!(!check_short_circuit(&scalar_false, &Operator::Or));
+        // a null scalar and a non-logical operator never short-circuit
+        let scalar_null = ColumnarValue::Scalar(ScalarValue::Boolean(None));
+        assert!(!check_short_circuit(&scalar_null, &Operator::And));
+        assert!(!check_short_circuit(&scalar_false, &Operator::Eq));
+    }
 }