Skip to content
Merged
1 change: 1 addition & 0 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
order_by: window_function.params.order_by,
window_frame: window_function.params.window_frame,
null_treatment: window_function.params.null_treatment,
distinct: window_function.params.distinct,
},
}))
};
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,7 @@ pub fn create_window_expr_with_name(
order_by,
window_frame,
null_treatment,
distinct,
},
} = window_fun.as_ref();
let physical_args =
Expand Down Expand Up @@ -1677,6 +1678,7 @@ pub fn create_window_expr_with_name(
window_frame,
physical_schema,
ignore_nulls,
*distinct,
)
}
other => plan_err!("Invalid window expression '{other:?}'"),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
Arc::new(window_frame),
&extended_schema,
false,
false,
)?;
let running_window_exec = Arc::new(BoundedWindowAggExec::try_new(
vec![window_expr],
Expand Down Expand Up @@ -660,6 +661,7 @@ async fn run_window_test(
Arc::new(window_frame.clone()),
&extended_schema,
false,
false,
)?],
exec1,
false,
Expand All @@ -678,6 +680,7 @@ async fn run_window_test(
Arc::new(window_frame.clone()),
&extended_schema,
false,
false,
)?],
exec2,
search_mode.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3684,6 +3684,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
case.window_frame,
input_schema.as_ref(),
false,
false,
)?;
let window_exec = if window_expr.uses_bounded_memory() {
Arc::new(BoundedWindowAggExec::try_new(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ pub fn bounded_window_exec_with_partition(
Arc::new(WindowFrame::new(Some(false))),
schema.as_ref(),
false,
false,
)
.unwrap();

Expand Down
27 changes: 24 additions & 3 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,8 @@ pub struct WindowFunctionParams {
pub window_frame: WindowFrame,
/// Specifies how NULL value is treated: ignore or respect
pub null_treatment: Option<NullTreatment>,
/// Distinct flag
pub distinct: bool,
}

impl WindowFunction {
Expand All @@ -1145,6 +1147,7 @@ impl WindowFunction {
order_by: Vec::default(),
window_frame: WindowFrame::new(None),
null_treatment: None,
distinct: false,
},
}
}
Expand Down Expand Up @@ -2291,6 +2294,7 @@ impl NormalizeEq for Expr {
partition_by: self_partition_by,
order_by: self_order_by,
null_treatment: self_null_treatment,
distinct: self_distinct,
},
} = left.as_ref();
let WindowFunction {
Expand All @@ -2302,6 +2306,7 @@ impl NormalizeEq for Expr {
partition_by: other_partition_by,
order_by: other_order_by,
null_treatment: other_null_treatment,
distinct: other_distinct,
},
} = other.as_ref();

Expand All @@ -2325,6 +2330,7 @@ impl NormalizeEq for Expr {
&& a.nulls_first == b.nulls_first
&& a.expr.normalize_eq(&b.expr)
})
&& self_distinct == other_distinct
}
(
Expr::Exists(Exists {
Expand Down Expand Up @@ -2558,11 +2564,13 @@ impl HashNode for Expr {
order_by: _,
window_frame,
null_treatment,
distinct,
},
} = window_fun.as_ref();
fun.hash(state);
window_frame.hash(state);
null_treatment.hash(state);
distinct.hash(state);
}
Expr::InList(InList {
expr: _expr,
Expand Down Expand Up @@ -2865,15 +2873,27 @@ impl Display for SchemaDisplay<'_> {
order_by,
window_frame,
null_treatment,
distinct,
} = params;

// Write function name and open parenthesis
write!(f, "{fun}(")?;

// If DISTINCT, emit the keyword
if *distinct {
write!(f, "DISTINCT ")?;
}

// Write the comma‑separated argument list
write!(
f,
"{}({})",
fun,
"{}",
schema_name_from_exprs_comma_separated_without_space(args)?
)?;

// **Close the argument parenthesis**
write!(f, ")")?;

if let Some(null_treatment) = null_treatment {
write!(f, " {null_treatment}")?;
}
Expand Down Expand Up @@ -3260,9 +3280,10 @@ impl Display for Expr {
order_by,
window_frame,
null_treatment,
distinct,
} = params;

fmt_function(f, &fun.to_string(), false, args, true)?;
fmt_function(f, &fun.to_string(), *distinct, args, true)?;

if let Some(nt) = null_treatment {
write!(f, "{nt}")?;
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ impl ExprFuncBuilder {
window_frame: window_frame
.unwrap_or_else(|| WindowFrame::new(has_order_by)),
null_treatment,
distinct,
},
})
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ pub struct RawWindowExpr {
pub order_by: Vec<SortExpr>,
pub window_frame: WindowFrame,
pub null_treatment: Option<NullTreatment>,
pub distinct: bool,
}

/// Result of planning a raw expr with [`ExprPlanner`]
Expand Down
12 changes: 12 additions & 0 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,22 @@ impl TreeNode for Expr {
order_by,
window_frame,
null_treatment,
distinct,
},
} = *window_fun;
(args, partition_by, order_by).map_elements(f)?.update_data(
|(new_args, new_partition_by, new_order_by)| {
if distinct {
return Expr::from(WindowFunction::new(fun, new_args))
.partition_by(new_partition_by)
.order_by(new_order_by)
.window_frame(window_frame)
.null_treatment(null_treatment)
.distinct()
.build()
.unwrap();
}

Expr::from(WindowFunction::new(fun, new_args))
.partition_by(new_partition_by)
.order_by(new_order_by)
Expand Down
42 changes: 31 additions & 11 deletions datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,14 +554,25 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
order_by,
window_frame,
null_treatment,
distinct,
} = params;

let mut schema_name = String::new();
schema_name.write_fmt(format_args!(
"{}({})",
self.name(),
schema_name_from_exprs(args)?
))?;

// Inject DISTINCT into the schema name when requested
if *distinct {
schema_name.write_fmt(format_args!(
"{}(DISTINCT {})",
self.name(),
schema_name_from_exprs(args)?
))?;
} else {
schema_name.write_fmt(format_args!(
"{}({})",
self.name(),
schema_name_from_exprs(args)?
))?;
}

if let Some(null_treatment) = null_treatment {
schema_name.write_fmt(format_args!(" {null_treatment}"))?;
Expand All @@ -579,7 +590,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
" ORDER BY [{}]",
schema_name_from_sorts(order_by)?
))?;
};
}

schema_name.write_fmt(format_args!(" {window_frame}"))?;

Expand Down Expand Up @@ -648,15 +659,24 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
order_by,
window_frame,
null_treatment,
distinct,
} = params;

let mut display_name = String::new();

display_name.write_fmt(format_args!(
"{}({})",
self.name(),
expr_vec_fmt!(args)
))?;
if *distinct {
display_name.write_fmt(format_args!(
"{}(DISTINCT {})",
self.name(),
expr_vec_fmt!(args)
))?;
} else {
display_name.write_fmt(format_args!(
"{}({})",
self.name(),
expr_vec_fmt!(args)
))?;
}

if let Some(null_treatment) = null_treatment {
display_name.write_fmt(format_args!(" {null_treatment}"))?;
Expand Down
Loading