Skip to content

Commit

Permalink
Simplify InListExpr, making it ~20-70% faster (apache#4057)
Browse files Browse the repository at this point in the history
* Simplify InList expression

* Simplify

* Hash floats as integers

* Fix tests

* Format

* Update datafusion-cli lockfile

* Sort Cargo.toml

* Update datafusion/physical-expr/src/expressions/in_list.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and Dandandan committed Nov 5, 2022
1 parent 046b718 commit b62f265
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 1,140 deletions.
6 changes: 3 additions & 3 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "25.0.0", features = ["prettyprint"] }
arrow-buffer = "25.0.0"
arrow-schema = "25.0.0"

async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "futures-io", "tokio"] }
async-trait = "0.1.41"
bytes = "1.1"
Expand All @@ -74,7 +73,6 @@ datafusion-sql = { path = "../sql", version = "13.0.0" }
flate2 = "1.0.24"
futures = "0.3"
glob = "0.3.0"
half = { version = "2.1", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = "0.10"
lazy_static = { version = "^1.4.0" }
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
pub mod hash_utils;
pub mod joins;
pub mod limit;
pub mod memory;
Expand All @@ -541,4 +540,6 @@ pub mod values;
pub mod windows;

use crate::execution::context::TaskContext;
pub use datafusion_physical_expr::{expressions, functions, type_coercion, udf};
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
49 changes: 3 additions & 46 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2035,8 +2035,9 @@ mod tests {
.build()?;
let execution_plan = plan(&logical_plan).await?;
// verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated.
let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"1\") }], negated: false, set: None }";
assert!(format!("{:?}", execution_plan).contains(expected));
let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"1\") }], negated: false }";
let actual = format!("{:?}", execution_plan);
assert!(actual.contains(expected), "{}", actual);

Ok(())
}
Expand Down Expand Up @@ -2068,50 +2069,6 @@ mod tests {
lit(struct_literal)
}

/// Plans `c1 IN ('a', 1, 2, ..., 30)` and inspects the `Debug` rendering of the
/// resulting physical plan.
///
/// The test passes when the rendering shows the integer literals as `Utf8`
/// literals and the `InListExpr` carries a populated `set`.
///
/// # Errors
///
/// Propagates any error returned while building or planning the logical plan.
#[tokio::test]
async fn in_set_test() -> Result<()> {
    // OPTIMIZER_INSET_THRESHOLD = 10
    // expression: "c1 in ('a', 1, 2, ..30)"
    let mut list = vec![Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))];
    list.extend((1..31).map(|i| Expr::Literal(ScalarValue::Int64(Some(i)))));

    let logical_plan = test_csv_scan()
        .await?
        .filter(col("c12").lt(lit(0.05)))?
        .project(vec![col("c1").in_list(list, false)])?
        .build()?;
    let execution_plan = plan(&logical_plan).await?;

    // Render the plan once and include it in every assertion message so a
    // failure shows what was actually produced.
    let actual = format!("{:?}", execution_plan);

    // Head of the list: the Int64 literals must show up as Utf8 literals.
    let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"1\") }, Literal { value: Utf8(\"2\") },";
    assert!(actual.contains(expected), "{}", actual);

    // Tail of the list, followed by a populated `set`.
    let expected =
        "Literal { value: Utf8(\"30\") }], negated: false, set: Some(InSet { set: ";
    assert!(actual.contains(expected), "{}", actual);

    Ok(())
}

/// Plans `c1 IN (NULL, 1, 2, ..., 30)` and inspects the `Debug` rendering of the
/// resulting physical plan.
///
/// The list starts with a null `Int64` literal. The test passes when the
/// rendering shows it as `Utf8(NULL)`, the remaining integers as `Utf8`
/// literals, and the `InListExpr` carrying a `set`.
///
/// # Errors
///
/// Propagates any error returned while building or planning the logical plan.
#[tokio::test]
async fn in_set_null_test() -> Result<()> {
    // test NULL
    let mut list = vec![Expr::Literal(ScalarValue::Int64(None))];
    list.extend((1..31).map(|i| Expr::Literal(ScalarValue::Int64(Some(i)))));

    let logical_plan = test_csv_scan()
        .await?
        .filter(col("c12").lt(lit(0.05)))?
        .project(vec![col("c1").in_list(list, false)])?
        .build()?;
    let execution_plan = plan(&logical_plan).await?;

    // Render the plan once and include it in every assertion message so a
    // failure shows what was actually produced.
    let actual = format!("{:?}", execution_plan);

    // Head of the list: the null literal and the integers appear as Utf8.
    let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(NULL) }, Literal { value: Utf8(\"1\") }, Literal { value: Utf8(\"2\") }";
    assert!(actual.contains(expected), "{}", actual);

    // Tail of the list, followed by a `set`.
    let expected =
        "Literal { value: Utf8(\"30\") }], negated: false, set: Some(InSet";
    assert!(actual.contains(expected), "{}", actual);

    Ok(())
}

#[tokio::test]
async fn hash_agg_input_schema() -> Result<()> {
let logical_plan = test_csv_scan_with_name("aggregate_test_100")
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "25.0.0", features = ["prettyprint"] }
arrow-buffer = "25.0.0"
arrow-schema = "25.0.0"
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4.22", default-features = false }
datafusion-common = { path = "../common", version = "13.0.0" }
datafusion-expr = { path = "../expr", version = "13.0.0" }
datafusion-row = { path = "../row", version = "13.0.0" }
half = { version = "2.1", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = { version = "0.10", features = ["use_std"] }
lazy_static = { version = "^1.4.0" }
Expand Down
Loading

0 comments on commit b62f265

Please sign in to comment.