Skip to content

Commit

Permalink
chore: Update delta-rs to fork and datafusion to 30 (#1668)
Browse files Browse the repository at this point in the history
Upstream pr for delta-rs: delta-io/delta-rs#1606
  • Loading branch information
scsmithr authored Aug 29, 2023
1 parent a5302c9 commit 333b3ae
Show file tree
Hide file tree
Showing 22 changed files with 228 additions and 488 deletions.
154 changes: 70 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ edition = "2021"
lto = "thin"

[workspace.dependencies]
datafusion = { version = "28.0", features = ["avro"] }
datafusion-proto = { version = "28.0" }
datafusion = { version = "30.0", features = ["avro"] }
datafusion-proto = { version = "30.0" }
object_store = { version = "0.6.1" }
tokio = { version = "1", features = ["full"] }
url = "2.4.0"
Expand All @@ -21,6 +21,6 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.104"

[workspace.dependencies.deltalake]
git = "https://github.com/delta-io/delta-rs.git"
branch = "main"
git = "https://github.com/glaredb/delta-rs.git"
branch = "fork"
features = ["s3", "gcs", "azure", "datafusion", "arrow", "parquet"]
3 changes: 2 additions & 1 deletion crates/datafusion_ext/src/local_hint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ impl TableProvider for LocalTableHint {
&self,
state: &SessionState,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = self.0.insert_into(state, input).await?;
let plan = self.0.insert_into(state, input, overwrite).await?;
Ok(plan)
}
}
32 changes: 11 additions & 21 deletions crates/datafusion_ext/src/planner/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
// under the License.

use crate::planner::{AsyncContextProvider, SqlQueryPlanner};
use datafusion::common::{
Column, DFField, DFSchema, DataFusionError, Result, ScalarValue, TableReference,
};
use datafusion::logical_expr::{Case, Expr, GetIndexedField};
use datafusion::common::{Column, DFField, DFSchema, DataFusionError, Result, TableReference};
use datafusion::logical_expr::{Case, Expr};
use datafusion::physical_plan::internal_err;
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{Expr as SQLExpr, Ident};

