Skip to content

Commit f3941b2

Browse files
haohuaijinalamb
andauthored
feat: implement QUALIFY clause (apache#16933)
* feat: implement QUALIFY clause * add document --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent a2f84f3 commit f3941b2

File tree

4 files changed

+420
-10
lines changed

4 files changed

+420
-10
lines changed

datafusion/sql/src/select.rs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
6666
if !select.lateral_views.is_empty() {
6767
return not_impl_err!("LATERAL VIEWS");
6868
}
69-
if select.qualify.is_some() {
70-
return not_impl_err!("QUALIFY");
71-
}
69+
7270
if select.top.is_some() {
7371
return not_impl_err!("TOP");
7472
}
@@ -148,6 +146,33 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
148146
})
149147
.transpose()?;
150148

149+
// Optionally the QUALIFY expression.
150+
let qualify_expr_opt = select
151+
.qualify
152+
.map::<Result<Expr>, _>(|qualify_expr| {
153+
let qualify_expr = self.sql_expr_to_logical_expr(
154+
qualify_expr,
155+
&combined_schema,
156+
planner_context,
157+
)?;
158+
// This step "dereferences" any aliases in the QUALIFY clause.
159+
//
160+
// This is how we support queries with QUALIFY expressions that
161+
// refer to aliased columns.
162+
//
163+
// For example:
164+
//
165+
// select row_number() over (PARTITION BY id) as rk from users qualify rk > 1;
166+
//
167+
// are rewritten as, respectively:
168+
//
169+
// select row_number() over (PARTITION BY id) as rk from users qualify row_number() over (PARTITION BY id) > 1;
170+
//
171+
let qualify_expr = resolve_aliases_to_exprs(qualify_expr, &alias_map)?;
172+
normalize_col(qualify_expr, &projected_plan)
173+
})
174+
.transpose()?;
175+
151176
// The outer expressions we will search through for aggregates.
152177
// Aggregates may be sourced from the SELECT list or from the HAVING expression.
153178
let aggr_expr_haystack = select_exprs.iter().chain(having_expr_opt.iter());
@@ -225,8 +250,12 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
225250
plan
226251
};
227252

228-
// Process window function
229-
let window_func_exprs = find_window_exprs(&select_exprs_post_aggr);
253+
// The outer expressions we will search through for window functions.
254+
// Window functions may be sourced from the SELECT list or from the QUALIFY expression.
255+
let windows_expr_haystack =
256+
select_exprs_post_aggr.iter().chain(qualify_expr_opt.iter());
257+
// All of the window expressions (deduplicated).
258+
let window_func_exprs = find_window_exprs(windows_expr_haystack);
230259

231260
let plan = if window_func_exprs.is_empty() {
232261
plan
@@ -242,6 +271,39 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
242271
plan
243272
};
244273

274+
// Process QUALIFY clause after window functions
275+
// QUALIFY filters the results of window functions, similar to how HAVING filters aggregates
276+
let plan = if let Some(qualify_expr) = qualify_expr_opt {
277+
// Validate that QUALIFY is used with window functions
278+
if window_func_exprs.is_empty() {
279+
return plan_err!(
280+
"QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
281+
);
282+
}
283+
284+
// now attempt to resolve columns and replace with fully-qualified columns
285+
let windows_projection_exprs = window_func_exprs
286+
.iter()
287+
.map(|expr| resolve_columns(expr, &plan))
288+
.collect::<Result<Vec<Expr>>>()?;
289+
290+
// Rewrite the qualify expression to reference columns from the window plan
291+
let qualify_expr_post_window =
292+
rebase_expr(&qualify_expr, &windows_projection_exprs, &plan)?;
293+
294+
// Validate that the qualify expression can be resolved from the window plan schema
295+
self.validate_schema_satisfies_exprs(
296+
plan.schema(),
297+
std::slice::from_ref(&qualify_expr_post_window),
298+
)?;
299+
300+
LogicalPlanBuilder::from(plan)
301+
.filter(qualify_expr_post_window)?
302+
.build()?
303+
} else {
304+
plan
305+
};
306+
245307
// Try processing unnest expression or do the final projection
246308
let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
247309

