Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Optimizer example and docs, deprecate Expr::name #3788

Merged
merged 16 commits into from
Oct 12, 2022
5 changes: 5 additions & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow = "24.0.0"
arrow-flight = "24.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
datafusion-sql = { path = "../datafusion/sql" }
futures = "0.3"
num_cpus = "1.13.0"
object_store = { version = "0.5.0", features = ["aws"] }
Expand Down
163 changes: 163 additions & 0 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{AggregateUDF, Expr, Filter, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;
use std::any::Any;
use std::sync::Arc;

/// Demonstrates plugging a custom rule into the DataFusion optimizer.
///
/// The example parses a hard-coded SQL query, converts it into a logical plan
/// with the `datafusion-sql` crate, prints that plan, then runs an optimizer
/// containing only `MyRule` and prints the optimized plan.
///
/// # Errors
///
/// Propagates any error returned while parsing the SQL, building the logical
/// plan, or running the optimizer.
pub fn main() -> Result<()> {
    // parse the SQL text into statements using the sqlparser crate
    let dialect = PostgreSqlDialect {};
    let sql = "SELECT * FROM person WHERE age BETWEEN 21 AND 32";
    // take ownership of the first statement instead of cloning it out of the
    // vector; the literal above always yields exactly one statement
    let statement = Parser::parse_sql(&dialect, sql)?
        .into_iter()
        .next()
        .expect("the hard-coded SQL contains exactly one statement");

    // produce a logical plan using the datafusion-sql crate
    let context_provider = MyContextProvider {};
    let sql_to_rel = SqlToRel::new(&context_provider);
    let logical_plan = sql_to_rel.sql_statement_to_plan(statement)?;
    println!(
        "Unoptimized Logical Plan:\n\n{}\n",
        logical_plan.display_indent()
    );

    // now run the optimizer with our custom rule, asking it not to skip
    // rules that fail
    let optimizer = Optimizer::with_rules(vec![Arc::new(MyRule {})]);
    let mut optimizer_config = OptimizerConfig::default().with_skip_failing_rules(false);
    let optimized_plan =
        optimizer.optimize(&logical_plan, &mut optimizer_config, observe)?;
    println!(
        "Optimized Logical Plan:\n\n{}\n",
        optimized_plan.display_indent()
    );

    Ok(())
}

/// Observer passed to `Optimizer::optimize` in `main`.
///
/// Prints the name of `rule` followed by the indented rendering of `plan`.
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
    let rule_name = rule.name();
    let indented_plan = plan.display_indent();
    println!("After applying rule '{}':\n{}\n", rule_name, indented_plan);
}

/// Example optimizer rule that rewrites the predicate of every `Filter` node
/// with `MyExprRewriter`; all other plan nodes are returned as produced by
/// `utils::optimize_children`.
struct MyRule {}

impl OptimizerRule for MyRule {
    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "my_rule"
    }

    /// Optimizes the children of `plan` first, then rewrites the predicate if
    /// the resulting node is a `Filter`.
    ///
    /// # Errors
    ///
    /// Propagates errors from optimizing the children, from rewriting the
    /// predicate, and from `Filter::try_new`.
    fn optimize(
        &self,
        plan: &LogicalPlan,
        config: &mut OptimizerConfig,
    ) -> Result<LogicalPlan> {
        // recurse down and optimize children first
        let plan = utils::optimize_children(self, plan, config)?;

        match plan {
            LogicalPlan::Filter(filter) => {
                let mut expr_rewriter = MyExprRewriter {};
                // `Filter` only exposes its predicate by reference, so this
                // clone is required to obtain an owned expression to rewrite
                let predicate = filter.predicate().clone().rewrite(&mut expr_rewriter)?;
                Ok(LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    filter.input().clone(),
                )?))
            }
            // the plan is already owned here; return it without a deep copy
            other => Ok(other),
        }
    }
}

/// Expression rewriter that expands `BETWEEN` into explicit comparisons.
///
/// * `expr BETWEEN low AND high` becomes `expr >= low AND expr <= high`
/// * `expr NOT BETWEEN low AND high` becomes `expr < low OR expr > high`
///
/// Every other expression is returned unchanged.
struct MyExprRewriter {}

