Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update datafusion and delta-rs #1773

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,960 changes: 876 additions & 1,084 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ edition = "2021"
lto = "thin"

[workspace.dependencies]
datafusion = { version = "30.0", features = ["avro"] }
datafusion-proto = { version = "30.0" }
object_store = { version = "0.6.1" }
datafusion = { version = "31.0", features = ["avro"] }
datafusion-proto = { version = "31.0" }
object_store = { version = "0.7" }
tokio = { version = "1", features = ["full"] }
url = "2.4.0"
thiserror = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.104"
chrono = "0.4.31"

[workspace.dependencies.deltalake]
git = "https://github.com/delta-io/delta-rs.git"
branch = "main"
features = ["s3", "gcs", "azure", "datafusion", "arrow", "parquet"]

[patch.crates-io]
object_store = { git = "https://github.com/apache/arrow-rs", rev = "a03ce564f1c95e10c78e6a065996cb036ca13cef" }
8 changes: 4 additions & 4 deletions crates/datafusion_ext/src/planner/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ mod test {
#[test]
fn test_form_identifier() -> Result<()> {
let err = form_identifier(&[]).expect_err("empty identifiers didn't fail");
let expected = "Internal error: Incorrect number of identifiers: 0. \
let expected = "Internal error: Incorrect number of identifiers: 0.\n\
This was likely caused by a bug in DataFusion's code and we would \
welcome that you file an bug report in our issue tracker";
assert_eq!(err.to_string(), expected);
assert!(expected.starts_with(&err.strip_backtrace()));

let ids = vec!["a".to_string()];
let (qualifier, column) = form_identifier(&ids)?;
Expand Down Expand Up @@ -475,10 +475,10 @@ mod test {
"e".to_string(),
])
.expect_err("too many identifiers didn't fail");
let expected = "Internal error: Incorrect number of identifiers: 5. \
let expected = "Internal error: Incorrect number of identifiers: 5.\n\
This was likely caused by a bug in DataFusion's code and we would \
welcome that you file an bug report in our issue tracker";
assert_eq!(err.to_string(), expected);
assert!(expected.starts_with(&err.strip_backtrace()));

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/datafusion_ext/src/planner/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
expr,
substring_from,
substring_for,
special: false,
} => {
self.sql_substring_to_expr(
expr,
Expand Down
1 change: 1 addition & 0 deletions crates/datafusion_ext/src/planner/expr/substring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
expr,
substring_from: None,
substring_for: None,
special: false,
};

return Err(DataFusionError::Plan(format!(
Expand Down
18 changes: 9 additions & 9 deletions crates/datafusion_ext/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,20 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {

fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
match sql_type {
SQLDataType::Boolean => Ok(DataType::Boolean),
SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
SQLDataType::TinyInt(_) => Ok(DataType::Int8),
SQLDataType::SmallInt(_) => Ok(DataType::Int16),
SQLDataType::Int(_) | SQLDataType::Integer(_) => Ok(DataType::Int32),
SQLDataType::BigInt(_) => Ok(DataType::Int64),
SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => Ok(DataType::Int32),
SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
SQLDataType::UnsignedTinyInt(_) => Ok(DataType::UInt8),
SQLDataType::UnsignedSmallInt(_) => Ok(DataType::UInt16),
SQLDataType::UnsignedInt(_) | SQLDataType::UnsignedInteger(_) => {
SQLDataType::UnsignedSmallInt(_) | SQLDataType::UnsignedInt2(_) => Ok(DataType::UInt16),
SQLDataType::UnsignedInt(_) | SQLDataType::UnsignedInteger(_) | SQLDataType::UnsignedInt4(_) => {
Ok(DataType::UInt32)
}
SQLDataType::UnsignedBigInt(_) => Ok(DataType::UInt64),
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64),
SQLDataType::Float(_) => Ok(DataType::Float32),
SQLDataType::Real => Ok(DataType::Float32),
SQLDataType::Double | SQLDataType::DoublePrecision => Ok(DataType::Float64),
SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => Ok(DataType::Float64),
SQLDataType::Char(_)
| SQLDataType::Varchar(_)
| SQLDataType::Text
Expand Down
71 changes: 64 additions & 7 deletions crates/datafusion_ext/src/planner/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs,
};
use datafusion::common::{DataFusionError, Result};
use async_recursion::async_recursion;
use datafusion::common::{plan_err, DataFusionError, Result};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
Expand All @@ -33,9 +34,11 @@ use datafusion::logical_expr::utils::{
use datafusion::logical_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use datafusion::prelude::Column;
use datafusion::sql::planner::PlannerContext;
use datafusion::sql::sqlparser::ast::{
Distinct, Expr as SQLExpr, NamedWindowDefinition, WildcardAdditionalOptions, WindowType,
Distinct, Expr as SQLExpr, NamedWindowDefinition, ReplaceSelectItem, WildcardAdditionalOptions,
WindowType,
};
use datafusion::sql::sqlparser::ast::{Select, SelectItem, TableWithJoins};
use std::collections::HashSet;
Expand Down Expand Up @@ -321,6 +324,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}

/// Generate a relational expression from a select SQL expression
#[async_recursion]
async fn sql_select_to_rex(
&mut self,
sql: SelectItem,
Expand Down Expand Up @@ -356,18 +360,33 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
Self::check_wildcard_options(&options)?;

if empty_from {
return Err(DataFusionError::Plan(
"SELECT * with no tables specified is not valid".to_string(),
));
return plan_err!("SELECT * with no tables specified is not valid");
}
// do not expand from outer schema
expand_wildcard(plan.schema().as_ref(), plan, Some(options))
let expanded_exprs = expand_wildcard(plan.schema().as_ref(), plan, Some(&options))?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(plan, empty_from, planner_context, expanded_exprs, replace)
.await
} else {
Ok(expanded_exprs)
}
}
SelectItem::QualifiedWildcard(ref object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = format!("{object_name}");
// do not expand from outer schema
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), Some(options))
let expanded_exprs =
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), Some(&options))?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(plan, empty_from, planner_context, expanded_exprs, replace)
.await
} else {
Ok(expanded_exprs)
}
}
}
}
Expand All @@ -390,6 +409,44 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> {
}
}