datafusion/sql/tests/sql_integration.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use datafusion_functions_aggregate::{
4343
};
4444
use datafusion_functions_aggregate::{average::avg_udaf, grouping::grouping_udaf};
4545
use datafusion_functions_nested::make_array::make_array_udf;
46-
use datafusion_functions_window::rank::rank_udwf;
46+
use datafusion_functions_window::{rank::rank_udwf, row_number::row_number_udwf};
4747
use insta::{allow_duplicates, assert_snapshot};
4848
use rstest::rstest;
4949
use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect};
@@ -3298,6 +3298,7 @@ fn logical_plan_with_dialect_and_options(
32983298
.with_aggregate_function(max_udaf())
32993299
.with_aggregate_function(grouping_udaf())
33003300
.with_window_function(rank_udwf())
3301+
.with_window_function(row_number_udwf())
33013302
.with_expr_planner(Arc::new(CoreFunctionPlanner::default()));
33023303

33033304
let context = MockContextProvider { state };
@@ -4186,6 +4187,47 @@ fn test_select_distinct_order_by() {
41864187
);
41874188
}
41884189

4190+
#[test]
4191+
fn test_select_qualify_basic() {
4192+
let sql = "SELECT person.id, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn FROM person QUALIFY rn = 1";
4193+
let plan = logical_plan(sql).unwrap();
4194+
assert_snapshot!(
4195+
plan,
4196+
@r#"
4197+
Projection: person.id, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn
4198+
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW = Int64(1)
4199+
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
4200+
TableScan: person
4201+
"#
4202+
);
4203+
}
4204+
4205+
#[test]
4206+
fn test_select_qualify_without_window_function() {
4207+
let sql = "SELECT person.id FROM person QUALIFY person.id > 1";
4208+
let err = logical_plan(sql).unwrap_err();
4209+
assert_eq!(
4210+
err.strip_backtrace(),
4211+
"Error during planning: QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
4212+
);
4213+
}
4214+
4215+
#[test]
4216+
fn test_select_qualify_complex_condition() {
4217+
let sql = "SELECT person.id, person.age, ROW_NUMBER() OVER (PARTITION BY person.age ORDER BY person.id) as rn, RANK() OVER (ORDER BY person.salary) as rank FROM person QUALIFY rn <= 2 AND rank <= 5";
4218+
let plan = logical_plan(sql).unwrap();
4219+
assert_snapshot!(
4220+
plan,
4221+
@r#"
4222+
Projection: person.id, person.age, row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn, rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rank
4223+
Filter: row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(2) AND rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Int64(5)
4224+
WindowAggr: windowExpr=[[rank() ORDER BY [person.salary ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
4225+
WindowAggr: windowExpr=[[row_number() PARTITION BY [person.age] ORDER BY [person.id ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
4226+
TableScan: person
4227+
"#
4228+
);
4229+
}
4230+
41894231
#[rstest]
41904232
#[case::select_cluster_by_unsupported(
41914233
"SELECT customer_name, sum(order_total) as total_order_amount FROM orders CLUSTER BY customer_name",
@@ -4195,10 +4237,6 @@ fn test_select_distinct_order_by() {
41954237
"SELECT id, number FROM person LATERAL VIEW explode(numbers) exploded_table AS number",
41964238
"This feature is not implemented: LATERAL VIEWS"
41974239
)]
4198-
#[case::select_qualify_unsupported(
4199-
"SELECT i, p, o FROM person QUALIFY ROW_NUMBER() OVER (PARTITION BY p ORDER BY o) = 1",
4200-
"This feature is not implemented: QUALIFY"
4201-
)]
42024240
#[case::select_top_unsupported(
42034241
"SELECT TOP (5) * FROM person",
42044242
"This feature is not implemented: TOP"

0 commit comments

Comments
 (0)