impl ExprRewriter for MyExprRewriter {
    /// Rewrites a single expression; see the type-level docs for the mapping.
    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        match expr {
            Expr::Between {
                negated,
                expr,
                low,
                high,
            } => {
                // the boxed operands are owned by this arm, so move them out
                // of their boxes rather than cloning the pointed-to values
                let expr: Expr = *expr;
                let low: Expr = *low;
                let high: Expr = *high;
                // `expr` appears on both sides of the result, hence the one
                // remaining clone
                if negated {
                    Ok(expr.clone().lt(low).or(expr.gt(high)))
                } else {
                    Ok(expr.clone().gt_eq(low).and(expr.lt_eq(high)))
                }
            }
            // already owned: hand it back as-is
            other => Ok(other),
        }
    }
}

/// Minimal planner context: it resolves a single table called `person` and
/// reports no scalar functions, aggregate functions, or variables.
struct MyContextProvider {}

impl ContextProvider for MyContextProvider {
    /// Returns the source for the `person` table (`name: Utf8`, `age: UInt8`,
    /// both non-nullable); any other table name is a planning error.
    fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
        if name.table() != "person" {
            return Err(DataFusionError::Plan("table not found".to_string()));
        }

        let fields = vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("age", DataType::UInt8, false),
        ];
        let schema = Arc::new(Schema::new(fields));
        Ok(Arc::new(MyTableSource { schema }))
    }

    /// No scalar UDFs are registered.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        None
    }

    /// No aggregate UDFs are registered.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        None
    }

    /// No variables are defined.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        None
    }
}

struct MyTableSource {
schema: SchemaRef,
}

impl TableSource for MyTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
13 changes: 10 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,17 @@ impl PartialOrd for Expr {
impl Expr {
/// Returns the name of this expression as it should appear in a schema. This name
/// will not include any CAST expressions.
pub fn name(&self) -> Result<String> {
pub fn display_name(&self) -> Result<String> {
create_name(self)
}

/// Deprecated: forwards to `display_name`; use that method instead.
///
/// Returns the name of this expression as it should appear in a schema. This name
/// will not include any CAST expressions.
#[deprecated(since = "14.0.0", note = "please use `display_name` instead")]
pub fn name(&self) -> Result<String> {
self.display_name()
}

/// Returns a full and complete string representation of this expression.
pub fn canonical_name(&self) -> String {
format!("{}", self)
Expand Down Expand Up @@ -1186,7 +1193,7 @@ mod test {
assert_eq!(expected, expr.canonical_name());
assert_eq!(expected, format!("{}", expr));
assert_eq!(expected, format!("{:?}", expr));
assert_eq!(expected, expr.name()?);
assert_eq!(expected, expr.display_name()?);
Ok(())
}

Expand All @@ -1202,7 +1209,7 @@ mod test {
assert_eq!(expected_canonical, format!("{:?}", expr));
// note that CAST intentionally has a name that is different from its `Display`
// representation. CAST does not change the name of expressions.
assert_eq!("Float32(1.23)", expr.name()?);
assert_eq!("Float32(1.23)", expr.display_name()?);
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl ExprSchemable for Expr {
)),
_ => Ok(DFField::new(
None,
&self.name()?,
&self.display_name()?,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ pub(crate) fn validate_unique_names<'a>(
) -> Result<()> {
let mut unique_names = HashMap::new();
expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.name()?;
let name = expr.display_name()?;
match unique_names.get(&name) {
None => {
unique_names.insert(name, (position, expr));
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
}
Expr::ScalarSubquery(_) => e.clone(),
_ => match e.name() {
_ => match e.display_name() {
Ok(name) => match input_schema.field_with_unqualified_name(&name) {
Ok(field) => Expr::Column(field.qualified_column()),
// expression not provided as input, do not convert to a column reference
Expand Down Expand Up @@ -728,7 +728,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
let field = plan.schema().field_from_column(col)?;
Ok(Expr::Column(field.qualified_column()))
}
_ => Ok(Expr::Column(Column::from_name(expr.name()?))),
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}
}

Expand Down
Loading