Expand Down Expand Up @@ -91,9 +90,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
planner_context: &mut PlannerContext,
) -> Result<Expr> {
if ids.len() < 2 {
return Err(DataFusionError::Internal(format!(
"Not a compound identifier: {ids:?}"
)));
return internal_err!("Not a compound identifier: {ids:?}");
}

if ids[0].value.starts_with('@') {
Expand Down Expand Up @@ -121,9 +118,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
// Though ideally once that support is in place, this code should work with it
// TODO: remove when can support multiple nested identifiers
if ids.len() > 5 {
return Err(DataFusionError::Internal(format!(
"Unsupported compound identifier: {ids:?}"
)));
return internal_err!("Unsupported compound identifier: {ids:?}");
}

let search_result = search_dfschema(&ids, schema);
Expand All @@ -132,26 +127,21 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
Some((field, nested_names)) if !nested_names.is_empty() => {
// TODO: remove when can support multiple nested identifiers
if nested_names.len() > 1 {
return Err(DataFusionError::Internal(format!(
return internal_err!(
"Nested identifiers not yet supported for column {}",
field.qualified_column().quoted_flat_name()
)));
);
}
let nested_name = nested_names[0].to_string();
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(Expr::Column(field.qualified_column())),
ScalarValue::Utf8(Some(nested_name)),
)))
Ok(Expr::Column(field.qualified_column()).field(nested_name))
}
// found matching field with no spare identifier(s)
Some((field, _nested_names)) => Ok(Expr::Column(field.qualified_column())),
None => {
// return default where use all identifiers to not have a nested field
// this len check is because at 5 identifiers will have to have a nested field
if ids.len() == 5 {
Err(DataFusionError::Internal(format!(
"Unsupported compound identifier: {ids:?}"
)))
internal_err!("Unsupported compound identifier: {ids:?}")
} else {
// check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
Expand All @@ -160,10 +150,10 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
// found matching field with spare identifier(s) for nested field(s) in structure
Some((field, nested_names)) if !nested_names.is_empty() => {
// TODO: remove when can support nested identifiers for OuterReferenceColumn
Err(DataFusionError::Internal(format!(
internal_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
field.qualified_column().quoted_flat_name()
)))
)
}
// found matching field with no spare identifier(s)
Some((field, _nested_names)) => {
Expand Down
124 changes: 77 additions & 47 deletions crates/datafusion_ext/src/planner/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::expr::{InList, Placeholder};
use datafusion::logical_expr::{
col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast, Expr,
ExprSchemable, GetIndexedField, Like, Operator, TryCast,
ExprSchemable, GetFieldAccess, GetIndexedField, Like, Operator, TryCast,
};
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{ArrayAgg, Expr as SQLExpr, Interval, TrimWhereField, Value};
use datafusion::sql::sqlparser::ast::{
ArrayAgg, Expr as SQLExpr, Interval, JsonOperator, TrimWhereField, Value,
};
use datafusion::sql::sqlparser::parser::ParserError::ParserError;

impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
Expand Down Expand Up @@ -191,7 +193,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {

SQLExpr::MapAccess { column, keys } => {
if let SQLExpr::Identifier(id) = *column {
plan_indexed(col(self.normalizer.normalize(id)), keys)
self.plan_indexed(
col(self.normalizer.normalize(id)),
keys,
schema,
planner_context,
)
.await
} else {
Err(DataFusionError::NotImplemented(format!(
"map access requires an identifier, found column {column} instead"
Expand All @@ -203,7 +211,8 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
let expr = self
.sql_expr_to_logical_expr(*obj, schema, planner_context)
.await?;
plan_indexed(expr, indexes)
self.plan_indexed(expr, indexes, schema, planner_context)
.await
}

SQLExpr::CompoundIdentifier(ids) => {
Expand Down Expand Up @@ -405,7 +414,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
"binary_op should be handled by sql_expr_to_logical_expr.".to_string(),
)),

#[cfg(feature = "unicode_expressions")]
SQLExpr::Substring {
expr,
substring_from,
Expand All @@ -421,12 +429,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
.await
}

#[cfg(not(feature = "unicode_expressions"))]
SQLExpr::Substring { .. } => Err(DataFusionError::Internal(
"statement substring requires compilation with feature flag: unicode_expressions."
.to_string(),
)),

SQLExpr::Trim {
expr,
trim_where,
Expand Down Expand Up @@ -711,6 +713,70 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
)),
}
}

async fn plan_indices(
&mut self,
expr: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<GetFieldAccess> {
let field = match expr.clone() {
SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
GetFieldAccess::NamedStructField {
name: ScalarValue::Utf8(Some(s)),
}
}
SQLExpr::JsonAccess {
left,
operator: JsonOperator::Colon,
right,
} => {
let start = Box::new(
self.sql_expr_to_logical_expr(*left, schema, planner_context)
.await?,
);
let stop = Box::new(
self.sql_expr_to_logical_expr(*right, schema, planner_context)
.await?,
);

GetFieldAccess::ListRange { start, stop }
}
_ => GetFieldAccess::ListIndex {
key: Box::new(
self.sql_expr_to_logical_expr(expr, schema, planner_context)
.await?,
),
},
};

Ok(field)
}

#[async_recursion]
async fn plan_indexed(
&mut self,
expr: Expr,
mut keys: Vec<SQLExpr>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let indices = keys.pop().ok_or_else(|| {
ParserError("Internal error: Missing index key expression".to_string())
})?;

let expr = if !keys.is_empty() {
self.plan_indexed(expr, keys, schema, planner_context)
.await?
} else {
expr
};

Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
self.plan_indices(indices, schema, planner_context).await?,
)))
}
}

// modifies expr if it is a placeholder with datatype of right
Expand Down Expand Up @@ -746,42 +812,6 @@ fn infer_placeholder_types(expr: Expr, schema: &DFSchema) -> Result<Expr> {
})
}

fn plan_key(key: SQLExpr) -> Result<ScalarValue> {
let scalar = match key {
SQLExpr::Value(Value::Number(s, _)) => ScalarValue::Int64(Some(
s.parse()
.map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?,
)),
SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
ScalarValue::Utf8(Some(s))
}
_ => {
return Err(DataFusionError::SQL(ParserError(format!(
"Unsuported index key expression: {key:?}"
))));
}
};

Ok(scalar)
}

fn plan_indexed(expr: Expr, mut keys: Vec<SQLExpr>) -> Result<Expr> {
let key = keys
.pop()
.ok_or_else(|| ParserError("Internal error: Missing index key expression".to_string()))?;

let expr = if !keys.is_empty() {
plan_indexed(expr, keys)?
} else {
expr
};

Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(expr),
plan_key(key)?,
)))
}

// #[cfg(test)]
// mod tests {
// use super::*;
Expand Down
10 changes: 10 additions & 0 deletions crates/datafusion_ext/src/planner/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
let all = match set_quantifier {
SetQuantifier::All => true,
SetQuantifier::Distinct | SetQuantifier::None => false,
SetQuantifier::ByName => {
return Err(DataFusionError::NotImplemented(
"UNION BY NAME not implemented".to_string(),
));
}
SetQuantifier::AllByName => {
return Err(DataFusionError::NotImplemented(
"UNION ALL BY NAME not implemented".to_string(),
))
}
};

let left_plan = self.set_expr_to_plan(*left, planner_context).await?;
Expand Down
Loading

0 comments on commit 333b3ae

Please sign in to comment.