diff --git a/Cargo.lock b/Cargo.lock index 209bbff53..00ed18d1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,33 +158,6 @@ version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" -[[package]] -name = "apache-avro" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cf4144857f9e4d7dd6cc4ba4c78efd2a46bad682b029bd0d91e76a021af1b2a" -dependencies = [ - "byteorder", - "crc32fast", - "digest 0.10.7", - "lazy_static", - "libflate", - "log", - "num-bigint", - "quad-rand", - "rand 0.8.5", - "regex", - "serde", - "serde_json", - "snap", - "strum 0.24.1", - "strum_macros 0.24.3", - "thiserror", - "typed-builder 0.10.0", - "uuid", - "zerocopy", -] - [[package]] name = "apache-avro" version = "0.15.0" @@ -192,6 +165,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c0fdddc3fdac97394ffcc5c89c634faa9c1c166ced54189af34e407c97b6ee7" dependencies = [ "byteorder", + "crc32fast", "digest 0.10.7", "lazy_static", "libflate", @@ -202,6 +176,7 @@ dependencies = [ "regex", "serde", "serde_json", + "snap", "strum 0.25.0", "strum_macros 0.25.1", "thiserror", @@ -230,9 +205,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2feeebd77b34b0bc88f224e06d01c27da4733997cc4789a4e056196656cdc59a" +checksum = "b7104b9e9761613ae92fe770c741d6bbf1dbc791a0fe204400aebdd429875741" dependencies = [ "ahash 0.8.2", "arrow-arith", @@ -253,9 +228,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7173f5dc49c0ecb5135f52565af33afd3fdc9a12d13bd6f9973e8b96305e4b2e" +checksum = "38e597a8e8efb8ff52c50eaf8f4d85124ce3c1bf20fab82f476d73739d9ab1c2" dependencies = [ "arrow-array", "arrow-buffer", @@ -268,9 +243,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63d7ea725f7d1f8bb2cffc53ef538557e95fc802e217d5be25122d402e22f3d0" +checksum = "2a86d9c1473db72896bd2345ebb6b8ad75b8553ba390875c76708e8dc5c5492d" dependencies = [ "ahash 0.8.2", "arrow-buffer", @@ -285,9 +260,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdbe439e077f484e5000b9e1d47b5e4c0d15f2b311a8f5bcc682553d5d67a722" +checksum = "234b3b1c8ed00c874bf95972030ac4def6f58e02ea5a7884314388307fb3669b" dependencies = [ "half", "num", @@ -295,9 +270,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93913cc14875770aa1eef5e310765e855effa352c094cb1c7c00607d0f37b4e1" +checksum = "22f61168b853c7faea8cea23a2169fdff9c82fb10ae5e2c07ad1cab8f6884931" dependencies = [ "arrow-array", "arrow-buffer", @@ -313,9 +288,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef55b67c55ed877e6fe7b923121c19dae5e31ca70249ea2779a17b58fb0fbd9a" +checksum = "10b545c114d9bf8569c84d2fbe2020ac4eea8db462c0a37d0b65f41a90d066fe" dependencies = [ "arrow-array", "arrow-buffer", @@ -332,9 +307,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4f4f4a3c54614126a71ab91f6631c9743eb4643d6e9318b74191da9dc6e028b" +checksum = "c6b6852635e7c43e5b242841c7470606ff0ee70eef323004cacc3ecedd33dd8f" dependencies = [ "arrow-buffer", "arrow-schema", @@ -344,9 +319,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d41a3659f984a524ef1c2981d43747b24d8eec78e2425267fcd0ef34ce71cd18" +checksum = "a66da9e16aecd9250af0ae9717ae8dd7ea0d8ca5a3e788fe3de9f4ee508da751" dependencies = [ "arrow-array", "arrow-buffer", @@ -358,9 +333,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b95faa95a378f56ef32d84cc0104ea998c39ef7cd1faaa6b4cebf8ea92846d" +checksum = "60ee0f9d8997f4be44a60ee5807443e396e025c23cf14d2b74ce56135cb04474" dependencies = [ "arrow-array", "arrow-buffer", @@ -378,9 +353,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c68549a4284d9f8b39586afb8d5ff8158b8f0286353a4844deb1d11cf1ba1f26" +checksum = "7fcab05410e6b241442abdab6e1035177dc082bdb6f17049a4db49faed986d63" dependencies = [ "arrow-array", "arrow-buffer", @@ -393,9 +368,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a75a4a757afc301ce010adadff54d79d66140c4282ed3de565f6ccb716a5cf3" +checksum = "91a847dd9eb0bacd7836ac63b3475c68b2210c2c96d0ec1b808237b973bd5d73" dependencies = [ "ahash 0.8.2", "arrow-array", @@ -408,18 +383,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e" +checksum = "54df8c47918eb634c20e29286e69494fdc20cafa5173eb6dad49c7f6acece733" dependencies = [ "bitflags 2.3.3", ] [[package]] name = "arrow-select" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6e2943fa433a48921e914417173816af64eef61c0a3d448280e6c40a62df221" +checksum = "941dbe481da043c4bd40c805a19ec2fc008846080c4953171b62bcad5ee5f7fb" dependencies = [ "arrow-array", "arrow-buffer", @@ -430,9 +405,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbc92ed638851774f6d7af1ad900b92bc1486746497511868b4298fcbcfa35af" +checksum = "359b2cd9e071d5a3bcf44679f9d85830afebc5b9c98a08019a570a65ae933e0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -1005,9 +980,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.26" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" +checksum = "f56b4c72906975ca04becb8a30e102dfecddd0c06181e3e95ddc444be28881f8" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1016,7 +991,7 @@ dependencies = [ "serde", "time 0.1.43", "wasm-bindgen", - "winapi", + "windows-targets 0.48.0", ] [[package]] @@ -1569,12 +1544,12 @@ dependencies = [ [[package]] name = "datafusion" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ddbcb2dda5b5033537457992ebde78938014390b2b19f9f4282e3be0e18b0c3" +checksum = "45e3bb3a788d9fa793268e9cec2601d79831ed1be437ba74d1deb32b226ae734" dependencies = [ "ahash 0.8.2", - "apache-avro 0.14.0", + "apache-avro", "arrow", "arrow-array", "arrow-schema", @@ -1620,30 +1595,41 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fbb7b4da925031311743ab96662d55f0f7342d3692744f184f99b2257ef435" +checksum = "0dd256483875270612d4fa439359bafa6f1760bae080ecb69eecc59a92b5016f" dependencies = [ - "apache-avro 0.14.0", + "apache-avro", "arrow", "arrow-array", + "async-compression", + "bytes", + "bzip2", "chrono", + "flate2", + "futures", "num_cpus", "object_store", "parquet", "pyo3", "sqlparser", + "tokio", + "tokio-util", + "xz2", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] name = "datafusion-execution" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb3617466d894eb0ad11d06bab1e6e89c571c0a27d660685d327d0c6e1e1ccd" +checksum = "4973610d680bdc38f409a678c838d3873356cc6c29a543d1f56d7b4801e8d0a4" dependencies = [ + "arrow", "dashmap", "datafusion-common", "datafusion-expr", + "futures", "hashbrown 0.14.0", "log", "object_store", @@ -1655,9 +1641,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bd8220a0dfcdfddcc785cd7e71770ef1ce54fbe1e08984e5adf537027ecb6de" +checksum = "7f3599f4cfcf22490f7b7d6d2fc70610ca8045b8bdcd99ef9d4309cf2b387537" dependencies = [ "ahash 0.8.2", "arrow", @@ -1670,9 +1656,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d685a100c66952aaadd0cbe766df46d1887d58fc8bcf3589e6387787f18492b" +checksum = "f067401eea6a0967c83021e714746f9153368cca964d45c4a1a4f99869a1512f" dependencies = [ "arrow", "async-trait", @@ -1688,9 +1674,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f2c635da9b05b4b4c6c8d935f46fd99f9b6225f834091cf4e3c8a045b68beab" +checksum = "964c19161288d374fe066535f84de37a1dab419e47a24e02f3a0ca6413744451" dependencies = [ "ahash 0.8.2", "arrow", @@ -1723,9 +1709,9 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b7088150bc98c46e7b3a7168b96ea020c0eff682af08a6d1f3b8415ce1d7a7" +checksum = "4fde2768f10f1a5d47d164e0219ececb00f0dcd36f33079b656e03ad20e33c68" dependencies = [ "arrow", "chrono", @@ -1738,9 +1724,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "28.0.0" +version = "30.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3ef8abf4dd84d3f20c910822b52779c035ab7f4f2d5e7125ede3bae618e9de8" +checksum = "5b0939df21e440efcb35078c22b0192c537f7a53ebf1a34288a3a134753dd364" dependencies = [ "arrow", "arrow-schema", @@ -1776,7 +1762,7 @@ dependencies = [ name = "datasources" version = "0.4.0" dependencies = [ - "apache-avro 0.15.0", + "apache-avro", "async-channel", "async-stream", "async-trait", @@ -1838,7 +1824,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.14.0" -source = "git+https://github.com/delta-io/delta-rs.git?branch=main#0f850339924f1d7c96fc5dd3b94a738c42ea9399" +source = "git+https://github.com/glaredb/delta-rs.git?branch=fork#c83e4088e57a49239b8d3171c2b767fc1e43cf1c" dependencies = [ "arrow", "arrow-array", @@ -3571,9 +3557,9 @@ dependencies = [ [[package]] name = "num" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ "num-bigint", "num-complex", @@ -3846,9 +3832,9 @@ dependencies = [ [[package]] name = "parquet" -version = "43.0.0" +version = "45.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7267a9607c3f955d4d0ac41b88a67cecc0d8d009173ad3da390699a6cb3750" +checksum = "49f9739b984380582bdb7749ae5b5d28839bce899212cf16465c1ac1f8b65d79" dependencies = [ "ahash 0.8.2", "arrow-array", @@ -5840,9 +5826,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.35.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43" +checksum = "2eaa1e88e78d2c2460d78b7dc3f0c08dbb606ab4222f9aff36f420d36e307d87" dependencies = [ "log", "sqlparser_derive", diff --git a/Cargo.toml b/Cargo.toml index 2665077fa..9a5cdd309 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,8 @@ edition = "2021" lto = "thin" [workspace.dependencies] -datafusion = { version = "28.0", features = ["avro"] } -datafusion-proto = { version = "28.0" } +datafusion = { version = "30.0", features = ["avro"] } +datafusion-proto = { version = "30.0" } object_store = { version = "0.6.1" } tokio = { version = "1", features = ["full"] } url = "2.4.0" @@ -21,6 +21,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.104" [workspace.dependencies.deltalake] -git = "https://github.com/delta-io/delta-rs.git" -branch = "main" +git = "https://github.com/glaredb/delta-rs.git" +branch = "fork" features = ["s3", "gcs", "azure", "datafusion", "arrow", "parquet"] diff --git a/crates/datafusion_ext/src/local_hint.rs b/crates/datafusion_ext/src/local_hint.rs index 011dc9485..1296ff704 100644 --- a/crates/datafusion_ext/src/local_hint.rs +++ b/crates/datafusion_ext/src/local_hint.rs @@ -73,8 +73,9 @@ impl TableProvider for LocalTableHint { &self, state: &SessionState, input: Arc, + overwrite: bool, ) -> Result> { - let plan = self.0.insert_into(state, input).await?; + let plan = self.0.insert_into(state, input, overwrite).await?; Ok(plan) } } diff --git a/crates/datafusion_ext/src/planner/expr/identifier.rs b/crates/datafusion_ext/src/planner/expr/identifier.rs index 3ba01826b..49d38d492 100644 --- a/crates/datafusion_ext/src/planner/expr/identifier.rs +++ b/crates/datafusion_ext/src/planner/expr/identifier.rs @@ -16,10 +16,9 @@ // under the License. use crate::planner::{AsyncContextProvider, SqlQueryPlanner}; -use datafusion::common::{ - Column, DFField, DFSchema, DataFusionError, Result, ScalarValue, TableReference, -}; -use datafusion::logical_expr::{Case, Expr, GetIndexedField}; +use datafusion::common::{Column, DFField, DFSchema, DataFusionError, Result, TableReference}; +use datafusion::logical_expr::{Case, Expr}; +use datafusion::physical_plan::internal_err; use datafusion::sql::planner::PlannerContext; use datafusion::sql::sqlparser::ast::{Expr as SQLExpr, Ident}; @@ -91,9 +90,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { planner_context: &mut PlannerContext, ) -> Result { if ids.len() < 2 { - return Err(DataFusionError::Internal(format!( - "Not a compound identifier: {ids:?}" - ))); + return internal_err!("Not a compound identifier: {ids:?}"); } if ids[0].value.starts_with('@') { @@ -121,9 +118,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { // Though ideally once that support is in place, this code should work with it // TODO: remove when can support multiple nested identifiers if ids.len() > 5 { - return Err(DataFusionError::Internal(format!( - "Unsupported compound identifier: {ids:?}" - ))); + return internal_err!("Unsupported compound identifier: {ids:?}"); } let search_result = search_dfschema(&ids, schema); @@ -132,16 +127,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { Some((field, nested_names)) if !nested_names.is_empty() => { // TODO: remove when can support multiple nested identifiers if nested_names.len() > 1 { - return Err(DataFusionError::Internal(format!( + return internal_err!( "Nested identifiers not yet supported for column {}", field.qualified_column().quoted_flat_name() - ))); + ); } let nested_name = nested_names[0].to_string(); - Ok(Expr::GetIndexedField(GetIndexedField::new( - Box::new(Expr::Column(field.qualified_column())), - ScalarValue::Utf8(Some(nested_name)), - ))) + Ok(Expr::Column(field.qualified_column()).field(nested_name)) } // found matching field with no spare identifier(s) Some((field, _nested_names)) => Ok(Expr::Column(field.qualified_column())), @@ -149,9 +141,7 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { // return default where use all identifiers to not have a nested field // this len check is because at 5 identifiers will have to have a nested field if ids.len() == 5 { - Err(DataFusionError::Internal(format!( - "Unsupported compound identifier: {ids:?}" - ))) + internal_err!("Unsupported compound identifier: {ids:?}") } else { // check the outer_query_schema and try to find a match if let Some(outer) = planner_context.outer_query_schema() { @@ -160,10 +150,10 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { // found matching field with spare identifier(s) for nested field(s) in structure Some((field, nested_names)) if !nested_names.is_empty() => { // TODO: remove when can support nested identifiers for OuterReferenceColumn - Err(DataFusionError::Internal(format!( + internal_err!( "Nested identifiers are not yet supported for OuterReferenceColumn {}", field.qualified_column().quoted_flat_name() - ))) + ) } // found matching field with no spare identifier(s) Some((field, _nested_names)) => { diff --git a/crates/datafusion_ext/src/planner/expr/mod.rs b/crates/datafusion_ext/src/planner/expr/mod.rs index 4931e9201..107f16e72 100644 --- a/crates/datafusion_ext/src/planner/expr/mod.rs +++ b/crates/datafusion_ext/src/planner/expr/mod.rs @@ -35,10 +35,12 @@ use datafusion::logical_expr::expr::ScalarFunction; use datafusion::logical_expr::expr::{InList, Placeholder}; use datafusion::logical_expr::{ col, expr, lit, AggregateFunction, Between, BinaryExpr, BuiltinScalarFunction, Cast, Expr, - ExprSchemable, GetIndexedField, Like, Operator, TryCast, + ExprSchemable, GetFieldAccess, GetIndexedField, Like, Operator, TryCast, }; use datafusion::sql::planner::PlannerContext; -use datafusion::sql::sqlparser::ast::{ArrayAgg, Expr as SQLExpr, Interval, TrimWhereField, Value}; +use datafusion::sql::sqlparser::ast::{ + ArrayAgg, Expr as SQLExpr, Interval, JsonOperator, TrimWhereField, Value, +}; use datafusion::sql::sqlparser::parser::ParserError::ParserError; impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { @@ -191,7 +193,13 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { SQLExpr::MapAccess { column, keys } => { if let SQLExpr::Identifier(id) = *column { - plan_indexed(col(self.normalizer.normalize(id)), keys) + self.plan_indexed( + col(self.normalizer.normalize(id)), + keys, + schema, + planner_context, + ) + .await } else { Err(DataFusionError::NotImplemented(format!( "map access requires an identifier, found column {column} instead" @@ -203,7 +211,8 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { let expr = self .sql_expr_to_logical_expr(*obj, schema, planner_context) .await?; - plan_indexed(expr, indexes) + self.plan_indexed(expr, indexes, schema, planner_context) + .await } SQLExpr::CompoundIdentifier(ids) => { @@ -405,7 +414,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { "binary_op should be handled by sql_expr_to_logical_expr.".to_string(), )), - #[cfg(feature = "unicode_expressions")] SQLExpr::Substring { expr, substring_from, @@ -421,12 +429,6 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { .await } - #[cfg(not(feature = "unicode_expressions"))] - SQLExpr::Substring { .. } => Err(DataFusionError::Internal( - "statement substring requires compilation with feature flag: unicode_expressions." - .to_string(), - )), - SQLExpr::Trim { expr, trim_where, @@ -711,6 +713,70 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { )), } } + + async fn plan_indices( + &mut self, + expr: SQLExpr, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let field = match expr.clone() { + SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => { + GetFieldAccess::NamedStructField { + name: ScalarValue::Utf8(Some(s)), + } + } + SQLExpr::JsonAccess { + left, + operator: JsonOperator::Colon, + right, + } => { + let start = Box::new( + self.sql_expr_to_logical_expr(*left, schema, planner_context) + .await?, + ); + let stop = Box::new( + self.sql_expr_to_logical_expr(*right, schema, planner_context) + .await?, + ); + + GetFieldAccess::ListRange { start, stop } + } + _ => GetFieldAccess::ListIndex { + key: Box::new( + self.sql_expr_to_logical_expr(expr, schema, planner_context) + .await?, + ), + }, + }; + + Ok(field) + } + + #[async_recursion] + async fn plan_indexed( + &mut self, + expr: Expr, + mut keys: Vec, + schema: &DFSchema, + planner_context: &mut PlannerContext, + ) -> Result { + let indices = keys.pop().ok_or_else(|| { + ParserError("Internal error: Missing index key expression".to_string()) + })?; + + let expr = if !keys.is_empty() { + self.plan_indexed(expr, keys, schema, planner_context) + .await? + } else { + expr + }; + + Ok(Expr::GetIndexedField(GetIndexedField::new( + Box::new(expr), + self.plan_indices(indices, schema, planner_context).await?, + ))) + } } // modifies expr if it is a placeholder with datatype of right @@ -746,42 +812,6 @@ fn infer_placeholder_types(expr: Expr, schema: &DFSchema) -> Result { }) } -fn plan_key(key: SQLExpr) -> Result { - let scalar = match key { - SQLExpr::Value(Value::Number(s, _)) => ScalarValue::Int64(Some( - s.parse() - .map_err(|_| ParserError(format!("Cannot parse {s} as i64.")))?, - )), - SQLExpr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => { - ScalarValue::Utf8(Some(s)) - } - _ => { - return Err(DataFusionError::SQL(ParserError(format!( - "Unsuported index key expression: {key:?}" - )))); - } - }; - - Ok(scalar) -} - -fn plan_indexed(expr: Expr, mut keys: Vec) -> Result { - let key = keys - .pop() - .ok_or_else(|| ParserError("Internal error: Missing index key expression".to_string()))?; - - let expr = if !keys.is_empty() { - plan_indexed(expr, keys)? - } else { - expr - }; - - Ok(Expr::GetIndexedField(GetIndexedField::new( - Box::new(expr), - plan_key(key)?, - ))) -} - // #[cfg(test)] // mod tests { // use super::*; diff --git a/crates/datafusion_ext/src/planner/set_expr.rs b/crates/datafusion_ext/src/planner/set_expr.rs index 57291afcd..d5dde43c6 100644 --- a/crates/datafusion_ext/src/planner/set_expr.rs +++ b/crates/datafusion_ext/src/planner/set_expr.rs @@ -41,6 +41,16 @@ impl<'a, S: AsyncContextProvider> SqlQueryPlanner<'a, S> { let all = match set_quantifier { SetQuantifier::All => true, SetQuantifier::Distinct | SetQuantifier::None => false, + SetQuantifier::ByName => { + return Err(DataFusionError::NotImplemented( + "UNION BY NAME not implemented".to_string(), + )); + } + SetQuantifier::AllByName => { + return Err(DataFusionError::NotImplemented( + "UNION ALL BY NAME not implemented".to_string(), + )) + } }; let left_plan = self.set_expr_to_plan(*left, planner_context).await?; diff --git a/crates/datafusion_ext/src/planner/utils.rs b/crates/datafusion_ext/src/planner/utils.rs index 53ac9fcfb..0389eb0b0 100644 --- a/crates/datafusion_ext/src/planner/utils.rs +++ b/crates/datafusion_ext/src/planner/utils.rs @@ -19,27 +19,24 @@ use datafusion::arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; +use datafusion::common::tree_node::{Transformed, TreeNode}; use datafusion::common::{DataFusionError, Result, ScalarValue}; -use datafusion::logical_expr::expr::{ - AggregateFunction, AggregateUDF, Between, BinaryExpr, Case, GetIndexedField, GroupingSet, - InList, InSubquery, Like, Placeholder, ScalarFunction, ScalarUDF, WindowFunction, -}; -use datafusion::logical_expr::expr::{Cast, Sort}; +use datafusion::logical_expr::expr::{GroupingSet, WindowFunction}; use datafusion::logical_expr::utils::{expr_as_column_expr, find_column_exprs}; -use datafusion::logical_expr::{expr::Alias, Expr, LogicalPlan, TryCast}; +use datafusion::logical_expr::{expr::Alias, Expr, LogicalPlan}; use std::collections::HashMap; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { - clone_with_replacement(expr, &|nested_expr| { + expr.clone().transform_up(&|nested_expr| { match nested_expr { Expr::Column(col) => { - let field = plan.schema().field_from_column(col)?; - Ok(Some(Expr::Column(field.qualified_column()))) + let field = plan.schema().field_from_column(&col)?; + Ok(Transformed::Yes(Expr::Column(field.qualified_column()))) } _ => { // keep recursing - Ok(None) + Ok(Transformed::No(nested_expr)) } } }) @@ -60,11 +57,11 @@ pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { /// individual columns `a` and `b`, but rather it is a projection against the /// `a + b` found in the GROUP BY. pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result { - clone_with_replacement(expr, &|nested_expr| { - if base_exprs.contains(nested_expr) { - Ok(Some(expr_as_column_expr(nested_expr, plan)?)) + expr.clone().transform_up(&|nested_expr| { + if base_exprs.contains(&nested_expr) { + Ok(Transformed::Yes(expr_as_column_expr(&nested_expr, plan)?)) } else { - Ok(None) + Ok(Transformed::No(nested_expr)) } }) } @@ -124,295 +121,6 @@ fn check_column_satisfies_expr(columns: &[Expr], expr: &Expr, message_prefix: &s Ok(()) } -/// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be -/// replaced/customized by the replacement function. -/// -/// The replacement function is called repeatedly with `Expr`, starting with -/// the argument `expr`, then descending depth-first through its -/// descendants. The function chooses to replace or keep (clone) each `Expr`. -/// -/// The function's return type is `Result>>`, where: -/// -/// * `Ok(Some(replacement_expr))`: A replacement `Expr` is provided; it is -/// swapped in at the particular node in the tree. Any nested `Expr` are -/// not subject to cloning/replacement. -/// * `Ok(None)`: A replacement `Expr` is not provided. The `Expr` is -/// recreated, with all of its nested `Expr`'s subject to -/// cloning/replacement. -/// * `Err(err)`: Any error returned by the function is returned as-is by -/// `clone_with_replacement()`. -fn clone_with_replacement(expr: &Expr, replacement_fn: &F) -> Result -where - F: Fn(&Expr) -> Result>, -{ - let replacement_opt = replacement_fn(expr)?; - - match replacement_opt { - // If we were provided a replacement, use the replacement. Do not - // descend further. - Some(replacement) => Ok(replacement), - // No replacement was provided, clone the node and recursively call - // clone_with_replacement() on any nested expressions. - None => { - match expr { - Expr::AggregateFunction(AggregateFunction { - fun, - args, - distinct, - filter, - order_by, - }) => Ok(Expr::AggregateFunction(AggregateFunction::new( - fun.clone(), - args.iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - *distinct, - filter.clone(), - order_by.clone(), - ))), - Expr::WindowFunction(WindowFunction { - fun, - args, - partition_by, - order_by, - window_frame, - }) => Ok(Expr::WindowFunction(WindowFunction::new( - fun.clone(), - args.iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - partition_by - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - order_by - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - window_frame.clone(), - ))), - Expr::AggregateUDF(AggregateUDF { - fun, - args, - filter, - order_by, - }) => Ok(Expr::AggregateUDF(AggregateUDF::new( - fun.clone(), - args.iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - filter.clone(), - order_by.clone(), - ))), - Expr::Alias(Alias { expr, name, .. }) => Ok(Expr::Alias(Alias::new( - clone_with_replacement(expr, replacement_fn)?, - name.clone(), - ))), - Expr::Between(Between { - expr, - negated, - low, - high, - }) => Ok(Expr::Between(Between::new( - Box::new(clone_with_replacement(expr, replacement_fn)?), - *negated, - Box::new(clone_with_replacement(low, replacement_fn)?), - Box::new(clone_with_replacement(high, replacement_fn)?), - ))), - Expr::InList(InList { - expr: nested_expr, - list, - negated, - }) => Ok(Expr::InList(InList::new( - Box::new(clone_with_replacement(nested_expr, replacement_fn)?), - list.iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - *negated, - ))), - Expr::BinaryExpr(BinaryExpr { left, right, op }) => { - Ok(Expr::BinaryExpr(BinaryExpr::new( - Box::new(clone_with_replacement(left, replacement_fn)?), - *op, - Box::new(clone_with_replacement(right, replacement_fn)?), - ))) - } - Expr::Like(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive, - }) => Ok(Expr::Like(Like::new( - *negated, - Box::new(clone_with_replacement(expr, replacement_fn)?), - Box::new(clone_with_replacement(pattern, replacement_fn)?), - *escape_char, - *case_insensitive, - ))), - Expr::SimilarTo(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive, - }) => Ok(Expr::SimilarTo(Like::new( - *negated, - Box::new(clone_with_replacement(expr, replacement_fn)?), - Box::new(clone_with_replacement(pattern, replacement_fn)?), - *escape_char, - *case_insensitive, - ))), - Expr::Case(case) => Ok(Expr::Case(Case::new( - match &case.expr { - Some(case_expr) => { - Some(Box::new(clone_with_replacement(case_expr, replacement_fn)?)) - } - None => None, - }, - case.when_then_expr - .iter() - .map(|(a, b)| { - Ok(( - Box::new(clone_with_replacement(a, replacement_fn)?), - Box::new(clone_with_replacement(b, replacement_fn)?), - )) - }) - .collect::>>()?, - match &case.else_expr { - Some(else_expr) => { - Some(Box::new(clone_with_replacement(else_expr, replacement_fn)?)) - } - None => None, - }, - ))), - Expr::ScalarFunction(ScalarFunction { fun, args }) => { - Ok(Expr::ScalarFunction(ScalarFunction::new( - *fun, - args.iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - ))) - } - Expr::ScalarUDF(ScalarUDF { fun, args }) => Ok(Expr::ScalarUDF(ScalarUDF::new( - fun.clone(), - args.iter() - .map(|arg| clone_with_replacement(arg, replacement_fn)) - .collect::>>()?, - ))), - Expr::Negative(nested_expr) => Ok(Expr::Negative(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::Not(nested_expr) => Ok(Expr::Not(Box::new(clone_with_replacement( - nested_expr, - replacement_fn, - )?))), - Expr::IsNotNull(nested_expr) => Ok(Expr::IsNotNull(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::IsNull(nested_expr) => Ok(Expr::IsNull(Box::new(clone_with_replacement( - nested_expr, - replacement_fn, - )?))), - Expr::IsTrue(nested_expr) => Ok(Expr::IsTrue(Box::new(clone_with_replacement( - nested_expr, - replacement_fn, - )?))), - Expr::IsFalse(nested_expr) => Ok(Expr::IsFalse(Box::new(clone_with_replacement( - nested_expr, - replacement_fn, - )?))), - Expr::IsUnknown(nested_expr) => Ok(Expr::IsUnknown(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::IsNotTrue(nested_expr) => Ok(Expr::IsNotTrue(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::IsNotFalse(nested_expr) => Ok(Expr::IsNotFalse(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::IsNotUnknown(nested_expr) => Ok(Expr::IsNotUnknown(Box::new( - clone_with_replacement(nested_expr, replacement_fn)?, - ))), - Expr::Cast(Cast { expr, data_type }) => Ok(Expr::Cast(Cast::new( - Box::new(clone_with_replacement(expr, replacement_fn)?), - data_type.clone(), - ))), - Expr::TryCast(TryCast { - expr: nested_expr, - data_type, - }) => Ok(Expr::TryCast(TryCast::new( - Box::new(clone_with_replacement(nested_expr, replacement_fn)?), - data_type.clone(), - ))), - Expr::Sort(Sort { - expr: nested_expr, - asc, - nulls_first, - }) => Ok(Expr::Sort(Sort::new( - Box::new(clone_with_replacement(nested_expr, replacement_fn)?), - *asc, - *nulls_first, - ))), - Expr::Column { .. } - | Expr::OuterReferenceColumn(_, _) - | Expr::Literal(_) - | Expr::ScalarVariable(_, _) - | Expr::Exists { .. } - | Expr::ScalarSubquery(_) => Ok(expr.clone()), - Expr::InSubquery(InSubquery { - expr: nested_expr, - subquery, - negated, - }) => Ok(Expr::InSubquery(InSubquery::new( - Box::new(clone_with_replacement(nested_expr, replacement_fn)?), - subquery.clone(), - *negated, - ))), - Expr::Wildcard => Ok(Expr::Wildcard), - Expr::QualifiedWildcard { .. } => Ok(expr.clone()), - Expr::GetIndexedField(GetIndexedField { key, expr }) => { - Ok(Expr::GetIndexedField(GetIndexedField::new( - Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?), - key.clone(), - ))) - } - Expr::GroupingSet(set) => match set { - GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup( - exprs - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - ))), - GroupingSet::Cube(exprs) => Ok(Expr::GroupingSet(GroupingSet::Cube( - exprs - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - ))), - GroupingSet::GroupingSets(lists_of_exprs) => { - let mut new_lists_of_exprs = vec![]; - for exprs in lists_of_exprs { - new_lists_of_exprs.push( - exprs - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - ); - } - Ok(Expr::GroupingSet(GroupingSet::GroupingSets( - new_lists_of_exprs, - ))) - } - }, - Expr::Placeholder(Placeholder { id, data_type }) => Ok(Expr::Placeholder( - Placeholder::new(id.clone(), data_type.clone()), - )), - } - } - } -} - /// Returns mapping of each alias (`String`) to the expression (`Expr`) it is /// aliasing. pub(crate) fn extract_aliases(exprs: &[Expr]) -> HashMap { @@ -452,15 +160,15 @@ pub(crate) fn resolve_aliases_to_exprs( expr: &Expr, aliases: &HashMap, ) -> Result { - clone_with_replacement(expr, &|nested_expr| match nested_expr { + expr.clone().transform_up(&|nested_expr| match nested_expr { Expr::Column(c) if c.relation.is_none() => { if let Some(aliased_expr) = aliases.get(&c.name) { - Ok(Some(aliased_expr.clone())) + Ok(Transformed::Yes(aliased_expr.clone())) } else { - Ok(None) + Ok(Transformed::No(Expr::Column(c))) } } - _ => Ok(None), + _ => Ok(Transformed::No(nested_expr)), }) } diff --git a/crates/datasources/src/common/sink/csv.rs b/crates/datasources/src/common/sink/csv.rs index d48c4eca5..023616a5d 100644 --- a/crates/datasources/src/common/sink/csv.rs +++ b/crates/datasources/src/common/sink/csv.rs @@ -91,13 +91,18 @@ impl CsvSink { impl DataSink for CsvSink { async fn write_all( &self, - stream: SendableRecordBatchStream, + data: Vec, _context: &Arc, ) -> DfResult { - self.stream_into_inner(stream) - .await - .map(|x| x as u64) - .map_err(|e| DataFusionError::External(Box::new(e))) + let mut count = 0; + for stream in data { + count += self + .stream_into_inner(stream) + .await + .map(|x| x as u64) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Ok(count) } } diff --git a/crates/datasources/src/common/sink/json.rs b/crates/datasources/src/common/sink/json.rs index b096c809e..2da84400f 100644 --- a/crates/datasources/src/common/sink/json.rs +++ b/crates/datasources/src/common/sink/json.rs @@ -93,13 +93,18 @@ impl JsonSink { impl DataSink for JsonSink { async fn write_all( &self, - data: SendableRecordBatchStream, + data: Vec, _context: &Arc, ) -> DfResult { - self.stream_into_inner(data) - .await - .map(|x| x as u64) - .map_err(|e| DataFusionError::External(Box::new(e))) + let mut count = 0; + for stream in data { + count += self + .stream_into_inner(stream) + .await + .map(|x| x as u64) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + Ok(count) } } diff --git a/crates/datasources/src/common/sink/parquet.rs b/crates/datasources/src/common/sink/parquet.rs index 9b28bb45c..012542775 100644 --- a/crates/datasources/src/common/sink/parquet.rs +++ b/crates/datasources/src/common/sink/parquet.rs @@ -88,9 +88,13 @@ impl ParquetSink { impl DataSink for ParquetSink { async fn write_all( &self, - stream: SendableRecordBatchStream, + data: Vec, _context: &Arc, ) -> DfResult { - self.stream_into_inner(stream).await.map(|x| x as u64) + let mut count = 0; + for stream in data { + count += self.stream_into_inner(stream).await.map(|x| x as u64)?; + } + Ok(count) } } diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs index 98772a3af..f61f55d45 100644 --- a/crates/datasources/src/native/access.rs +++ b/crates/datasources/src/native/access.rs @@ -290,6 +290,7 @@ impl TableProvider for NativeTable { &self, _state: &SessionState, input: Arc, + _overwrite: bool, ) -> DataFusionResult> { Ok(self.insert_exec(input)) } diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index bf1f53620..fd984d836 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::datasource::file_format::file_type::FileType; +use datafusion::common::FileType; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::TableProvider; diff --git a/crates/pgrepr/src/writer.rs b/crates/pgrepr/src/writer.rs index 71d31f41e..90dd873b4 100644 --- a/crates/pgrepr/src/writer.rs +++ b/crates/pgrepr/src/writer.rs @@ -162,7 +162,7 @@ impl Writer for BinaryWriter { } fn write_timestamptz(buf: &mut BytesMut, v: &DateTime) -> Result<()> { - let utc_date_time = DateTime::::from_utc(v.naive_utc(), Utc); + let utc_date_time = DateTime::::from_naive_utc_and_offset(v.naive_utc(), Utc); put_to_sql!(buf, TIMESTAMPTZ, utc_date_time) } diff --git a/crates/sqlbuiltins/src/functions/object_store.rs b/crates/sqlbuiltins/src/functions/object_store.rs index 45d8458bd..9764e6cd7 100644 --- a/crates/sqlbuiltins/src/functions/object_store.rs +++ b/crates/sqlbuiltins/src/functions/object_store.rs @@ -2,9 +2,8 @@ use std::collections::HashMap; use std::{sync::Arc, vec}; use async_trait::async_trait; -use datafusion::common::OwnedTableReference; +use datafusion::common::{FileCompressionType, FileType, OwnedTableReference}; use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::file_format::file_type::{FileCompressionType, FileType}; use datafusion::datasource::file_format::json::JsonFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index 5c0403cdd..0e854210e 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -1,8 +1,8 @@ use std::str::FromStr; use std::sync::Arc; +use datafusion::common::{FileCompressionType, FileType}; use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::file_format::file_type::{FileCompressionType, FileType}; use datafusion::datasource::file_format::json::JsonFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; diff --git a/crates/sqlexec/src/parser/options.rs b/crates/sqlexec/src/parser/options.rs index 0a8e59747..d73184e3e 100644 --- a/crates/sqlexec/src/parser/options.rs +++ b/crates/sqlexec/src/parser/options.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, fmt}; use datafusion::{ - common::parsers::CompressionTypeVariant, datasource::file_format::file_type::FileType, + common::{parsers::CompressionTypeVariant, FileType}, sql::sqlparser::parser::ParserError, }; use datasources::{debug::DebugTableType, mongodb::MongoProtocol}; diff --git a/crates/sqlexec/src/planner/physical_plan/copy_to.rs b/crates/sqlexec/src/planner/physical_plan/copy_to.rs index 74d87153e..b5b70eb10 100644 --- a/crates/sqlexec/src/planner/physical_plan/copy_to.rs +++ b/crates/sqlexec/src/planner/physical_plan/copy_to.rs @@ -124,7 +124,7 @@ impl CopyToExec { }; let stream = execute_stream(self.source, context.clone())?; - let count = sink.write_all(stream, &context).await?; + let count = sink.write_all(vec![stream], &context).await?; Ok(new_operation_with_count_batch("copy", count)) } diff --git a/crates/sqlexec/src/planner/physical_plan/create_temp_table.rs b/crates/sqlexec/src/planner/physical_plan/create_temp_table.rs index d22a8a0cf..20512b561 100644 --- a/crates/sqlexec/src/planner/physical_plan/create_temp_table.rs +++ b/crates/sqlexec/src/planner/physical_plan/create_temp_table.rs @@ -121,7 +121,7 @@ async fn create_temp_table( let state = SessionState::with_config_rt(context.session_config().clone(), context.runtime_env()); - let exec = table.insert_into(&state, source).await?; + let exec = table.insert_into(&state, source, false).await?; let mut stream = exec.execute(0, context)?; // Drain stream to write everything. diff --git a/crates/sqlexec/src/planner/physical_plan/insert.rs b/crates/sqlexec/src/planner/physical_plan/insert.rs index 9223db034..f100b7358 100644 --- a/crates/sqlexec/src/planner/physical_plan/insert.rs +++ b/crates/sqlexec/src/planner/physical_plan/insert.rs @@ -146,7 +146,7 @@ impl InsertExec { let state = SessionState::with_config_rt(context.session_config().clone(), context.runtime_env()); - let exec = table.insert_into(&state, source).await?; + let exec = table.insert_into(&state, source, false).await?; let mut stream = exec.execute(0, context)?; diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index ee62c9547..9f0392258 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -5,8 +5,7 @@ use datafusion::arrow::datatypes::{ DataType, Field, Schema, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; use datafusion::common::parsers::CompressionTypeVariant; -use datafusion::common::{OwnedSchemaReference, OwnedTableReference, ToDFSchema}; -use datafusion::datasource::file_format::file_type::FileType; +use datafusion::common::{FileType, OwnedSchemaReference, OwnedTableReference, ToDFSchema}; use datafusion::logical_expr::{cast, col, LogicalPlanBuilder}; use datafusion::sql::planner::{object_name_to_table_reference, IdentNormalizer, PlannerContext}; use datafusion::sql::sqlparser::ast::AlterTableOperation; diff --git a/crates/sqlexec/src/remote/local_side.rs b/crates/sqlexec/src/remote/local_side.rs index cddd2d2f9..81a4754f4 100644 --- a/crates/sqlexec/src/remote/local_side.rs +++ b/crates/sqlexec/src/remote/local_side.rs @@ -137,8 +137,9 @@ impl TableProvider for LocalSideTableProvider { &self, state: &SessionState, input: Arc, + overwrite: bool, ) -> Result> { - let plan = self.inner.insert_into(state, input).await?; + let plan = self.inner.insert_into(state, input, overwrite).await?; Ok(plan) } } diff --git a/crates/sqlexec/src/remote/table.rs b/crates/sqlexec/src/remote/table.rs index c8b50329b..9d36cc1c5 100644 --- a/crates/sqlexec/src/remote/table.rs +++ b/crates/sqlexec/src/remote/table.rs @@ -87,6 +87,7 @@ impl TableProvider for StubRemoteTableProvider { &self, _state: &SessionState, _input: Arc, + _overwrite: bool, ) -> DfResult> { Err(DataFusionError::NotImplemented( "insert_into called on a stub provider".to_string(),