Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve formatting of logical plans containing subqueries #2899

Merged
merged 7 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,23 @@ impl std::fmt::Display for Expr {
/// List of expressions to feed to the functions as arguments
ref args,
} => fmt_function(f, &fun.to_string(), false, args, true),
Expr::Exists { negated, .. } => {
if *negated {
write!(f, "NOT EXISTS (<subquery>)")
} else {
write!(f, "EXISTS (<subquery>)")
}
}
Expr::InSubquery { negated, .. } => {
if *negated {
write!(f, "NOT IN (<subquery>)")
} else {
write!(f, "IN (<subquery>)")
}
}
Expr::ScalarSubquery(_) => {
write!(f, "(<subquery>)")
}
_ => write!(f, "{:?}", self),
}
}
Expand Down
30 changes: 18 additions & 12 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,11 +1217,13 @@ mod tests {
.filter(exists(Arc::new(subquery)))?
.build()?;

let expected = "Filter: EXISTS (\
Subquery: Filter: #foo.a = #bar.a\
\n Projection: #foo.a\
\n TableScan: foo)\
\n Projection: #bar.a\n TableScan: bar";
let expected = "Filter: EXISTS (<subquery>)\
\n Subquery:\
\n Filter: #foo.a = #bar.a\
\n Projection: #foo.a\
\n TableScan: foo\
\n Projection: #bar.a\
\n TableScan: bar";
assert_eq!(expected, format!("{:?}", outer_query));

Ok(())
Expand All @@ -1243,9 +1245,11 @@ mod tests {
.filter(in_subquery(col("a"), Arc::new(subquery)))?
.build()?;

let expected = "Filter: #bar.a IN (Subquery: Filter: #foo.a = #bar.a\
\n Projection: #foo.a\
\n TableScan: foo)\
let expected = "Filter: #bar.a IN (<subquery>)\
\n Subquery:\
\n Filter: #foo.a = #bar.a\
\n Projection: #foo.a\
\n TableScan: foo\
\n Projection: #bar.a\
\n TableScan: bar";
assert_eq!(expected, format!("{:?}", outer_query));
Expand All @@ -1268,10 +1272,12 @@ mod tests {
.project(vec![scalar_subquery(Arc::new(subquery))])?
.build()?;

let expected = "Projection: (Subquery: Filter: #foo.a = #bar.a\
\n Projection: #foo.b\
\n TableScan: foo)\
\n TableScan: bar";
let expected = "Projection: (<subquery>)\
\n Subquery:\
\n Filter: #foo.a = #bar.a\
\n Projection: #foo.b\
\n TableScan: foo\
\n TableScan: bar";
assert_eq!(expected, format!("{:?}", outer_query));

Ok(())
Expand Down
102 changes: 78 additions & 24 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl LogicalPlan {
}

/// returns all inputs of this `LogicalPlan` node. Does not
/// include inputs to inputs.
/// include inputs to inputs, or subqueries.
pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
match self {
LogicalPlan::Projection(Projection { input, .. }) => vec![input],
Expand Down Expand Up @@ -396,8 +396,10 @@ impl LogicalPlan {
}

let recurse = match self {
LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
LogicalPlan::Projection(Projection { .. }) => {
self.visit_all_inputs(visitor)?
}
LogicalPlan::Filter(Filter { .. }) => self.visit_all_inputs(visitor)?,
Comment on lines +399 to +402
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change. We now visit all inputs, including subqueries. I have only implemented this for Projection and Filter in this PR.

LogicalPlan::Repartition(Repartition { input, .. }) => {
input.accept(visitor)?
}
Expand Down Expand Up @@ -457,6 +459,51 @@ impl LogicalPlan {

Ok(true)
}

/// Visit all inputs, including subqueries
pub fn visit_all_inputs<V>(&self, visitor: &mut V) -> Result<bool, V::Error>
where
V: PlanVisitor,
{
for input in self.all_inputs() {
if !input.accept(visitor)? {
return Ok(false);
}
}

Ok(true)
}

/// Get all plan inputs, including subqueries from expressions
fn all_inputs(&self) -> Vec<Arc<LogicalPlan>> {
let mut inputs = vec![];
for expr in self.expressions() {
self.collect_subqueries(&expr, &mut inputs);
}
for input in self.inputs() {
inputs.push(Arc::new(input.clone()));
}
inputs
}

fn collect_subqueries(&self, expr: &Expr, sub: &mut Vec<Arc<LogicalPlan>>) {
match expr {
Expr::BinaryExpr { left, right, .. } => {
self.collect_subqueries(left, sub);
self.collect_subqueries(right, sub);
}
Expr::Exists { subquery, .. } => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
Expr::InSubquery { subquery, .. } => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
Expr::ScalarSubquery(subquery) => {
sub.push(Arc::new(LogicalPlan::Subquery(subquery.clone())));
}
_ => {}
}
}
}

// Various implementations for printing out LogicalPlans
Expand Down Expand Up @@ -826,8 +873,8 @@ impl LogicalPlan {
fetch.map_or_else(|| "None".to_string(), |x| x.to_string())
)
}
LogicalPlan::Subquery(Subquery { subquery, .. }) => {
write!(f, "Subquery: {:?}", subquery)
LogicalPlan::Subquery(Subquery { .. }) => {
write!(f, "Subquery:")
}
LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
write!(f, "SubqueryAlias: {}", alias)
Expand Down Expand Up @@ -1245,7 +1292,7 @@ pub struct Subquery {

impl Debug for Subquery {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "Subquery: {:?}", self.subquery)
write!(f, "<subquery>")
}
}

Expand Down Expand Up @@ -1360,8 +1407,9 @@ pub trait ToStringifiedPlan {
mod tests {
use super::*;
use crate::logical_plan::table_scan;
use crate::{col, lit};
use crate::{col, in_subquery, lit};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;

fn employee_schema() -> Schema {
Schema::new(vec![
Expand All @@ -1373,42 +1421,47 @@ mod tests {
])
}

fn display_plan() -> LogicalPlan {
table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))
.unwrap()
.filter(col("state").eq(lit("CO")))
.unwrap()
.project(vec![col("id")])
.unwrap()
fn display_plan() -> Result<LogicalPlan> {
let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
.build()?;

table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
.filter(in_subquery(col("state"), Arc::new(plan1)))?
.project(vec![col("id")])?
.build()
.unwrap()
}

#[test]
fn test_display_indent() {
let plan = display_plan();
fn test_display_indent() -> Result<()> {
let plan = display_plan()?;

let expected = "Projection: #employee_csv.id\
\n Filter: #employee_csv.state = Utf8(\"CO\")\
\n Filter: #employee_csv.state IN (<subquery>)\
\n Subquery:\
\n TableScan: employee_csv projection=[state]\
\n TableScan: employee_csv projection=[id, state]";

assert_eq!(expected, format!("{}", plan.display_indent()));
Ok(())
}

#[test]
fn test_display_indent_schema() {
let plan = display_plan();
fn test_display_indent_schema() -> Result<()> {
let plan = display_plan()?;

let expected = "Projection: #employee_csv.id [id:Int32]\
\n Filter: #employee_csv.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
\n TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";
\n Filter: #employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]\
\n Subquery: [state:Utf8]\
\n TableScan: employee_csv projection=[state] [state:Utf8]\
\n TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";

assert_eq!(expected, format!("{}", plan.display_indent_schema()));
Ok(())
}

#[test]
fn test_display_graphviz() {
let plan = display_plan();
fn test_display_graphviz() -> Result<()> {
let plan = display_plan()?;

// just test for a few key lines in the output rather than the
// whole thing to make test mainteance easier.
Expand All @@ -1435,6 +1488,7 @@ mod tests {
"\n{}",
plan.display_graphviz()
);
Ok(())
}

/// Tests for the Visitor trait and walking logical plan nodes
Expand Down
10 changes: 8 additions & 2 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2072,15 +2072,21 @@ mod tests {

// filter on col b in subquery
let expected_before = "\
Filter: #b IN (Subquery: Projection: #sq.c\n TableScan: sq)\
Filter: #b IN (<subquery>)\
\n Subquery:\
\n Projection: #sq.c\
\n TableScan: sq\
\n Projection: #test.a AS b, #test.c\
\n TableScan: test";
assert_eq!(format!("{:?}", plan), expected_before);

// rewrite filter col b to test.a
let expected_after = "\
Projection: #test.a AS b, #test.c\
\n Filter: #test.a IN (Subquery: Projection: #sq.c\n TableScan: sq)\
\n Filter: #test.a IN (<subquery>)\
\n Subquery:\
\n Projection: #sq.c\
\n TableScan: sq\
\n TableScan: test";
assert_optimized_plan_eq(&plan, expected_after);

Expand Down
16 changes: 10 additions & 6 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,11 @@ mod test {

// Limit pushdown Not supported in sub_query
let expected = "Limit: skip=10, fetch=100\
\n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1)\
\n Filter: EXISTS (<subquery>)\
\n Subquery:\
\n Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1\
\n Projection: #test2.a\
\n TableScan: test2";

Expand All @@ -705,9 +707,11 @@ mod test {

// Limit pushdown Not supported in sub_query
let expected = "Limit: skip=10, fetch=100\
\n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1)\
\n Filter: EXISTS (<subquery>)\
\n Subquery:\
\n Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1\
\n Projection: #test2.a\
\n TableScan: test2";

Expand Down
23 changes: 14 additions & 9 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,10 @@ mod tests {
.build()?;

let expected = "Projection: #test.b [b:UInt32]\
\n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN (\
Subquery: Projection: #sq.c\
\n TableScan: sq) [a:UInt32, b:UInt32, c:UInt32]\
\n Filter: #test.a = UInt32(1) AND #test.b < UInt32(30) OR #test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
\n Subquery: [c:UInt32]\
\n Projection: #sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_eq(&plan, expected);
Expand All @@ -352,9 +353,13 @@ mod tests {
.build()?;

let expected = "Projection: #test.b [b:UInt32]\
\n Filter: #test.a = UInt32(1) OR #test.b IN (Subquery: Projection: #sq1.c\
\n TableScan: sq1) AND #test.c IN (Subquery: Projection: #sq2.c\
\n TableScan: sq2) [a:UInt32, b:UInt32, c:UInt32]\
\n Filter: #test.a = UInt32(1) OR #test.b IN (<subquery>) AND #test.c IN (<subquery>) [a:UInt32, b:UInt32, c:UInt32]\
\n Subquery: [c:UInt32]\
\n Projection: #sq1.c [c:UInt32]\
\n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
\n Subquery: [c:UInt32]\
\n Projection: #sq2.c [c:UInt32]\
\n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_eq(&plan, expected);
Expand Down Expand Up @@ -405,9 +410,9 @@ mod tests {
.build()?;

let expected = "Projection: #wrapped.b [b:UInt32]\
\n Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (\
Subquery: Projection: #sq_outer.c\
\n TableScan: sq_outer) [b:UInt32, c:UInt32]\
\n Filter: #wrapped.b < UInt32(30) OR #wrapped.c IN (<subquery>) [b:UInt32, c:UInt32]\
\n Subquery: [c:UInt32]\n Projection: #sq_outer.c [c:UInt32]\
\n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: #test.b, #test.c, alias=wrapped [b:UInt32, c:UInt32]\
\n Semi Join: #test.c = #sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
Expand Down
Loading