Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
if !select.lateral_views.is_empty() {
return not_impl_err!("LATERAL VIEWS");
}
if select.qualify.is_some() {
return not_impl_err!("QUALIFY");
}

if select.top.is_some() {
return not_impl_err!("TOP");
}
Expand Down Expand Up @@ -148,6 +146,33 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
})
.transpose()?;

// Optionally the QUALIFY expression.
let qualify_expr_opt = select
.qualify
.map::<Result<Expr>, _>(|qualify_expr| {
let qualify_expr = self.sql_expr_to_logical_expr(
qualify_expr,
&combined_schema,
planner_context,
)?;
// This step "dereferences" any aliases in the QUALIFY clause.
//
// This is how we support queries with QUALIFY expressions that
// refer to aliased columns.
//
// For example:
//
// select row_number() over (PARTITION BY id) as rk from users qualify rk > 1;
//
// are rewritten as, respectively:
//
// select row_number() over (PARTITION BY id) as rk from users qualify row_number() over (PARTITION BY id) > 1;
//
let qualify_expr = resolve_aliases_to_exprs(qualify_expr, &alias_map)?;
normalize_col(qualify_expr, &projected_plan)
})
.transpose()?;

// The outer expressions we will search through for aggregates.
// Aggregates may be sourced from the SELECT list or from the HAVING expression.
let aggr_expr_haystack = select_exprs.iter().chain(having_expr_opt.iter());
Expand Down Expand Up @@ -225,8 +250,12 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan
};

// Process window function
let window_func_exprs = find_window_exprs(&select_exprs_post_aggr);
// The outer expressions we will search through for window functions.
// Window functions may be sourced from the SELECT list or from the QUALIFY expression.
let windows_expr_haystack =
select_exprs_post_aggr.iter().chain(qualify_expr_opt.iter());
// All of the window expressions (deduplicated).
let window_func_exprs = find_window_exprs(windows_expr_haystack);

let plan = if window_func_exprs.is_empty() {
plan
Expand All @@ -242,6 +271,39 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
plan
};

// Process QUALIFY clause after window functions
// QUALIFY filters the results of window functions, similar to how HAVING filters aggregates
let plan = if let Some(qualify_expr) = qualify_expr_opt {
// Validate that QUALIFY is used with window functions
if window_func_exprs.is_empty() {
return plan_err!(
"QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
);
}

// now attempt to resolve columns and replace with fully-qualified columns
let windows_projection_exprs = window_func_exprs
.iter()
.map(|expr| resolve_columns(expr, &plan))
.collect::<Result<Vec<Expr>>>()?;

// Rewrite the qualify expression to reference columns from the window plan
let qualify_expr_post_window =
rebase_expr(&qualify_expr, &windows_projection_exprs, &plan)?;

// Validate that the qualify expression can be resolved from the window plan schema
self.validate_schema_satisfies_exprs(
plan.schema(),
std::slice::from_ref(&qualify_expr_post_window),
)?;

LogicalPlanBuilder::from(plan)
.filter(qualify_expr_post_window)?
.build()?
} else {
plan
};

// Try processing unnest expression or do the final projection
let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;

Expand Down
48 changes: 43 additions & 5 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use datafusion_functions_aggregate::{
};
use datafusion_functions_aggregate::{average::avg_udaf, grouping::grouping_udaf};
use datafusion_functions_nested::make_array::make_array_udf;
use datafusion_functions_window::rank::rank_udwf;
use datafusion_functions_window::{rank::rank_udwf, row_number::row_number_udwf};
use insta::{allow_duplicates, assert_snapshot};
use rstest::rstest;
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
Expand Down Expand Up @@ -3298,6 +3298,7 @@ fn logical_plan_with_dialect_and_options(
.with_aggregate_function(max_udaf())
.with_aggregate_function(grouping_udaf())
.with_window_function(rank_udwf())
.with_window_function(row_number_udwf())
.with_expr_planner(Arc::new(CoreFunctionPlanner::default()));

let context = MockContextProvider { state };
Expand Down Expand Up @@ -4186,6 +4187,47 @@ fn test_select_distinct_order_by() {
);
}

#[test]
fn test_select_qualify_basic() {
let sql = "SELECT person.id, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn FROM person QUALIFY rn = 1";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: person.id, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = Int64(1)
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: person
"#
);
}

#[test]
fn test_select_qualify_without_window_function() {
let sql = "SELECT person.id FROM person QUALIFY person.id > 1";
let err = logical_plan(sql).unwrap_err();
assert_eq!(
err.strip_backtrace(),
"Error during planning: QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this is consistent with the DuckDB behavior:

D SELECT person.id FROM person QUALIFY person.id > 1;
Binder Error:
at least one window function must appear in the SELECT column or QUALIFY clause

);
}

#[test]
fn test_select_qualify_complex_condition() {
let sql = "SELECT person.id, person.age, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn, RANK() OVER (ORDER BY person.salary) as rank FROM person QUALIFY rn <= 2 AND rank <= 5";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: person.id, person.age, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn, rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rank
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(2) AND rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(5)
WindowAggr: windowExpr=[[rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: person
"#
);
}

#[rstest]
#[case::select_cluster_by_unsupported(
"SELECT customer_name, sum(order_total) as total_order_amount FROM orders CLUSTER BY customer_name",
Expand All @@ -4195,10 +4237,6 @@ fn test_select_distinct_order_by() {
"SELECT id, number FROM person LATERAL VIEW explode(numbers) exploded_table AS number",
"This feature is not implemented: LATERAL VIEWS"
)]
#[case::select_qualify_unsupported(
"SELECT i, p, o FROM person QUALIFY ROW_NUMBER() OVER (PARTITION BY p ORDER BY o) = 1",
"This feature is not implemented: QUALIFY"
)]
#[case::select_top_unsupported(
"SELECT TOP (5) * FROM person",
"This feature is not implemented: TOP"
Expand Down
Loading