Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update delta-rs to fork and datafusion to 30 #1668

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 70 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ edition = "2021"
lto = "thin"

[workspace.dependencies]
datafusion = { version = "28.0", features = ["avro"] }
datafusion-proto = { version = "28.0" }
datafusion = { version = "30.0", features = ["avro"] }
datafusion-proto = { version = "30.0" }
object_store = { version = "0.6.1" }
tokio = { version = "1", features = ["full"] }
url = "2.4.0"
Expand All @@ -21,6 +21,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.104"

[workspace.dependencies.deltalake]
git = "https://github.com/delta-io/delta-rs.git"
branch = "main"
git = "https://github.com/glaredb/delta-rs.git"
branch = "fork"
features = ["s3", "gcs", "azure", "datafusion", "arrow", "parquet"]
3 changes: 2 additions & 1 deletion crates/datafusion_ext/src/local_hint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ impl TableProvider for LocalTableHint {
&self,
state: &SessionState,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = self.0.insert_into(state, input).await?;
let plan = self.0.insert_into(state, input, overwrite).await?;
Ok(plan)
}
}
32 changes: 11 additions & 21 deletions crates/datafusion_ext/src/planner/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
// under the License.

use crate::planner::{AsyncContextProvider, SqlQueryPlanner};
use datafusion::common::{
Column, DFField, DFSchema, DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion::logical_expr::{Case, Expr, GetIndexedField};
use datafusion::common::{Column, DFField, DFSchema, DataFusionError, Result, TableReference};
use datafusion::logical_expr::{Case, Expr};
use datafusion::physical_plan::internal_err;
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{Expr as SQLExpr, Ident};

Expand Down Expand Up @@ -91,9 +90,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<Expr> {
if ids.len() < 2 {
return Err(DataFusionError::Internal(format!(
"Not a compound identifier: {ids:?}"
)));
return internal_err!("Not a compound identifier: {ids:?}");
}

if ids[0].value.starts_with('@') {
Expand Down Expand Up @@ -121,9 +118,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
// Though ideally once that support is in place, this code should work with it
// TODO: remove when can support multiple nested identifiers
if ids.len() > 5 {
return Err(DataFusionError::Internal(format!(
"Unsupported compound identifier: {ids:?}"
)));
return internal_err!("Unsupported compound identifier: {ids:?}");
}

let search_result = search_dfschema(&ids, schema);
Expand All @@ -132,26 +127,21 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
Some((field, nested_names)) if !nested_names.is_empty() => {
// TODO: remove when can support multiple nested identifiers
if nested_names.len() > 1 {
return Err(DataFusionError::Internal(format!(
return internal_err!(
"Nested identifiers not yet supported for column {}",
field.qualified_column().quoted_flat_name()
)));
);
}
let nested_name = nested_names[0].to_string();
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(Expr::Column(field.qualified_column())),
ScalarValue::Utf8(Some(nested_name)),
)))
Ok(Expr::Column(field.qualified_column()).field(nested_name))
}
// found matching field with no spare identifier(s)
Some((field, _nested_names)) => Ok(Expr::Column(field.qualified_column())),
None => {
// return default where use all identifiers to not have a nested field
// this len check is because at 5 identifiers will have to have a nested field
if ids.len() == 5 {
Err(DataFusionError::Internal(format!(
"Unsupported compound identifier: {ids:?}"
)))
internal_err!("Unsupported compound identifier: {ids:?}")
} else {
// check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
Expand All @@ -160,10 +150,10 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
// found matching field with spare identifier(s) for nested field(s) in structure
Some((field, nested_names)) if !nested_names.is_empty() => {
// TODO: remove when can support nested identifiers for OuterReferenceColumn
Err(DataFusionError::Internal(format!(
internal_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
field.qualified_column().quoted_flat_name()
)))
)
}
// found matching field with no spare identifier(s)
Some((field, _nested_names)) => {
Expand Down
124 changes: 77 additions & 47 deletions crates/datafusion_ext/src/planner/expr/mod.rs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this file were ported from datafusion

Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::expr::{InList, Placeholder};
use datafusion::logical_expr::{
col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast, Expr,
ExprSchemable, GetIndexedField, Like, Operator, TryCast,
ExprSchemable, GetFieldAccess, GetIndexedField, Like, Operator, TryCast,
};
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{ArrayAgg, Expr as SQLExpr, Interval, TrimWhereField, Value};
use datafusion::sql::sqlparser::ast::{
ArrayAgg, Expr as SQLExpr, Interval, JsonOperator, TrimWhereField, Value,
};
use datafusion::sql::sqlparser::parser::ParserError::ParserError;

impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
Expand Down Expand Up @@ -191,7 +193,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {

SQLExpr::MapAccess { column, keys } => {
if let SQLExpr::Identifier(id) = *column {
plan_indexed(col(self.normalizer.normalize(id)), keys)
self.plan_indexed(
col(self.normalizer.normalize(id)),
keys,
schema,
planner_context,
)
.await
} else {
Err(DataFusionError::NotImplemented(format!(
"map access requires an identifier, found column {column} instead"
Expand All @@ -203,7 +211,8 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
let expr = self
.sql_expr_to_logical_expr(*obj, schema, planner_context)
.await?;
plan_indexed(expr, indexes)
self.plan_indexed(expr, indexes, schema, planner_context)
.await
}

SQLExpr::CompoundIdentifier(ids) => {
Expand Down Expand Up @@ -405,7 +414,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
"binary_op should be handled by sql_expr_to_logical_expr.".to_string(),
)),

#[cfg(feature = "unicode_expressions")]
SQLExpr::Substring {
expr,
substring_from,
Expand All @@ -421,12 +429,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
.await
}

#[cfg(not(feature = "unicode_expressions"))]
SQLExpr::Substring { .. } => Err(DataFusionError::Internal(
"statement substring requires compilation with feature flag: unicode_expressions."
.to_string(),
)),

SQLExpr::Trim {
expr,
trim_where,
Expand Down Expand Up @@ -711,6 +713,70 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
)),
}
}

/// Converts one SQL index/key expression into a [`GetFieldAccess`].
///
/// * A single- or double-quoted string literal becomes
///   [`GetFieldAccess::NamedStructField`] carrying the string as a UTF-8 scalar.
/// * A `JsonAccess` expression using the `JsonOperator::Colon` operator becomes
///   [`GetFieldAccess::ListRange`]; both sides are planned as logical expressions.
/// * Any other expression is planned as a logical expression and used as the
///   key of [`GetFieldAccess::ListIndex`].
///
/// # Errors
///
/// Propagates any error returned while planning the nested expressions.
async fn plan_indices(
    &mut self,
    expr: SQLExpr,
    schema: &DFSchema,
    planner_context: &mut PlannerContext,
) -> Result<GetFieldAccess> {
    // Match on the owned expression: bindings are only moved out when an arm
    // actually matches, so the catch-all arm can take the whole expression
    // without cloning the AST up front.
    let field = match expr {
        SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
            GetFieldAccess::NamedStructField {
                name: ScalarValue::Utf8(Some(s)),
            }
        }
        SQLExpr::JsonAccess {
            left,
            operator: JsonOperator::Colon,
            right,
        } => {
            // Plan the left side first, then the right side.
            let start = Box::new(
                self.sql_expr_to_logical_expr(*left, schema, planner_context)
                    .await?,
            );
            let stop = Box::new(
                self.sql_expr_to_logical_expr(*right, schema, planner_context)
                    .await?,
            );

            GetFieldAccess::ListRange { start, stop }
        }
        other => GetFieldAccess::ListIndex {
            key: Box::new(
                self.sql_expr_to_logical_expr(other, schema, planner_context)
                    .await?,
            ),
        },
    };

    Ok(field)
}

#[async_recursion]
async fn plan_indexed(
&mut self,
expr: Expr,
mut keys: Vec<SQLExpr>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let indices = keys.pop().ok_or_else(|| {
ParserError("Internal error: Missing index key expression".to_string())
})?;

let expr = if !keys.is_empty() {
self.plan_indexed(expr, keys, schema, planner_context)
.await?
} else {
expr
};

Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
self.plan_indices(indices, schema, planner_context).await?,
)))
}
}

