Move wildcard expansions to the analyzer #11681

Merged · 41 commits · Aug 13, 2024
Commits
cee3a11
allow qualified wildcard in the logical plan
goldmedal Jul 27, 2024
98ef466
move wildcard expansions to the analyzer
goldmedal Jul 27, 2024
3b1c39a
fix fmt
goldmedal Jul 27, 2024
f3b642a
fix the view tests
goldmedal Jul 28, 2024
01382f1
expand wildcard for schema
goldmedal Aug 5, 2024
63ce513
fix for union query
goldmedal Aug 5, 2024
59cf620
cargo fmt clippy
goldmedal Aug 5, 2024
8775862
move wildcard expanding tests to expand_wildcard_rule.rs
goldmedal Aug 5, 2024
8cc5553
coercion the expanded wildcard expression in union
goldmedal Aug 5, 2024
718b720
remove debug message
goldmedal Aug 6, 2024
623777c
move wildcard options to logical plan
goldmedal Aug 7, 2024
50d2e3e
remove unused function
goldmedal Aug 7, 2024
9410c1b
add the doc for expression function
goldmedal Aug 7, 2024
0b2bba9
fix cargo check
goldmedal Aug 7, 2024
36a2e1c
fix cargo fmt
goldmedal Aug 7, 2024
dd487e5
fix test
goldmedal Aug 7, 2024
f01c0dd
extract expand_exprlist
goldmedal Aug 10, 2024
4aa8e64
expand wildcard for functional_dependencies
goldmedal Aug 10, 2024
89c1b3d
refine the doc
goldmedal Aug 10, 2024
20479ec
fix tests
goldmedal Aug 10, 2024
15dddf6
fix expand exclude and except
goldmedal Aug 10, 2024
88632b2
Merge branch 'main' into feature/11639-add-expand-rule
goldmedal Aug 10, 2024
38dc403
remove unused import
goldmedal Aug 10, 2024
d95ad74
fix check and update function
goldmedal Aug 10, 2024
d2bdb7f
fix check
goldmedal Aug 10, 2024
c5d304e
throw the error when exprlist to field
goldmedal Aug 10, 2024
f95894a
fix functional_dependency and exclude
goldmedal Aug 11, 2024
0bd43a5
fix projection_schema
goldmedal Aug 11, 2024
46922f5
fix the window functions
goldmedal Aug 11, 2024
55f13e3
fix clippy and support unparsing wildcard
goldmedal Aug 11, 2024
7272349
fix clippy and fmt
goldmedal Aug 11, 2024
3b55265
add the doc for util functions
goldmedal Aug 11, 2024
03b9b19
fix unique expression check for projection
goldmedal Aug 12, 2024
c3503e7
Merge branch 'main' into feature/11639-add-expand-rule
goldmedal Aug 12, 2024
d21063a
cargo fmt
goldmedal Aug 12, 2024
e461492
move test and solve dependency issue
goldmedal Aug 12, 2024
28278d1
Merge branch 'main' into feature/11639-add-expand-rule
goldmedal Aug 13, 2024
5e24932
address review comments
goldmedal Aug 13, 2024
74cc393
add the fail reason
goldmedal Aug 13, 2024
2324fa3
enhance the doc
goldmedal Aug 13, 2024
81d66f4
add more doc
goldmedal Aug 13, 2024
44 changes: 38 additions & 6 deletions datafusion/core/src/datasource/view.rs
@@ -19,17 +19,19 @@

use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan},
physical_plan::ExecutionPlan,
};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;

use crate::datasource::{TableProvider, TableType};

@@ -50,6 +52,7 @@ impl ViewTable {
logical_plan: LogicalPlan,
definition: Option<String>,
) -> Result<Self> {
let logical_plan = Self::apply_required_rule(logical_plan)?;
let table_schema = logical_plan.schema().as_ref().to_owned().into();

let view = Self {
Expand All @@ -61,6 +64,15 @@ impl ViewTable {
Ok(view)
}

fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
let options = ConfigOptions::default();
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
logical_plan,
&options,
|_, _| {},
)
}
Comment on lines +67 to +74 · Contributor Author: To generate the correct schema, we need to apply the ExpandWildcardRule to the view's plan first.
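As a quick illustration of that point (not code from this PR): the same `Analyzer::with_rules` call shown in `apply_required_rule` can be run on any wildcard-carrying plan to materialize a concrete schema. The `SessionContext` / `into_unoptimized_plan` usage below is an assumption about the surrounding DataFusion API, so treat this as a sketch rather than the PR's implementation.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;

/// Sketch: build a plan that still contains `*` and run only the
/// ExpandWildcardRule so its schema lists concrete columns, mirroring
/// what `ViewTable::apply_required_rule` does above.
async fn expanded_plan(
    ctx: &SessionContext,
    sql: &str,
) -> datafusion_common::Result<LogicalPlan> {
    // Assumed helper: `into_unoptimized_plan` returns the raw logical plan.
    let plan = ctx.sql(sql).await?.into_unoptimized_plan();
    let options = ConfigOptions::default();
    Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())])
        .execute_and_check(plan, &options, |_, _| {})
}
```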


/// Get definition ref
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
@@ -232,6 +244,26 @@ mod tests {

assert_batches_eq!(expected, &results);

let view_sql =
"CREATE VIEW replace_xyz AS SELECT * REPLACE (column1*2 as column1) FROM xyz";
session_ctx.sql(view_sql).await?.collect().await?;

let results = session_ctx
.sql("SELECT * FROM replace_xyz")
.await?
.collect()
.await?;

let expected = [
"+---------+---------+---------+",
"| column1 | column2 | column3 |",
"+---------+---------+---------+",
"| 2 | 2 | 3 |",
"| 8 | 5 | 6 |",
"+---------+---------+---------+",
];

assert_batches_eq!(expected, &results);
Ok(())
}

1 change: 0 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -718,7 +718,6 @@ impl SessionContext {
}
(_, Err(_)) => {
let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name, table)?;
self.return_empty_dataframe()
}
221 changes: 211 additions & 10 deletions datafusion/expr/src/expr.rs
@@ -41,7 +41,10 @@ use datafusion_common::{
internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue,
TableReference,
};
use sqlparser::ast::NullTreatment;
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem,
NullTreatment, RenameSelectItem, ReplaceSelectElement,
};

/// Represents logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
///
@@ -315,7 +318,10 @@
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
Wildcard { qualifier: Option<TableReference> },
Wildcard {
qualifier: Option<TableReference>,
options: WildcardOptions,
},
/// List of grouping set expressions. Only valid in the context of an aggregate
/// GROUP BY expression list
GroupingSet(GroupingSet),
@@ -970,6 +976,89 @@ impl GroupingSet {
}
}

/// Additional options for wildcards, e.g. Snowflake `EXCLUDE`/`RENAME` and BigQuery `EXCEPT`.
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct WildcardOptions {
Contributor: As a follow-on PR, can we add some doc comments to this struct explaining why it is needed and how to interpret it? It seems the core rationale is that wildcards have different semantics depending on which part of the query they appear in.

Contributor Author: Sure, I added descriptions for the purpose and source of each option.

/// `[ILIKE...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub ilike: Option<IlikeSelectItem>,
/// `[EXCLUDE...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub exclude: Option<ExcludeSelectItem>,
/// `[EXCEPT...]`.
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_except>
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#except>
pub except: Option<ExceptSelectItem>,
/// `[REPLACE]`
/// BigQuery syntax: <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#select_replace>
/// Clickhouse syntax: <https://clickhouse.com/docs/en/sql-reference/statements/select#replace>
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub replace: Option<PlannedReplaceSelectItem>,
/// `[RENAME ...]`.
/// Snowflake syntax: <https://docs.snowflake.com/en/sql-reference/sql/select#parameters>
pub rename: Option<RenameSelectItem>,
}
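
Following the review thread above, a minimal, hypothetical sketch of how a dialect-specific `SELECT * EXCLUDE (...)` projection could be built with `WildcardOptions` and the `wildcard_with_options` helper imported in the tests further down. The exact module paths are assumptions, not taken from this PR.

```rust
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{wildcard_with_options, Expr};
use sqlparser::ast::{ExcludeSelectItem, Ident};

/// Sketch: represent `SELECT * EXCLUDE (c1, c2)` as a single wildcard
/// expression; ExpandWildcardRule later rewrites it into the remaining
/// concrete columns of the input schema.
fn exclude_wildcard() -> Expr {
    let options = WildcardOptions {
        exclude: Some(ExcludeSelectItem::Multiple(vec![
            Ident::from("c1"),
            Ident::from("c2"),
        ])),
        ..WildcardOptions::default()
    };
    // Displays as `* EXCLUDE (c1, c2)` via the Display impl below.
    wildcard_with_options(options)
}
```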

impl WildcardOptions {
pub fn with_replace(self, replace: PlannedReplaceSelectItem) -> Self {
WildcardOptions {
ilike: self.ilike,
exclude: self.exclude,
except: self.except,
replace: Some(replace),
rename: self.rename,
}
}
}

impl Display for WildcardOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
if let Some(ilike) = &self.ilike {
write!(f, " {ilike}")?;
}
if let Some(exclude) = &self.exclude {
write!(f, " {exclude}")?;
}
if let Some(except) = &self.except {
write!(f, " {except}")?;
}
if let Some(replace) = &self.replace {
write!(f, " {replace}")?;
}
if let Some(rename) = &self.rename {
write!(f, " {rename}")?;
}
Ok(())
}
}

/// The planned expressions for `REPLACE`
#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub struct PlannedReplaceSelectItem {
/// The original ast nodes
pub items: Vec<ReplaceSelectElement>,
/// The expressions planned from the AST nodes; they are used when expanding the wildcard.
pub planned_expressions: Vec<Expr>,
}

impl Display for PlannedReplaceSelectItem {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "REPLACE")?;
write!(f, " ({})", display_comma_separated(&self.items))?;
Ok(())
}
}

impl PlannedReplaceSelectItem {
pub fn items(&self) -> &[ReplaceSelectElement] {
&self.items
}

pub fn expressions(&self) -> &[Expr] {
&self.planned_expressions
}
}
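
For completeness, a hypothetical sketch of a fully planned `REPLACE` item, pairing a raw AST element with the DataFusion expression the planner would substitute for it. The concrete values and the `expr::PlannedReplaceSelectItem` path are illustrative assumptions, not from this PR.

```rust
use datafusion_expr::expr::PlannedReplaceSelectItem;
use datafusion_expr::expr_fn::col;
use datafusion_expr::lit;
use sqlparser::ast::{self, Ident, ReplaceSelectElement};

/// Sketch: the pieces a planned `REPLACE (c1 * 2 AS c1)` might carry.
fn planned_replace() -> PlannedReplaceSelectItem {
    PlannedReplaceSelectItem {
        // Original AST items, kept for display/unparsing; the identifier here
        // is a simplified stand-in for the real `c1 * 2` AST expression.
        items: vec![ReplaceSelectElement {
            expr: ast::Expr::Identifier(Ident::from("c1")),
            column_name: Ident::from("c1"),
            as_keyword: true,
        }],
        // The planned expression actually substituted during wildcard expansion.
        planned_expressions: vec![(col("c1") * lit(2)).alias("c1")],
    }
}
```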

/// Fixed seed for the hashing so that Ords are consistent across runs
const SEED: ahash::RandomState = ahash::RandomState::with_seeds(0, 0, 0, 0);

@@ -1720,8 +1809,9 @@ impl Expr {
Expr::ScalarSubquery(subquery) => {
subquery.hash(hasher);
}
Expr::Wildcard { qualifier } => {
Expr::Wildcard { qualifier, options } => {
qualifier.hash(hasher);
options.hash(hasher);
}
Expr::GroupingSet(grouping_set) => {
mem::discriminant(grouping_set).hash(hasher);
@@ -2242,9 +2332,9 @@ impl fmt::Display for Expr {
write!(f, "{expr} IN ([{}])", expr_vec_fmt!(list))
}
}
Expr::Wildcard { qualifier } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*"),
None => write!(f, "*"),
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => write!(f, "{qualifier}.*{options}"),
None => write!(f, "*{options}"),
},
Expr::GroupingSet(grouping_sets) => match grouping_sets {
GroupingSet::Rollup(exprs) => {
@@ -2543,9 +2633,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Wildcard { qualifier, options } => match qualifier {
Some(qualifier) => Ok(format!("{}.*{}", qualifier, options)),
None => Ok(format!("*{}", options)),
},
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
@@ -2558,7 +2649,12 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
#[cfg(test)]
mod test {
use crate::expr_fn::col;
use crate::{case, lit, ColumnarValue, ScalarUDF, ScalarUDFImpl, Volatility};
use crate::{
case, lit, qualified_wildcard, wildcard, wildcard_with_options, ColumnarValue,
ScalarUDF, ScalarUDFImpl, Volatility,
};
use sqlparser::ast;
use sqlparser::ast::{Ident, IdentWithAlias};
use std::any::Any;

#[test]
Expand Down Expand Up @@ -2859,4 +2955,109 @@ mod test {
);
assert_eq!(find_df_window_func("not_exist"), None)
}

#[test]
fn test_display_wildcard() {
assert_eq!(format!("{}", wildcard()), "*");
assert_eq!(format!("{}", qualified_wildcard("t1")), "t1.*");
assert_eq!(
format!(
"{}",
wildcard_with_options(wildcard_options(
Some(IlikeSelectItem {
pattern: "c1".to_string()
}),
None,
None,
None,
None
))
),
"* ILIKE 'c1'"
);
assert_eq!(
format!(
"{}",
wildcard_with_options(wildcard_options(
None,
Some(ExcludeSelectItem::Multiple(vec![
Ident::from("c1"),
Ident::from("c2")
])),
None,
None,
None
))
),
"* EXCLUDE (c1, c2)"
);
assert_eq!(
format!(
"{}",
wildcard_with_options(wildcard_options(
None,
None,
Some(ExceptSelectItem {
first_element: Ident::from("c1"),
additional_elements: vec![Ident::from("c2")]
}),
None,
None
))
),
"* EXCEPT (c1, c2)"
);
assert_eq!(
format!(
"{}",
wildcard_with_options(wildcard_options(
None,
None,
None,
Some(PlannedReplaceSelectItem {
items: vec![ReplaceSelectElement {
expr: ast::Expr::Identifier(Ident::from("c1")),
column_name: Ident::from("a1"),
as_keyword: false
}],
planned_expressions: vec![]
}),
None
))
),
"* REPLACE (c1 a1)"
);
assert_eq!(
format!(
"{}",
wildcard_with_options(wildcard_options(
None,
None,
None,
None,
Some(RenameSelectItem::Multiple(vec![IdentWithAlias {
ident: Ident::from("c1"),
alias: Ident::from("a1")
}]))
))
),
"* RENAME (c1 AS a1)"
)
}

fn wildcard_options(
opt_ilike: Option<IlikeSelectItem>,
opt_exclude: Option<ExcludeSelectItem>,
opt_except: Option<ExceptSelectItem>,
opt_replace: Option<PlannedReplaceSelectItem>,
opt_rename: Option<RenameSelectItem>,
) -> WildcardOptions {
WildcardOptions {
ilike: opt_ilike,
exclude: opt_exclude,
except: opt_except,
replace: opt_replace,
rename: opt_rename,
}
}
}