diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index ff733dd47891..b9a3311a7fb0 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -280,7 +280,9 @@ jobs:
           rustup default stable
           rustup component add rustfmt
       - name: Run
-        run: ci/scripts/rust_fmt.sh
+        run: |
+          echo '' > datafusion/proto/src/generated/datafusion.rs
+          ci/scripts/rust_fmt.sh
 
   coverage:
     name: coverage
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index a6fc45ea0027..e0a1027077cd 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -93,6 +93,7 @@ url = "2.2"
 uuid = { version = "1.0", features = ["v4"] }
 
 [dev-dependencies]
+async-trait = "0.1.53"
 criterion = "0.3"
 csv = "1.1.6"
 ctor = "0.1.22"
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 7b86f8ec89ea..64e964e0e5ae 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -72,3 +72,12 @@ pub trait TableProvider: Sync + Send {
         Ok(TableProviderFilterPushDown::Unsupported)
     }
 }
+
+/// A factory which creates [`TableProvider`]s at runtime given a URL.
+///
+/// For example, this can be used to create a table "on the fly"
+/// from a directory of files only when that name is referenced.
+pub trait TableProviderFactory: Sync + Send {
+    /// Create a TableProvider given name and url
+    fn create(&self, name: &str, url: &str) -> Arc<dyn TableProvider>;
+}
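[Illustrative usage sketch, not part of the patch: the snippet below wires the new `TableProviderFactory` trait to the `SessionContext::register_table_factory` method added in the context.rs hunk that follows. The `MemTableFactory` name, the `CUSTOMTABLE` identifier, and the single-column schema are assumptions made for this example only.]

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Hypothetical factory that backs every custom table with an empty in-memory table.
struct MemTableFactory {}

impl TableProviderFactory for MemTableFactory {
    fn create(&self, _name: &str, _url: &str) -> Arc<dyn TableProvider> {
        // A real factory would inspect `_url` here (open a table log, list files,
        // read a schema, ...) before building the provider.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        Arc::new(MemTable::try_new(schema, vec![]).expect("valid empty MemTable"))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = SessionContext::new();
    // The key must match the uppercased `STORED AS` token used in the DDL below.
    ctx.register_table_factory("CUSTOMTABLE", Arc::new(MemTableFactory {}));

    // The factory is looked up and invoked when the custom file type is referenced.
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS CUSTOMTABLE LOCATION 's3://bucket/path'")
        .await?;

    // The table is now registered and can be planned against like any other table.
    let df = ctx.sql("SELECT a FROM t").await?;
    assert_eq!(df.schema().fields().len(), 1);
    Ok(())
}
```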
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 194699ae33a4..1a4a7c9df9d3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -68,7 +68,7 @@ use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
     provider_as_source, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
-    CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
+    CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
     LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -90,6 +90,7 @@ use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
     OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
+use crate::datasource::datasource::TableProviderFactory;
 use crate::execution::runtime_env::RuntimeEnv;
 use crate::logical_plan::plan::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -175,6 +176,8 @@ pub struct SessionContext {
     pub session_start_time: DateTime<Utc>,
     /// Shared session state for the session
     pub state: Arc<RwLock<SessionState>>,
+    /// Dynamic table providers
+    pub table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
 }
 
 impl Default for SessionContext {
@@ -202,6 +205,7 @@ impl SessionContext {
             session_id: state.session_id.clone(),
             session_start_time: chrono::Utc::now(),
             state: Arc::new(RwLock::new(state)),
+            table_factories: HashMap::default(),
         }
     }
 
@@ -211,9 +215,19 @@
             session_id: state.session_id.clone(),
             session_start_time: chrono::Utc::now(),
             state: Arc::new(RwLock::new(state)),
+            table_factories: HashMap::default(),
         }
     }
 
+    /// Register a `TableProviderFactory` for a given `file_type` identifier
+    pub fn register_table_factory(
+        &mut self,
+        file_type: &str,
+        factory: Arc<dyn TableProviderFactory>,
+    ) {
+        self.table_factories.insert(file_type.to_string(), factory);
+    }
+
     /// Return the [RuntimeEnv] used to run queries with this [SessionContext]
     pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
         self.state.read().runtime_env.clone()
@@ -236,70 +250,12 @@ impl SessionContext {
     pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
         let plan = self.create_logical_plan(sql)?;
         match plan {
-            LogicalPlan::CreateExternalTable(CreateExternalTable {
-                ref schema,
-                ref name,
-                ref location,
-                ref file_type,
-                ref has_header,
-                ref delimiter,
-                ref table_partition_cols,
-                ref if_not_exists,
-            }) => {
-                let (file_format, file_extension) = match file_type {
-                    FileType::CSV => (
-                        Arc::new(
-                            CsvFormat::default()
-                                .with_has_header(*has_header)
-                                .with_delimiter(*delimiter as u8),
-                        ) as Arc<dyn FileFormat>,
-                        DEFAULT_CSV_EXTENSION,
-                    ),
-                    FileType::Parquet => (
-                        Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_PARQUET_EXTENSION,
-                    ),
-                    FileType::Avro => (
-                        Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_AVRO_EXTENSION,
-                    ),
-                    FileType::NdJson => (
-                        Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
-                        DEFAULT_JSON_EXTENSION,
-                    ),
-                };
-                let table = self.table(name.as_str());
-                match (if_not_exists, table) {
-                    (true, Ok(_)) => self.return_empty_dataframe(),
-                    (_, Err(_)) => {
-                        // TODO make schema in CreateExternalTable optional instead of empty
-                        let provided_schema = if schema.fields().is_empty() {
-                            None
-                        } else {
-                            Some(Arc::new(schema.as_ref().to_owned().into()))
-                        };
-                        let options = ListingOptions {
-                            format: file_format,
-                            collect_stat: false,
-                            file_extension: file_extension.to_owned(),
-                            target_partitions: self.copied_config().target_partitions,
-                            table_partition_cols: table_partition_cols.clone(),
-                        };
-                        self.register_listing_table(
-                            name,
-                            location,
-                            options,
-                            provided_schema,
-                        )
-                        .await?;
-                        self.return_empty_dataframe()
-                    }
-                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
-                        "Table '{:?}' already exists",
-                        name
-                    ))),
+            LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
+                "PARQUET" | "CSV" | "JSON" | "AVRO" => {
+                    self.create_listing_table(&cmd).await
                 }
-                }
+                _ => self.create_custom_table(&cmd).await,
             },
 
             LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                 name,
@@ -480,6 +436,84 @@ impl SessionContext {
         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
     }
 
+    async fn create_custom_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let factory = &self.table_factories.get(&cmd.file_type).ok_or_else(|| {
+            DataFusionError::Execution(format!(
+                "Unable to find factory for {}",
+                cmd.file_type
+            ))
+        })?;
+        let table = (*factory).create(cmd.name.as_str(), cmd.location.as_str());
+        self.register_table(cmd.name.as_str(), table)?;
+        let plan = LogicalPlanBuilder::empty(false).build()?;
+        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+    }
+
+    async fn create_listing_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<DataFrame>> {
+        let (file_format, file_extension) = match cmd.file_type.as_str() {
+            "CSV" => (
+                Arc::new(
+                    CsvFormat::default()
+                        .with_has_header(cmd.has_header)
+                        .with_delimiter(cmd.delimiter as u8),
+                ) as Arc<dyn FileFormat>,
+                DEFAULT_CSV_EXTENSION,
+            ),
+            "PARQUET" => (
+                Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_PARQUET_EXTENSION,
+            ),
+            "AVRO" => (
+                Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_AVRO_EXTENSION,
+            ),
+            "JSON" => (
+                Arc::new(JsonFormat::default()) as Arc<dyn FileFormat>,
+                DEFAULT_JSON_EXTENSION,
+            ),
+            _ => Err(DataFusionError::Execution(
+                "Only known FileTypes can be ListingTables!".to_string(),
+            ))?,
+        };
+        let table = self.table(cmd.name.as_str());
+        match (cmd.if_not_exists, table) {
+            (true, Ok(_)) => self.return_empty_dataframe(),
+            (_, Err(_)) => {
+                // TODO make schema in CreateExternalTable optional instead of empty
+                let provided_schema = if cmd.schema.fields().is_empty() {
+                    None
+                } else {
+                    Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
+                };
+                let options = ListingOptions {
+                    format: file_format,
+                    collect_stat: false,
+                    file_extension: file_extension.to_owned(),
+                    target_partitions: self.copied_config().target_partitions,
+                    table_partition_cols: cmd.table_partition_cols.clone(),
+                };
+                self.register_listing_table(
+                    cmd.name.as_str(),
+                    cmd.location.clone(),
+                    options,
+                    provided_schema,
+                )
+                .await?;
+                self.return_empty_dataframe()
+            }
+            (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                "Table '{:?}' already exists",
+                cmd.name
+            ))),
+        }
+    }
+
     fn find_and_deregister<'a>(
         &self,
         table_ref: impl Into<TableReference<'a>>,
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index a79e9f5e001a..038571de3057 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -45,10 +45,9 @@ pub use datafusion_expr::{
             build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
         },
         CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-        CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
-        JoinType, Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
-        StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
-        UserDefinedLogicalNode, Values,
+        CreateView, CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit,
+        LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
+        Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
     },
     lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
     octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index c4cfcc3dc5a5..2f6078947a61 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -23,8 +23,8 @@ pub use datafusion_expr::{
         display::{GraphvizVisitor, IndentVisitor},
         Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
         CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
-        Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
-        LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+        Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
+        Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
         StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values, Window,
     },
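[Migration note with a hedged sketch: `FileType` disappears from these re-exports because the enum itself is deleted in the `datafusion_expr` hunk further down, so hypothetical downstream code that matched on the enum now has to compare the plain uppercased `file_type` string instead. The helper name below is made up for illustration.]

```rust
/// Hypothetical replacement for code that used to match on the removed
/// `datafusion::logical_plan::FileType` enum.
fn default_extension_for(file_type: &str) -> Option<&'static str> {
    match file_type {
        // The built-in types are spelled exactly as SessionContext matches them.
        "PARQUET" => Some(".parquet"),
        "CSV" => Some(".csv"),
        "JSON" => Some(".json"),
        "AVRO" => Some(".avro"),
        // Anything else is a custom type that needs a registered TableProviderFactory.
        _ => None,
    }
}

fn main() {
    assert_eq!(default_extension_for("PARQUET"), Some(".parquet"));
    assert_eq!(default_extension_for("DELTATABLE"), None);
}
```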
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 820e570fac47..e887da53509e 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -15,8 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use async_trait::async_trait;
+use std::any::Any;
 use std::io::Write;
 
+use datafusion::datasource::datasource::TableProviderFactory;
+use datafusion::execution::context::SessionState;
+use datafusion_expr::TableType;
 use tempfile::TempDir;
 
 use super::*;
@@ -360,6 +365,76 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
     Ok(())
 }
 
+struct TestTableProvider {}
+
+impl TestTableProvider {}
+
+#[async_trait]
+impl TableProvider for TestTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    fn schema(&self) -> SchemaRef {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    fn table_type(&self) -> TableType {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+
+    async fn scan(
+        &self,
+        _ctx: &SessionState,
+        _projection: &Option<Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!("TestTableProvider is a stub for testing.")
+    }
+}
+
+struct TestTableFactory {}
+
+impl TableProviderFactory for TestTableFactory {
+    fn create(&self, _name: &str, _path: &str) -> Arc<dyn TableProvider> {
+        Arc::new(TestTableProvider {})
+    }
+}
+
+#[tokio::test]
+async fn create_custom_table() -> Result<()> {
+    let mut ctx = SessionContext::new();
+    ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));
+
+    let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
+    ctx.sql(sql).await.unwrap();
+
+    let cat = ctx.catalog("datafusion").unwrap();
+    let schema = cat.schema("public").unwrap();
+    let exists = schema.table_exist("dt");
+    assert!(exists, "Table should have been created!");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn create_bad_custom_table() {
+    let ctx = SessionContext::new();
+
+    let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
+    let res = ctx.sql(sql).await;
+    match res {
+        Ok(_) => panic!("Registration of tables without factories should fail"),
+        Err(e) => {
+            assert!(
+                e.to_string().contains("Unable to find factory for"),
+                "Registration of tables without factories should throw correct error"
+            )
+        }
+    }
+}
+
 #[tokio::test]
 async fn create_csv_table_empty_file() -> Result<()> {
     let ctx =
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 82f98cf99386..123342c42eeb 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -1398,6 +1398,7 @@ async fn timestamp_sub_interval_days() -> Result<()> {
 }
 
 #[tokio::test]
+#[ignore] // https://github.com/apache/arrow-datafusion/issues/3327
 async fn timestamp_add_interval_months() -> Result<()> {
     let ctx = SessionContext::new();
 
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 9917d69a7910..8e4dfc0eb32a 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -24,9 +24,9 @@ pub use builder::{table_scan, LogicalPlanBuilder};
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView,
-    EmptyRelation, Explain, Extension, FileType, Filter, Join, JoinConstraint, JoinType,
-    Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition,
-    Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+    EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit,
+    LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+    StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
     Values, Window,
 };
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 12e9e0738d9d..38074f046333 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1236,19 +1236,6 @@ pub struct CreateView {
     pub definition: Option<String>,
 }
 
-/// Types of files to parse as DataFrames
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum FileType {
-    /// Newline-delimited JSON
-    NdJson,
-    /// Apache Parquet columnar storage
-    Parquet,
-    /// Comma separated values
-    CSV,
-    /// Avro binary records
-    Avro,
-}
-
 /// Creates an external table.
 #[derive(Clone)]
 pub struct CreateExternalTable {
@@ -1259,7 +1246,7 @@ pub struct CreateExternalTable {
     /// The physical location
     pub location: String,
     /// The file type of physical file
-    pub file_type: FileType,
+    pub file_type: String,
     /// Whether the CSV file contains a header
     pub has_header: bool,
     /// Delimiter for CSV
diff --git a/datafusion/proto/build.rs b/datafusion/proto/build.rs
index e13ffa86a53c..9b1b4e393c70 100644
--- a/datafusion/proto/build.rs
+++ b/datafusion/proto/build.rs
@@ -30,8 +30,10 @@ fn main() -> Result<(), String> {
 
 #[cfg(feature = "json")]
 fn build() -> Result<(), String> {
-    let descriptor_path = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap())
-        .join("proto_descriptor.bin");
+    use std::io::Write;
+
+    let out = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
+    let descriptor_path = out.join("proto_descriptor.bin");
 
     prost_build::Config::new()
         .file_descriptor_set_path(&descriptor_path)
@@ -47,12 +49,24 @@ fn build() -> Result<(), String> {
         .build(&[".datafusion"])
         .map_err(|e| format!("pbjson compilation failed: {}", e))?;
 
+    // .serde.rs is not a valid package name, so append to datafusion.rs so we can treat it normally
+    let proto = std::fs::read_to_string(out.join("datafusion.rs")).unwrap();
+    let json = std::fs::read_to_string(out.join("datafusion.serde.rs")).unwrap();
+    let mut file = std::fs::OpenOptions::new()
+        .write(true)
+        .create(true)
+        .open("src/generated/datafusion.rs")
+        .unwrap();
+    file.write(proto.as_str().as_ref()).unwrap();
+    file.write(json.as_str().as_ref()).unwrap();
+
     Ok(())
 }
 
 #[cfg(not(feature = "json"))]
 fn build() -> Result<(), String> {
     prost_build::Config::new()
+        .out_dir("src/generated")
         .compile_protos(&["proto/datafusion.proto"], &["proto"])
         .map_err(|e| format!("protobuf compilation failed: {}", e))
 }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index aa6d07928d2e..69e1617f39f9 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -147,7 +147,7 @@ message EmptyRelationNode {
 message CreateExternalTableNode {
   string name = 1;
   string location = 2;
-  FileType file_type = 3;
+  string file_type = 3;
   bool has_header = 4;
   datafusion.DfSchema schema = 5;
   repeated string table_partition_cols = 6;
@@ -181,13 +181,6 @@ message ValuesNode {
   repeated datafusion.LogicalExprNode values_list = 2;
 }
 
-enum FileType {
-  NdJson = 0;
-  Parquet = 1;
-  CSV = 2;
-  Avro = 3;
-}
-
 message AnalyzeNode {
   LogicalPlanNode input = 1;
   bool verbose = 2;
diff --git a/datafusion/proto/src/generated/.gitignore b/datafusion/proto/src/generated/.gitignore
new file mode 100644
index 000000000000..42eb8bcd5521
--- /dev/null
+++ b/datafusion/proto/src/generated/.gitignore
@@ -0,0 +1,4 @@
+*
+
+!.gitignore
+!mod.rs
diff --git a/datafusion/proto/src/generated/mod.rs b/datafusion/proto/src/generated/mod.rs
new file mode 100644
index 000000000000..750ab315f223
--- /dev/null
+++ b/datafusion/proto/src/generated/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// include the generated protobuf source as a submodule
+#[allow(clippy::all)]
+#[rustfmt::skip]
+pub mod datafusion;
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 57eafb97582d..5f3bc47a9e66 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -19,19 +19,12 @@
 
 use datafusion_common::DataFusionError;
 
-// include the generated protobuf source as a submodule
-#[allow(clippy::all)]
-pub mod protobuf {
-    include!(concat!(env!("OUT_DIR"), "/datafusion.rs"));
-
-    #[cfg(feature = "json")]
-    include!(concat!(env!("OUT_DIR"), "/datafusion.serde.rs"));
-}
-
 pub mod bytes;
 pub mod from_proto;
+pub mod generated;
 pub mod logical_plan;
 pub mod to_proto;
+use generated::datafusion as protobuf;
 
 #[cfg(doctest)]
 doc_comment::doctest!("../README.md", readme_example_test);
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index ca0487d39c0e..0e1ed38cbfa5 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -178,36 +178,6 @@ macro_rules! convert_box_required {
     }};
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<datafusion::logical_plan::FileType> for protobuf::FileType {
-    fn into(self) -> datafusion::logical_plan::FileType {
-        use datafusion::logical_plan::FileType;
-        match self {
-            protobuf::FileType::NdJson => FileType::NdJson,
-            protobuf::FileType::Parquet => FileType::Parquet,
-            protobuf::FileType::Csv => FileType::CSV,
-            protobuf::FileType::Avro => FileType::Avro,
-        }
-    }
-}
-
-impl TryFrom<i32> for protobuf::FileType {
-    type Error = DataFusionError;
-    fn try_from(value: i32) -> Result<Self, Self::Error> {
-        use protobuf::FileType;
-        match value {
-            _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
-            _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
-            _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
-            _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
-            invalid => Err(DataFusionError::Internal(format!(
-                "Attempted to convert invalid i32 to protobuf::Filetype: {}",
-                invalid
-            ))),
-        }
-    }
-}
-
 impl From<protobuf::JoinType> for JoinType {
     fn from(t: protobuf::JoinType) -> Self {
         match t {
@@ -491,14 +461,23 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))
                 })?;
 
-                let pb_file_type: protobuf::FileType =
-                    create_extern_table.file_type.try_into()?;
+                match create_extern_table.file_type.as_str() {
+                    "CSV" | "JSON" | "PARQUET" | "AVRO" => {}
+                    it => {
+                        if !ctx.table_factories.contains_key(it) {
+                            Err(DataFusionError::Internal(format!(
+                                "No TableProvider for file type: {}",
+                                it
+                            )))?
+                        }
+                    }
+                }
 
                 Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
                     schema: pb_schema.try_into()?,
                     name: create_extern_table.name.clone(),
                     location: create_extern_table.location.clone(),
-                    file_type: pb_file_type.into(),
+                    file_type: create_extern_table.file_type.clone(),
                     has_header: create_extern_table.has_header,
                     delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
                         DataFusionError::Internal(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
@@ -1055,31 +1034,20 @@ impl AsLogicalPlan for LogicalPlanNode {
                 schema: df_schema,
                 table_partition_cols,
                 if_not_exists,
-            }) => {
-                use datafusion::logical_plan::FileType;
-
-                let pb_file_type: protobuf::FileType = match file_type {
-                    FileType::NdJson => protobuf::FileType::NdJson,
-                    FileType::Parquet => protobuf::FileType::Parquet,
-                    FileType::CSV => protobuf::FileType::Csv,
-                    FileType::Avro => protobuf::FileType::Avro,
-                };
-
-                Ok(protobuf::LogicalPlanNode {
-                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
-                        protobuf::CreateExternalTableNode {
-                            name: name.clone(),
-                            location: location.clone(),
-                            file_type: pb_file_type as i32,
-                            has_header: *has_header,
-                            schema: Some(df_schema.into()),
-                            table_partition_cols: table_partition_cols.clone(),
-                            if_not_exists: *if_not_exists,
-                            delimiter: String::from(*delimiter),
-                        },
-                    )),
-                })
-            }
+            }) => Ok(protobuf::LogicalPlanNode {
+                logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
+                    protobuf::CreateExternalTableNode {
+                        name: name.clone(),
+                        location: location.clone(),
+                        file_type: file_type.clone(),
+                        has_header: *has_header,
+                        schema: Some(df_schema.into()),
+                        table_partition_cols: table_partition_cols.clone(),
+                        if_not_exists: *if_not_exists,
+                        delimiter: String::from(*delimiter),
+                    },
+                )),
+            }),
             LogicalPlan::CreateView(CreateView {
                 name,
                 input,
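[Hedged round-trip sketch, not part of the patch: with the `FileType` enum gone from the protobuf definition, deserializing a `CreateExternalTable` node validates unknown file types against the `SessionContext`'s registered factories, so a custom plan only survives a proto round trip if the factory is registered on the decoding context. This assumes the `logical_plan_to_bytes`/`logical_plan_from_bytes` helpers in `datafusion_proto::bytes` and reuses the `TestTableFactory` stub from the create_drop.rs tests above.]

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = SessionContext::new();
    // TestTableFactory is the stub defined in datafusion/core/tests/sql/create_drop.rs above.
    ctx.register_table_factory("DELTATABLE", Arc::new(TestTableFactory {}));

    let plan = ctx.create_logical_plan(
        "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table'",
    )?;

    let bytes = logical_plan_to_bytes(&plan)?;
    // Decoding succeeds only because a factory for "DELTATABLE" is registered on `ctx`;
    // otherwise it fails with "No TableProvider for file type: DELTATABLE".
    let decoded = logical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(format!("{:?}", plan), format!("{:?}", decoded));
    Ok(())
}
```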
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index 28f27b390d7c..6f057b1515ae 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -19,7 +19,6 @@
 //!
 //! Declares a SQL parser based on sqlparser that handles custom formats that we need.
 
-use datafusion_expr::logical_plan::FileType;
 use sqlparser::{
     ast::{ColumnDef, ColumnOptionDef, Statement as SQLStatement, TableConstraint},
     dialect::{keywords::Keyword, Dialect, GenericDialect},
@@ -35,17 +34,8 @@ macro_rules! parser_err {
     };
 }
 
-fn parse_file_type(s: &str) -> Result<FileType, ParserError> {
-    match s.to_uppercase().as_str() {
-        "PARQUET" => Ok(FileType::Parquet),
-        "NDJSON" => Ok(FileType::NdJson),
-        "CSV" => Ok(FileType::CSV),
-        "AVRO" => Ok(FileType::Avro),
-        other => Err(ParserError::ParserError(format!(
-            "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
-            other
-        ))),
-    }
+fn parse_file_type(s: &str) -> Result<String, ParserError> {
+    Ok(s.to_uppercase())
 }
 
 /// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
@@ -55,8 +45,8 @@ pub struct CreateExternalTable {
     pub name: String,
     /// Optional schema
     pub columns: Vec<ColumnDef>,
-    /// File type (Parquet, NDJSON, CSV)
-    pub file_type: FileType,
+    /// File type (Parquet, NDJSON, CSV, etc)
+    pub file_type: String,
     /// CSV Header row?
     pub has_header: bool,
     /// User defined delimiter for CSVs
@@ -351,7 +341,7 @@ impl<'a> DFParser<'a> {
     }
 
     /// Parses the set of valid formats
-    fn parse_file_format(&mut self) -> Result<FileType, ParserError> {
+    fn parse_file_format(&mut self) -> Result<String, ParserError> {
         match self.parser.next_token() {
             Token::Word(w) => parse_file_type(&w.value),
             unexpected => self.expected("one of PARQUET, NDJSON, or CSV", unexpected),
@@ -452,7 +442,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![make_column_def("c1", DataType::Int(display))],
-            file_type: FileType::CSV,
+            file_type: "CSV".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.csv".into(),
@@ -467,7 +457,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![make_column_def("c1", DataType::Int(display))],
-            file_type: FileType::CSV,
+            file_type: "CSV".to_string(),
             has_header: false,
             delimiter: '|',
             location: "foo.csv".into(),
@@ -482,7 +472,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![make_column_def("c1", DataType::Int(display))],
-            file_type: FileType::CSV,
+            file_type: "CSV".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.csv".into(),
@@ -500,7 +490,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![make_column_def("c1", DataType::Int(display))],
-            file_type: FileType::CSV,
+            file_type: "CSV".to_string(),
             has_header: true,
             delimiter: ',',
             location: "foo.csv".into(),
@@ -515,7 +505,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![],
-            file_type: FileType::Parquet,
+            file_type: "PARQUET".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.parquet".into(),
@@ -529,7 +519,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![],
-            file_type: FileType::Parquet,
+            file_type: "PARQUET".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.parquet".into(),
@@ -543,7 +533,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![],
-            file_type: FileType::Avro,
+            file_type: "AVRO".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.avro".into(),
@@ -558,7 +548,7 @@ mod tests {
         let expected = Statement::CreateExternalTable(CreateExternalTable {
             name: "t".into(),
             columns: vec![],
-            file_type: FileType::Parquet,
+            file_type: "PARQUET".to_string(),
             has_header: false,
             delimiter: ',',
             location: "foo.parquet".into(),
@@ -567,11 +557,6 @@ mod tests {
         });
         expect_parse_ok(sql, expected)?;
 
-        // Error cases: Invalid type
-        let sql =
-            "CREATE EXTERNAL TABLE t(c1 int) STORED AS UNKNOWN_TYPE LOCATION 'foo.csv'";
-        expect_parse_error(sql, "expect one of PARQUET, AVRO, NDJSON, or CSV");
-
         // Error cases: partition column does not support type
         let sql =
             "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
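[Hedged parser-level sketch: with `parse_file_type` reduced to an uppercasing pass-through, any identifier after `STORED AS` is accepted at parse time and carried as a plain string on the statement; validation is deferred to the `SessionContext` and proto layers above. `deltatable` is just an example identifier.]

```rust
use datafusion_sql::parser::{DFParser, Statement};

fn main() {
    let sql =
        "CREATE EXTERNAL TABLE dt STORED AS deltatable LOCATION 's3://bucket/schema/table'";
    let statements = DFParser::parse_sql(sql).expect("custom file types now parse");
    match statements.front() {
        Some(Statement::CreateExternalTable(cmd)) => {
            // The unknown format is kept as a string, uppercased by parse_file_type.
            assert_eq!(cmd.file_type, "DELTATABLE");
        }
        other => panic!("unexpected statement: {:?}", other),
    }
}
```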
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 967272694a55..81ac69a940c8 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -26,7 +26,7 @@ use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
 use datafusion_expr::logical_plan::{
     Analyze, CreateCatalog, CreateCatalogSchema,
     CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView,
-    DropTable, DropView, Explain, FileType, JoinType, LogicalPlan, LogicalPlanBuilder,
+    DropTable, DropView, Explain, JoinType, LogicalPlan, LogicalPlanBuilder,
     Partitioning, PlanType, ToStringifiedPlan,
 };
 use datafusion_expr::utils::{
@@ -464,19 +464,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         } = statement;
 
         // semantic checks
-        match file_type {
-            FileType::CSV => {}
-            FileType::Parquet => {
-                if !columns.is_empty() {
-                    return Err(DataFusionError::Plan(
-                        "Column definitions can not be specified for PARQUET files."
-                            .into(),
-                    ));
-                }
-            }
-            FileType::NdJson => {}
-            FileType::Avro => {}
-        };
+        if file_type == "PARQUET" && !columns.is_empty() {
+            Err(DataFusionError::Plan(
+                "Column definitions can not be specified for PARQUET files.".into(),
+            ))?;
+        }
 
         let schema = self.build_schema(columns)?;
 
@@ -3880,6 +3872,13 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn create_external_table_custom() {
+        let sql = "CREATE EXTERNAL TABLE dt STORED AS DELTATABLE LOCATION 's3://bucket/schema/table';";
+        let expected = r#"CreateExternalTable: "dt""#;
+        quick_test(sql, expected);
+    }
+
     #[test]
     fn create_external_table_csv_no_schema() {
         let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";