Skip to content

Commit

Permalink
Allow DISTINCT with ORDER BY and an aliased select list (#5307)
Browse files Browse the repository at this point in the history
* Allow DISTINCT with ORDER BY and an aliased select list

* fix: update tests

* Update datafusion/core/tests/sqllogictests/test_files/order.slt

Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>

* Update datafusion/expr/src/logical_plan/builder.rs

Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>

* update test

---------

Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>
  • Loading branch information
alamb and stuartcarnie authored Feb 22, 2023
1 parent a853123 commit 554852e
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 69 deletions.
33 changes: 28 additions & 5 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,15 +1097,22 @@ mod tests {
.unwrap()
.distinct()
.unwrap()
.sort(vec![col("c2").sort(true, true)])
.sort(vec![col("c1").sort(true, true)])
.unwrap();

let df_results = plan.clone().collect().await?;

#[rustfmt::skip]
assert_batches_sorted_eq!(
vec![
"+----+", "| c1 |", "+----+", "| a |", "| a |", "| a |", "| a |",
"| a |", "| b |", "| b |", "| b |", "| b |", "| b |", "| c |",
"| c |", "| c |", "| c |", "| c |", "| d |", "| d |", "| d |",
"| d |", "| d |", "| e |", "| e |", "| e |", "| e |", "| e |",
"+----+",
"| c1 |",
"+----+",
"| a |",
"| b |",
"| c |",
"| d |",
"| e |",
"+----+",
],
&df_results
Expand All @@ -1114,6 +1121,22 @@ mod tests {
Ok(())
}

/// Sorting the output of `distinct()` by a column that was not selected
/// must be rejected with a planning error.
#[tokio::test]
async fn test_distinct_sort_by_unprojected() -> Result<()> {
    let table = test_table().await?;
    let distinct_c1 = table.select(vec![col("c1")]).unwrap().distinct().unwrap();

    // `c2` is not among the inputs to the distinct, so sorting on it
    // would require adding a new column underneath the distinct
    let sort_keys = vec![col("c2").sort(true, true)];
    let err = distinct_c1.sort(sort_keys).unwrap_err();

    let expected = "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list";
    assert_eq!(err.to_string(), expected);

    Ok(())
}

#[tokio::test]
async fn join() -> Result<()> {
let left = test_table().await?.select_columns(&["c1", "c2"])?;
Expand Down
38 changes: 35 additions & 3 deletions datafusion/core/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async fn sort_on_unprojected_columns() -> Result<()> {
}

#[tokio::test]
async fn sort_on_distinct_unprojected_columns() -> Result<()> {
async fn sort_on_distinct_columns() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Expand All @@ -138,7 +138,7 @@ async fn sort_on_distinct_unprojected_columns() -> Result<()> {
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(Int32Array::from_slice([2, 12, 12, 120])),
Arc::new(Int32Array::from_slice([2, 3, 4, 5])),
],
)
.unwrap();
Expand All @@ -153,7 +153,7 @@ async fn sort_on_distinct_unprojected_columns() -> Result<()> {
.unwrap()
.distinct()
.unwrap()
.sort(vec![Expr::Sort(Sort::new(Box::new(col("b")), false, true))])
.sort(vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))])
.unwrap();
let results = df.collect().await.unwrap();

Expand All @@ -170,6 +170,38 @@ async fn sort_on_distinct_unprojected_columns() -> Result<()> {
assert_batches_eq!(expected, &results);
Ok(())
}
/// Sorting a `SELECT DISTINCT a` DataFrame by the unselected column `b`
/// must be rejected with a planning error.
#[tokio::test]
async fn sort_on_distinct_unprojected_columns() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Schema::new(fields);

    let a_values = Int32Array::from_slice([1, 10, 10, 100]);
    let b_values = Int32Array::from_slice([2, 3, 4, 5]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(a_values), Arc::new(b_values)],
    )
    .unwrap();

    let ctx = SessionContext::new();
    ctx.register_batch("t", batch).unwrap();

    // Only `a` survives the projection that feeds the distinct
    let distinct_a = ctx
        .table("t")
        .await
        .unwrap()
        .select(vec![col("a")])
        .unwrap()
        .distinct()
        .unwrap();

    // Sorting on `b` after the distinct would require adding a new column
    // to the distinct's input, which is not allowed
    let sort_on_b = Expr::Sort(Sort::new(Box::new(col("b")), false, true));
    let err = distinct_a.sort(vec![sort_on_b]).unwrap_err();

    let expected = "Error during planning: For SELECT DISTINCT, ORDER BY expressions b must appear in select list";
    assert_eq!(err.to_string(), expected);
    Ok(())
}

