Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support join-filter pushdown for semi/anti join #4923

Merged
merged 6 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,34 @@ async fn left_semi_join() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn left_semi_join_pushdown() -> Result<()> {
let ctx = create_left_semi_anti_join_context_with_null_ids("t1_id", "t2_id", false)
.unwrap();

// assert logical plan
let sql = "SELECT t1.t1_id, t1.t1_name FROM t1 left semi join t2 on (t1.t1_id = t2.t2_id and t2.t2_int > 1)";
let msg = format!("Creating logical plan for '{sql}'");
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
let plan = dataframe.into_optimized_plan()?;
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t1.t1_id, t1.t1_name [t1_id:UInt32;N, t1_name:Utf8;N]",
" LeftSemi Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
" TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
" Filter: t2.t2_int > UInt32(1) [t2_id:UInt32;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);

Ok(())
}

#[tokio::test]
async fn left_anti_join() -> Result<()> {
let test_repartition_joins = vec![true, false];
Expand Down
39 changes: 39 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,42 @@ SELECT s.*, g.grade FROM students s join grades g on s.mark between g.min and g.
Amina 89 5
Salma 77 4
Christen 50 3

# two tables for join
statement ok
CREATE TABLE t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
(11, 'a', 1),
(22, 'b', 2),
(33, 'c', 3),
(44, 'd', 4);

statement ok
CREATE TABLE t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
(11, 'z', 3),
(22, 'y', 1),
(44, 'x', 3),
(55, 'w', 3);

# left semi with wrong where clause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

query error DataFusion error: Schema error: No field named 't2'.'t2_id'. Valid fields are 't1'.'t1_id', 't1'.'t1_name', 't1'.'t1_int'.
SELECT t1.t1_id,
t1.t1_name,
t1.t1_int
FROM t1 LEFT SEMI
JOIN t2
ON (
t1.t1_id = t2.t2_id)
WHERE t2.t2_id > 1

# left semi join with on-filter
query ITI rowsort
SELECT t1.t1_id,
t1.t1_name,
t1.t1_int
FROM t1 LEFT SEMI
JOIN t2
ON (
t1.t1_id = t2.t2_id and t2.t2_int > 1)
----
11 a 1
44 d 4