/// If there is a REPLACE statement in the projected expression in the form of
/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
/// that column with the given replace expression. Column name remains the same.
/// Multiple REPLACEs are also possible with comma separations.
///
/// `exprs` is the already-wildcard-expanded projection list; only plain
/// `Expr::Column` entries are candidates for replacement — any other expression
/// shape is left untouched.
///
/// The replacement expression is planned by recursing into
/// `sql_select_to_rex` (which carries `#[async_recursion]`), so it may itself
/// reference columns of `plan`. The result is wrapped in an `Alias` carrying
/// the original column name, so the output schema's column names are unchanged.
async fn replace_columns(
    &mut self,
    // Input plan the projection is built over; used to resolve the
    // replacement expression against the same schema.
    plan: &LogicalPlan,
    // Forwarded to sql_select_to_rex so the recursive planning sees the
    // same FROM-less context as the enclosing SELECT.
    empty_from: bool,
    planner_context: &mut PlannerContext,
    // Expanded projection expressions; mutated in place and returned.
    mut exprs: Vec<Expr>,
    replace: ReplaceSelectItem,
) -> Result<Vec<Expr>> {
    for expr in exprs.iter_mut() {
        if let Expr::Column(Column { name, .. }) = expr {
            // Match REPLACE items by unqualified column name only; the
            // column's relation qualifier (if any) is ignored here.
            if let Some(item) = replace
                .items
                .iter()
                .find(|item| item.column_name.value == *name)
            {
                let new_expr = self
                    .sql_select_to_rex(
                        SelectItem::UnnamedExpr(item.expr.clone()),
                        plan,
                        empty_from,
                        planner_context,
                    )
                    .await?[0]
                    // NOTE(review): indexing [0] assumes an UnnamedExpr always
                    // plans to exactly one expression — panics if the result
                    // were empty; confirm sql_select_to_rex guarantees this.
                    .clone();
                // Alias preserves the replaced column's original name so
                // downstream schema/column references remain valid.
                *expr = Expr::Alias(Alias {
                    expr: Box::new(new_expr),
                    name: name.clone(),
                });
            }
        }
    }
    Ok(exprs)
}