// modifies expr if it is a placeholder with datatype of right
Expand Down Expand Up @@ -746,42 +812,6 @@ fn infer_placeholder_types(expr: Expr, schema: &DFSchema) -> Result<Expr> {
})
}

/// Converts a literal SQL index key into a [`ScalarValue`].
///
/// * A numeric literal is parsed as `i64` and returned as `ScalarValue::Int64`.
/// * A single- or double-quoted string literal is returned as `ScalarValue::Utf8`.
///
/// # Errors
///
/// Returns a SQL parser error when the number cannot be parsed as `i64`, or
/// when `key` is any other kind of expression.
fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
    match key {
        SQLExpr::Value(Value::Number(s, _)) => s
            .parse::<i64>()
            .map(|n| ScalarValue::Int64(Some(n)))
            .map_err(|_| DataFusionError::SQL(ParserError(format!("Cannot parse {s} as i64.")))),
        SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
            Ok(ScalarValue::Utf8(Some(s)))
        }
        // The message previously misspelled "Unsupported".
        other => Err(DataFusionError::SQL(ParserError(format!(
            "Unsupported index key expression: {other:?}"
        )))),
    }
}

fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
let key = keys
.pop()
.ok_or_else(|| ParserError("Internal error: Missing index key expression".to_string()))?;

let expr = if !keys.is_empty() {
plan_indexed(expr, keys)?
} else {
expr
};

Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
plan_key(key)?,
)))
}

// #[cfg(test)]
// mod tests {
// use super::*;
Expand Down
10 changes: 10 additions & 0 deletions crates/datafusion_ext/src/planner/set_expr.rs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes ported from datafusion

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
let all = match set_quantifier {
SetQuantifier::All => true,
SetQuantifier::Distinct | SetQuantifier::None => false,
SetQuantifier::ByName => {
return Err(DataFusionError::NotImplemented(
"UNION BY NAME not implemented".to_string(),
));
}
SetQuantifier::AllByName => {
return Err(DataFusionError::NotImplemented(
"UNION ALL BY NAME not implemented".to_string(),
))
}
};

let left_plan = self.set_expr_to_plan(*left, planner_context).await?;
Expand Down
Loading