Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Literal in ORDER BY window definition should not be an ordinal referring to relation column #8419

Merged
merged 12 commits into from
Dec 6, 2023
8 changes: 2 additions & 6 deletions datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::PhysicalExpr;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;

/// Represents Sort operation for a column in a RecordBatch
Expand Down Expand Up @@ -65,11 +65,7 @@ impl PhysicalSortExpr {
let value_to_sort = self.expr.evaluate(batch)?;
let array_to_sort = match value_to_sort {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => {
return exec_err!(
"Sort operation is not applicable to scalar value {scalar}"
);
}
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(batch.num_rows())?,
};
Ok(SortColumn {
values: array_to_sort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use datafusion_common::utils::{
evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
get_record_batch_at_indices, get_row_at_idx,
};
use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
Expand Down Expand Up @@ -586,7 +586,7 @@ impl LinearSearch {
.map(|item| match item.evaluate(record_batch)? {
ColumnarValue::Array(array) => Ok(array),
ColumnarValue::Scalar(scalar) => {
plan_err!("Sort operation is not applicable to scalar value {scalar}")
scalar.to_array_of_size(record_batch.num_rows())
}
})
.collect()
Expand Down
10 changes: 7 additions & 3 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let order_by =
self.order_by_to_sort_expr(&window.order_by, schema, planner_context)?;
let order_by = self.order_by_to_sort_expr(
&window.order_by,
schema,
planner_context,
false,
viirya marked this conversation as resolved.
Show resolved Hide resolved
)?;
let window_frame = window
.window_frame
.as_ref()
Expand Down Expand Up @@ -143,7 +147,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let order_by =
self.order_by_to_sort_expr(&order_by, schema, planner_context)?;
self.order_by_to_sort_expr(&order_by, schema, planner_context, true)?;
let order_by = (!order_by.is_empty()).then_some(order_by);
let args = self.function_args_to_expr(args, schema, planner_context)?;
let filter: Option<Box<Expr>> = filter
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = array_agg;

let order_by = if let Some(order_by) = order_by {
Some(self.order_by_to_sort_expr(&order_by, input_schema, planner_context)?)
Some(self.order_by_to_sort_expr(
&order_by,
input_schema,
planner_context,
true,
)?)
} else {
None
};
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/expr/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
exprs: &[OrderByExpr],
schema: &DFSchema,
planner_context: &mut PlannerContext,
literal_to_column: bool,
alamb marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Vec<Expr>> {
let mut expr_vec = vec![];
for e in exprs {
Expand All @@ -40,7 +41,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} = e;

let expr = match expr {
SQLExpr::Value(Value::Number(v, _)) => {
SQLExpr::Value(Value::Number(v, _)) if literal_to_column => {
let field_index = v
.parse::<usize>()
.map_err(|err| plan_datafusion_err!("{}", err))?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

let order_by_rex =
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context)?;
self.order_by_to_sort_expr(&order_by, plan.schema(), planner_context, true)?;

if let LogicalPlan::Distinct(Distinct::On(ref distinct_on)) = plan {
// In case of `DISTINCT ON` we must capture the sort expressions since during the plan
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut all_results = vec![];
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
let expr_vec = self.order_by_to_sort_expr(&expr, schema, planner_context)?;
let expr_vec =
self.order_by_to_sort_expr(&expr, schema, planner_context, true)?;
// Verify that columns of all SortExprs exist in the schema:
for expr in expr_vec.iter() {
for column in expr.to_columns()?.iter() {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3778,10 +3778,10 @@ query error DataFusion error: Arrow error: Invalid argument error: must either s
select rank() over (RANGE between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q;

# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
query I
select rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY rnk
query II
select a,
rank() over (order by 1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) rnk
from (select 1 a union select 2 a) q ORDER BY a
----
1
2
1 1
2 1