/// Wrap a plan in a projection
fn project(&self, input: LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/datasources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bigquery-storage = { git = "https://github.com/glaredb/bigquery-storage", branch
bitflags = "2.3"
bitvec = "1"
bytes = "1.4.0"
chrono = "0.4.26"
chrono = { workspace = true }
datafusion = { workspace = true }
decimal = { path = "../decimal" }
deltalake = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ fn mysql_row_to_record_batch(rows: Vec<MysqlRow>, schema: ArrowSchemaRef) -> Res
for row in rows.iter() {
let val: Option<NaiveDateTime> =
row.get_opt(col_idx).expect("row value should exist")?;
let val = val.map(|v| v.timestamp_nanos());
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
arr.append_option(val);
}
Arc::new(arr.finish())
Expand All @@ -595,7 +595,7 @@ fn mysql_row_to_record_batch(rows: Vec<MysqlRow>, schema: ArrowSchemaRef) -> Res
for row in rows.iter() {
let val: Option<NaiveDateTime> =
row.get_opt(col_idx).expect("row value should exist")?;
let val = val.map(|v| v.timestamp_nanos());
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
arr.append_option(val);
}
Arc::new(arr.finish())
Expand Down
8 changes: 1 addition & 7 deletions crates/datasources/src/native/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,7 @@ impl NativeTableStorage {
let _ = Self::opts_from_ent(table)?; // Check that this is the correct table type.

let delta_store = self.create_delta_store_for_table(table).await?;
let mut table = DeltaTable::new(
delta_store,
DeltaTableConfig {
require_tombstones: true,
require_files: true,
},
);
let mut table = DeltaTable::new(delta_store, DeltaTableConfig::default());

table.load().await?;

Expand Down
4 changes: 2 additions & 2 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
let mut arr = TimestampNanosecondBuilder::with_capacity(rows.len());
for row in rows.iter() {
let val: Option<NaiveDateTime> = row.try_get(col_idx)?;
let val = val.map(|v| v.timestamp_nanos());
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
arr.append_option(val);
}
Arc::new(arr.finish())
Expand All @@ -1073,7 +1073,7 @@ fn binary_rows_to_record_batch<E: Into<PostgresError>>(
.with_data_type(dt.clone());
for row in rows.iter() {
let val: Option<DateTime<Utc>> = row.try_get(col_idx)?;
let val = val.map(|v| v.timestamp_nanos());
let val = val.map(|v| v.timestamp_nanos_opt().unwrap());
arr.append_option(val);
}
Arc::new(arr.finish())
Expand Down
2 changes: 1 addition & 1 deletion crates/logutil/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ edition = {workspace = true}
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = ["std", "fmt", "json", "env-filter"] }
tracing-log = "0.1"
chrono = "0.4.29"
chrono = { workspace = true }
2 changes: 1 addition & 1 deletion crates/pgrepr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ datafusion = {workspace = true}
tokio-postgres = { version = "0.7.8", features = ["with-uuid-1", "with-serde_json-1", "with-chrono-0_4"] }
num-traits = "0.2.16"
dtoa = "1.0.9"
chrono = "0.4.26"
chrono = { workspace = true }
chrono-tz = "0.8.3"
tracing = "0.1"
decimal = { path = "../decimal" }
8 changes: 4 additions & 4 deletions crates/pgrepr/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ impl Scalar {
(Self::Text(v), ArrowType::Utf8) => DfScalar::Utf8(Some(v)),
(Self::Bytea(v), ArrowType::Binary) => DfScalar::Binary(Some(v)),
(Self::Timestamp(v), ArrowType::Timestamp(TimeUnit::Microsecond, None)) => {
let nanos = v.timestamp_nanos();
let nanos = v.timestamp_nanos_opt().unwrap();
let micros = nanos_to_micros(nanos);
DfScalar::TimestampMicrosecond(Some(micros), None)
}
(Self::Timestamp(v), ArrowType::Timestamp(TimeUnit::Nanosecond, None)) => {
let nanos = v.timestamp_nanos();
let nanos = v.timestamp_nanos_opt().unwrap();
DfScalar::TimestampNanosecond(Some(nanos), None)
}
(
Expand All @@ -224,7 +224,7 @@ impl Scalar {
v, arrow_type
)));
}
let nanos = v.timestamp_nanos();
let nanos = v.timestamp_nanos_opt().unwrap();
let micros = nanos_to_micros(nanos);
DfScalar::TimestampMicrosecond(Some(micros), Some(tz.clone()))
}
Expand All @@ -238,7 +238,7 @@ impl Scalar {
v, arrow_type
)));
}
let nanos = v.timestamp_nanos();
let nanos = v.timestamp_nanos_opt().unwrap();
DfScalar::TimestampNanosecond(Some(nanos), Some(tz.clone()))
}
(Self::Time(v), ArrowType::Time64(TimeUnit::Microsecond)) => {
Expand Down
4 changes: 3 additions & 1 deletion crates/protogen/src/sqlexec/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ pub struct RuntimeGroupExec {}
pub struct AnalyzeExec {
#[prost(bool, tag = "1")]
pub verbose: bool,
#[prost(message, tag = "2")]
#[prost(bool, tag = "2")]
pub show_statistics: bool,
#[prost(message, tag = "3")]
pub schema: Option<Schema>,
}

Expand Down
2 changes: 1 addition & 1 deletion crates/repr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ edition = {workspace = true}
thiserror.workspace = true
num-traits = "0.2.16"
dtoa = "1.0.9"
chrono = "0.4.26"
chrono = { workspace = true }
decimal = { path = "../decimal" }
2 changes: 1 addition & 1 deletion crates/snowflake_connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ reqwest = { version = "0.11.18", default-features = false, features = ["json", "
serde = { workspace = true }
serde_json = { workspace = true }
tracing = "0.1"
chrono = "0.4.26"
chrono = { workspace = true }
uuid = { version = "1.4.1", features = ["v4", "fast-rng", "macro-diagnostics"] }
datafusion = { workspace = true }
hex = "0.4.3"
Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
.ok_or_else(|| DataFusionError::Internal("missing schema".to_string()))?;
Arc::new(AnalyzeExec::new(
ext.verbose,
ext.show_statistics,
input.clone(),
Arc::new((&schema).try_into()?),
))
Expand Down Expand Up @@ -974,6 +975,7 @@ impl<'a> PhysicalExtensionCodec for GlareDBExtensionCodec<'a> {
// TODO: update this once verbose is set to pub in datafusion
proto::ExecutionPlanExtensionType::AnalyzeExec(proto::AnalyzeExec {
verbose: true,
show_statistics: true,
schema: Some(exec.schema().try_into()?),
})
} else {
Expand Down
20 changes: 11 additions & 9 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1526,22 +1526,24 @@ fn convert_data_type(sql_type: &ast::DataType) -> Result<DataType> {
}
}

// TODO: We already copy this in by way of the `datafusion_ext` crate. Is there
// a way to ensure we only have a single copy?
fn convert_simple_data_type(sql_type: &ast::DataType) -> Result<DataType> {
match sql_type {
ast::DataType::Boolean => Ok(DataType::Boolean),
ast::DataType::Boolean | ast::DataType::Bool => Ok(DataType::Boolean),
ast::DataType::TinyInt(_) => Ok(DataType::Int8),
ast::DataType::SmallInt(_) => Ok(DataType::Int16),
ast::DataType::Int(_) | ast::DataType::Integer(_) => Ok(DataType::Int32),
ast::DataType::BigInt(_) => Ok(DataType::Int64),
ast::DataType::SmallInt(_) | ast::DataType::Int2(_) => Ok(DataType::Int16),
ast::DataType::Int(_) | ast::DataType::Integer(_) | ast::DataType::Int4(_) => Ok(DataType::Int32),
ast::DataType::BigInt(_) | ast::DataType::Int8(_) => Ok(DataType::Int64),
ast::DataType::UnsignedTinyInt(_) => Ok(DataType::UInt8),
ast::DataType::UnsignedSmallInt(_) => Ok(DataType::UInt16),
ast::DataType::UnsignedInt(_) | ast::DataType::UnsignedInteger(_) => {
ast::DataType::UnsignedSmallInt(_) | ast::DataType::UnsignedInt2(_) => Ok(DataType::UInt16),
ast::DataType::UnsignedInt(_) | ast::DataType::UnsignedInteger(_) | ast::DataType::UnsignedInt4(_) => {
Ok(DataType::UInt32)
}
ast::DataType::UnsignedBigInt(_) => Ok(DataType::UInt64),
ast::DataType::UnsignedBigInt(_) | ast::DataType::UnsignedInt8(_) => Ok(DataType::UInt64),
ast::DataType::Float(_) => Ok(DataType::Float32),
ast::DataType::Real => Ok(DataType::Float32),
ast::DataType::Double | ast::DataType::DoublePrecision => Ok(DataType::Float64),
ast::DataType::Real | ast::DataType::Float4 => Ok(DataType::Float32),
ast::DataType::Double | ast::DataType::DoublePrecision | ast::DataType::Float8 => Ok(DataType::Float64),
ast::DataType::Char(_)
| ast::DataType::Varchar(_)
| ast::DataType::Text
Expand Down