#[tokio::test]
async fn filter_with_alias_overwrite() -> Result<()> {
Expand Down
35 changes: 35 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,41 @@ ORDER BY time;
statement error DataFusion error: This feature is not implemented: SORT BY
select * from t SORT BY time;


# distinct on a column not in the select list should not work
statement error For SELECT DISTINCT, ORDER BY expressions time must appear in select list
SELECT DISTINCT value FROM t ORDER BY time;

# distinct on an expression of a column not in the select list should not work
statement error For SELECT DISTINCT, ORDER BY expressions time must appear in select list
SELECT DISTINCT date_trunc('hour', time) FROM t ORDER BY time;

# distinct on a column that is in the select list but aliased should work
query P
SELECT DISTINCT time as "first_seen" FROM t ORDER BY "first_seen";
----
2022-01-01T00:00:30
2022-01-01T01:00:10
2022-01-02T00:00:20

# distinct on a column that is in the select list, but aliased (though
# the reference is to original expr) should work
query P
SELECT DISTINCT time as "first_seen" FROM t ORDER BY time;
----
2022-01-01T00:00:30
2022-01-01T01:00:10
2022-01-02T00:00:20

# distinct on a column that is in the select list, but aliased (though
# the reference is its ordinal position) should work
query P
SELECT DISTINCT time as "first_seen" FROM t ORDER BY 1;
----
2022-01-01T00:00:30
2022-01-01T01:00:10
2022-01-02T00:00:20

## Cleanup
statement ok
drop table t;
91 changes: 79 additions & 12 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,37 @@ impl LogicalPlanBuilder {
}

/// Add missing sort columns to all downstream projection
///
/// Thus, if you have a LogicalPlan that selects A and B and have
/// now requested a sort by C, this code will add C recursively to
/// all input projections.
///
/// Adding a new column is not correct if there is a `Distinct`
/// node, which produces only distinct values of its
/// inputs. Adding a new column to its input will result in
/// potentially different results than with the original column.
///
/// For example, if the input is like:
///
/// Distinct(A, B)
///
/// If the input looks like
///
/// a | b | c
/// --+---+---
/// 1 | 2 | 3
/// 1 | 2 | 4
///
/// Distinct (A, B) --> (1,2)
///
/// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
/// (which will appear as (1, 2), (1, 2) if only a and b are projected)
///
/// See <https://github.com/apache/arrow-datafusion/issues/5065> for more details
fn add_missing_columns(
curr_plan: LogicalPlan,
missing_cols: &[Column],
is_distinct: bool,
) -> Result<LogicalPlan> {
match curr_plan {
LogicalPlan::Projection(Projection {
Expand All @@ -362,15 +390,24 @@ impl LogicalPlanBuilder {
// missing_cols may be already present but without the new
// projected alias.
missing_exprs.retain(|e| !expr.contains(e));
if is_distinct {
Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
}
expr.extend(missing_exprs);
Ok(project((*input).clone(), expr)?)
}
_ => {
let is_distinct =
is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
let new_inputs = curr_plan
.inputs()
.into_iter()
.map(|input_plan| {
Self::add_missing_columns((*input_plan).clone(), missing_cols)
Self::add_missing_columns(
(*input_plan).clone(),
missing_cols,
is_distinct,
)
})
.collect::<Result<Vec<_>>>()?;

Expand All @@ -380,6 +417,45 @@ impl LogicalPlanBuilder {
}
}

/// Verify that extending a projection below a `Distinct` with
/// `missing_exprs` is allowed.
///
/// Returns `Ok(())` when `missing_exprs` is empty, or when every
/// missing expression is the expression wrapped by some
/// `Expr::Alias` already present in `projection_exprs`.
///
/// # Errors
///
/// Returns [`DataFusionError::Plan`] otherwise. The message lists the
/// flat names of all `missing_cols`, separated by `", "`.
fn ambiguous_distinct_check(
    missing_exprs: &[Expr],
    missing_cols: &[Column],
    projection_exprs: &[Expr],
) -> Result<()> {
    if missing_exprs.is_empty() {
        return Ok(());
    }

    // if the missing columns are all only aliases for things in
    // the existing select list, it is ok
    //
    // This handles the special case for
    // SELECT col as <alias> ORDER BY <alias>
    //
    // As described in https://github.com/apache/arrow-datafusion/issues/5293
    let all_aliases = missing_exprs.iter().all(|e| {
        projection_exprs
            .iter()
            .any(|proj_expr| matches!(proj_expr, Expr::Alias(expr, _) if e == expr.as_ref()))
    });
    if all_aliases {
        return Ok(());
    }

    // Separate the names so that several missing columns do not run
    // together (e.g. "a, b" rather than "ab"); a single column renders
    // exactly as before.
    let missing_col_names = missing_cols
        .iter()
        .map(|col| col.flat_name())
        .collect::<Vec<_>>()
        .join(", ");

    Err(DataFusionError::Plan(format!(
        "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list",
    )))
}

/// Apply a sort
pub fn sort(
self,
Expand All @@ -406,16 +482,6 @@ impl LogicalPlanBuilder {
Ok(())
})?;

self.create_sort_plan(exprs, missing_cols)
}

pub fn create_sort_plan(
self,
exprs: impl IntoIterator<Item = impl Into<Expr>> + Clone,
missing_cols: Vec<Column>,
) -> Result<Self> {
let schema = self.plan.schema();

if missing_cols.is_empty() {
return Ok(Self::from(LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &self.plan)?,
Expand All @@ -431,7 +497,8 @@ impl LogicalPlanBuilder {
.map(|f| Expr::Column(f.qualified_column()))
.collect();

let plan = Self::add_missing_columns(self.plan, &missing_cols)?;
let is_distinct = false;
let plan = Self::add_missing_columns(self.plan, &missing_cols, is_distinct)?;
let sort_plan = LogicalPlan::Sort(Sort {
expr: normalize_cols(exprs, &plan)?,
input: Arc::new(plan),
Expand Down
53 changes: 4 additions & 49 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::normalize_ident;
use datafusion_common::{Column, DFSchema, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr_rewriter::rewrite_sort_cols_by_aggs;
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, Repartition};
use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use sqlparser::ast::{Expr as SQLExpr, Offset as SQLOffset, OrderByExpr, Query};

use sqlparser::parser::ParserError::ParserError;
Expand Down Expand Up @@ -151,55 +150,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return Ok(plan);
}

let mut order_by_rex = order_by
let order_by_rex = order_by
.into_iter()
.map(|e| self.order_by_to_sort_expr(e, plan.schema()))
.collect::<Result<Vec<_>>>()?;

order_by_rex = rewrite_sort_cols_by_aggs(order_by_rex, &plan)?;
let schema = plan.schema();

// if current plan is distinct or current plan is repartition and its child plan is distinct,
// then this plan is a select distinct plan
let is_select_distinct = match plan {
LogicalPlan::Distinct(_) => true,
LogicalPlan::Repartition(Repartition { ref input, .. }) => {
matches!(input.as_ref(), &LogicalPlan::Distinct(_))
}
_ => false,
};

let mut missing_cols: Vec<Column> = vec![];
// Collect sort columns that are missing in the input plan's schema
order_by_rex
.clone()
.into_iter()
.try_for_each::<_, Result<()>>(|expr| {
let columns = expr.to_columns()?;

columns.into_iter().for_each(|c| {
if schema.field_from_column(&c).is_err() {
missing_cols.push(c);
}
});

Ok(())
})?;

// for select distinct, order by expressions must exist in select list
if is_select_distinct && !missing_cols.is_empty() {
let missing_col_names = missing_cols
.iter()
.map(|col| col.flat_name())
.collect::<String>();
let error_msg = format!(
"For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list",
);
return Err(DataFusionError::Plan(error_msg));
}

LogicalPlanBuilder::from(plan)
.create_sort_plan(order_by_rex, missing_cols)?
.build()
LogicalPlanBuilder::from(plan).sort(order_by_rex)?.build()
}
}

0 comments on commit 554852e

Please sign in to comment.