Skip to content

Commit de1eb41

Browse files
zhuqi-lucas authored and crepererum committed
feat: support distinct for window (apache#16925)
* feat: support distinct for window
* fix
* fix
* fisx
* fix unparse
* fix test
* fix test
* easy way
* add test
* add comments
1 parent 1693eac commit de1eb41

File tree

22 files changed

+408
-43
lines changed

22 files changed

+408
-43
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -199,6 +199,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
199199
order_by: window_function.params.order_by,
200200
window_frame: window_function.params.window_frame,
201201
null_treatment: window_function.params.null_treatment,
202+
distinct: window_function.params.distinct,
202203
},
203204
}))
204205
};

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1517,6 +1517,7 @@ pub fn create_window_expr_with_name(
15171517
order_by,
15181518
window_frame,
15191519
null_treatment,
1520+
distinct,
15201521
},
15211522
} = window_fun.as_ref();
15221523
let physical_args =
@@ -1545,6 +1546,7 @@ pub fn create_window_expr_with_name(
15451546
window_frame,
15461547
physical_schema,
15471548
ignore_nulls,
1549+
*distinct,
15481550
)
15491551
}
15501552
other => plan_err!("Invalid window expression '{other:?}'"),

datafusion/core/tests/fuzz_cases/window_fuzz.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -289,6 +289,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
289289
Arc::new(window_frame),
290290
&extended_schema,
291291
false,
292+
false,
292293
)?;
293294
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
294295
vec![window_expr],
@@ -658,6 +659,7 @@ async fn run_window_test(
658659
Arc::new(window_frame.clone()),
659660
&extended_schema,
660661
false,
662+
false,
661663
)?],
662664
exec1,
663665
false,
@@ -676,6 +678,7 @@ async fn run_window_test(
676678
Arc::new(window_frame.clone()),
677679
&extended_schema,
678680
false,
681+
false,
679682
)?],
680683
exec2,
681684
search_mode.clone(),

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3401,6 +3401,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
34013401
case.window_frame,
34023402
input_schema.as_ref(),
34033403
false,
3404+
false,
34043405
)?;
34053406
let window_exec = if window_expr.uses_bounded_memory() {
34063407
Arc::new(BoundedWindowAggExec::try_new(

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -254,6 +254,7 @@ pub fn bounded_window_exec(
254254
Arc::new(WindowFrame::new(Some(false))),
255255
schema.as_ref(),
256256
false,
257+
false,
257258
)
258259
.unwrap();
259260

datafusion/expr/src/expr.rs

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -948,6 +948,8 @@ pub struct WindowFunctionParams {
948948
pub window_frame: WindowFrame,
949949
/// Specifies how NULL value is treated: ignore or respect
950950
pub null_treatment: Option<NullTreatment>,
951+
/// Distinct flag
952+
pub distinct: bool,
951953
}
952954

953955
impl WindowFunction {
@@ -962,6 +964,7 @@ impl WindowFunction {
962964
order_by: Vec::default(),
963965
window_frame: WindowFrame::new(None),
964966
null_treatment: None,
967+
distinct: false,
965968
},
966969
}
967970
}
@@ -2130,6 +2133,7 @@ impl NormalizeEq for Expr {
21302133
partition_by: self_partition_by,
21312134
order_by: self_order_by,
21322135
null_treatment: self_null_treatment,
2136+
distinct: self_distinct,
21332137
},
21342138
} = left.as_ref();
21352139
let WindowFunction {
@@ -2141,6 +2145,7 @@ impl NormalizeEq for Expr {
21412145
partition_by: other_partition_by,
21422146
order_by: other_order_by,
21432147
null_treatment: other_null_treatment,
2148+
distinct: other_distinct,
21442149
},
21452150
} = other.as_ref();
21462151

@@ -2164,6 +2169,7 @@ impl NormalizeEq for Expr {
21642169
&& a.nulls_first == b.nulls_first
21652170
&& a.expr.normalize_eq(&b.expr)
21662171
})
2172+
&& self_distinct == other_distinct
21672173
}
21682174
(
21692175
Expr::Exists(Exists {
@@ -2397,11 +2403,13 @@ impl HashNode for Expr {
23972403
order_by: _,
23982404
window_frame,
23992405
null_treatment,
2406+
distinct,
24002407
},
24012408
} = window_fun.as_ref();
24022409
fun.hash(state);
24032410
window_frame.hash(state);
24042411
null_treatment.hash(state);
2412+
distinct.hash(state);
24052413
}
24062414
Expr::InList(InList {
24072415
expr: _expr,
@@ -2704,15 +2712,27 @@ impl Display for SchemaDisplay<'_> {
27042712
order_by,
27052713
window_frame,
27062714
null_treatment,
2715+
distinct,
27072716
} = params;
27082717

2718+
// Write function name and open parenthesis
2719+
write!(f, "{fun}(")?;
2720+
2721+
// If DISTINCT, emit the keyword
2722+
if *distinct {
2723+
write!(f, "DISTINCT ")?;
2724+
}
2725+
2726+
// Write the comma‑separated argument list
27092727
write!(
27102728
f,
2711-
"{}({})",
2712-
fun,
2729+
"{}",
27132730
schema_name_from_exprs_comma_separated_without_space(args)?
27142731
)?;
27152732

2733+
// **Close the argument parenthesis**
2734+
write!(f, ")")?;
2735+
27162736
if let Some(null_treatment) = null_treatment {
27172737
write!(f, " {null_treatment}")?;
27182738
}
@@ -3099,9 +3119,10 @@ impl Display for Expr {
30993119
order_by,
31003120
window_frame,
31013121
null_treatment,
3122+
distinct,
31023123
} = params;
31033124

3104-
fmt_function(f, &fun.to_string(), false, args, true)?;
3125+
fmt_function(f, &fun.to_string(), *distinct, args, true)?;
31053126

31063127
if let Some(nt) = null_treatment {
31073128
write!(f, "{nt}")?;

datafusion/expr/src/expr_fn.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,7 @@ impl ExprFuncBuilder {
841841
window_frame: window_frame
842842
.unwrap_or_else(|| WindowFrame::new(has_order_by)),
843843
null_treatment,
844+
distinct,
844845
},
845846
})
846847
}

datafusion/expr/src/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -308,6 +308,7 @@ pub struct RawWindowExpr {
308308
pub order_by: Vec<SortExpr>,
309309
pub window_frame: WindowFrame,
310310
pub null_treatment: Option<NullTreatment>,
311+
pub distinct: bool,
311312
}
312313

313314
/// Result of planning a raw expr with [`ExprPlanner`]

datafusion/expr/src/tree_node.rs

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -242,10 +242,22 @@ impl TreeNode for Expr {
242242
order_by,
243243
window_frame,
244244
null_treatment,
245+
distinct,
245246
},
246247
} = *window_fun;
247248
(args, partition_by, order_by).map_elements(f)?.update_data(
248249
|(new_args, new_partition_by, new_order_by)| {
250+
if distinct {
251+
return Expr::from(WindowFunction::new(fun, new_args))
252+
.partition_by(new_partition_by)
253+
.order_by(new_order_by)
254+
.window_frame(window_frame)
255+
.null_treatment(null_treatment)
256+
.distinct()
257+
.build()
258+
.unwrap();
259+
}
260+
249261
Expr::from(WindowFunction::new(fun, new_args))
250262
.partition_by(new_partition_by)
251263
.order_by(new_order_by)

datafusion/expr/src/udaf.rs

Lines changed: 31 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -522,14 +522,25 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
522522
order_by,
523523
window_frame,
524524
null_treatment,
525+
distinct,
525526
} = params;
526527

527528
let mut schema_name = String::new();
528-
schema_name.write_fmt(format_args!(
529-
"{}({})",
530-
self.name(),
531-
schema_name_from_exprs(args)?
532-
))?;
529+
530+
// Inject DISTINCT into the schema name when requested
531+
if *distinct {
532+
schema_name.write_fmt(format_args!(
533+
"{}(DISTINCT {})",
534+
self.name(),
535+
schema_name_from_exprs(args)?
536+
))?;
537+
} else {
538+
schema_name.write_fmt(format_args!(
539+
"{}({})",
540+
self.name(),
541+
schema_name_from_exprs(args)?
542+
))?;
543+
}
533544

534545
if let Some(null_treatment) = null_treatment {
535546
schema_name.write_fmt(format_args!(" {null_treatment}"))?;
@@ -547,7 +558,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
547558
" ORDER BY [{}]",
548559
schema_name_from_sorts(order_by)?
549560
))?;
550-
};
561+
}
551562

552563
schema_name.write_fmt(format_args!(" {window_frame}"))?;
553564

@@ -615,15 +626,24 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
615626
order_by,
616627
window_frame,
617628
null_treatment,
629+
distinct,
618630
} = params;
619631

620632
let mut display_name = String::new();
621633

622-
display_name.write_fmt(format_args!(
623-
"{}({})",
624-
self.name(),
625-
expr_vec_fmt!(args)
626-
))?;
634+
if *distinct {
635+
display_name.write_fmt(format_args!(
636+
"{}(DISTINCT {})",
637+
self.name(),
638+
expr_vec_fmt!(args)
639+
))?;
640+
} else {
641+
display_name.write_fmt(format_args!(
642+
"{}({})",
643+
self.name(),
644+
expr_vec_fmt!(args)
645+
))?;
646+
}
627647

628648
if let Some(null_treatment) = null_treatment {
629649
display_name.write_fmt(format_args!(" {null_treatment}"))?;

0 commit comments

Comments
 (0)