Skip to content

Commit

Permalink
Encapsulate create table/view construction
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Nov 19, 2024
1 parent 9fb5ff9 commit 435f9c1
Show file tree
Hide file tree
Showing 8 changed files with 627 additions and 159 deletions.
25 changes: 7 additions & 18 deletions datafusion/core/src/catalog_common/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex};
use crate::catalog::{SchemaProvider, TableProvider, TableProviderFactory};
use crate::execution::context::SessionState;

use datafusion_common::{
Constraints, DFSchema, DataFusionError, HashMap, TableReference,
};
use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
Expand Down Expand Up @@ -131,21 +129,12 @@ impl ListingSchemaProvider {
.factory
.create(
state,
&CreateExternalTable {
schema: Arc::new(DFSchema::empty()),
name,
location: table_url,
file_type: self.format.clone(),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: Default::default(),
constraints: Constraints::empty(),
column_defaults: Default::default(),
},
&CreateExternalTable::builder()
.schema(Arc::new(DFSchema::empty()))
.name(name)
.location(table_url)
.file_type(self.format.clone())
.build()?,
)
.await?;
let _ =
Expand Down
52 changes: 19 additions & 33 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ mod tests {
datasource::file_format::csv::CsvFormat, execution::context::SessionContext,
};

use datafusion_common::{Constraints, DFSchema, TableReference};
use datafusion_common::{DFSchema, TableReference};

#[tokio::test]
async fn test_create_using_non_std_file_ext() {
async fn test_create_using_non_std_file_ext() -> Result<()> {
let csv_file = tempfile::Builder::new()
.prefix("foo")
.suffix(".tbl")
Expand All @@ -190,32 +190,25 @@ mod tests {
let context = SessionContext::new();
let state = context.state();
let name = TableReference::bare("foo");
let cmd = CreateExternalTable {
name,
location: csv_file.path().to_str().unwrap().to_string(),
file_type: "csv".to_string(),
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options: HashMap::from([("format.has_header".into(), "true".into())]),
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
};
let cmd = CreateExternalTable::builder()
.name(name)
.location(csv_file.path().to_str().unwrap().to_string())
.file_type("csv".to_string())
.schema(Arc::new(DFSchema::empty()))
.options(HashMap::from([("format.has_header".into(), "true".into())]))
.build()?;
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
.downcast_ref::<ListingTable>()
.unwrap();
let listing_options = listing_table.options();
assert_eq!(".tbl", listing_options.file_extension);
Ok(())
}

#[tokio::test]
async fn test_create_using_non_std_file_ext_csv_options() {
async fn test_create_using_non_std_file_ext_csv_options() -> Result<()> {
let csv_file = tempfile::Builder::new()
.prefix("foo")
.suffix(".tbl")
Expand All @@ -230,21 +223,13 @@ mod tests {
let mut options = HashMap::new();
options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned());
options.insert("format.has_header".into(), "true".into());
let cmd = CreateExternalTable {
name,
location: csv_file.path().to_str().unwrap().to_string(),
file_type: "csv".to_string(),
schema: Arc::new(DFSchema::empty()),
table_partition_cols: vec![],
if_not_exists: false,
temporary: false,
definition: None,
order_exprs: vec![],
unbounded: false,
options,
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
};
let cmd = CreateExternalTable::builder()
.name(name)
.location(csv_file.path().to_str().unwrap().to_string())
.file_type("csv".to_string())
.schema(Arc::new(DFSchema::empty()))
.options(options)
.build()?;
let table_provider = factory.create(&state, &cmd).await.unwrap();
let listing_table = table_provider
.as_any()
Expand All @@ -257,5 +242,6 @@ mod tests {
assert_eq!(csv_options.schema_infer_max_rec, Some(1000));
let listing_options = listing_table.options();
assert_eq!(".tbl", listing_options.file_extension);
Ok(())
}
}
14 changes: 7 additions & 7 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use crate::{
logical_expr::ScalarUDF,
logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable,
TableType, UNNAMED_TABLE,
CreateMemoryTable, CreateMemoryTableFields, CreateView, CreateViewFields,
DropCatalogSchema, DropFunction, DropTable, DropView, Execute, LogicalPlan,
LogicalPlanBuilder, Prepare, SetVariable, TableType, UNNAMED_TABLE,
},
physical_expr::PhysicalExpr,
physical_plan::ExecutionPlan,
Expand Down Expand Up @@ -792,15 +792,15 @@ impl SessionContext {
}

async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result<DataFrame> {
let CreateMemoryTable {
let CreateMemoryTableFields {
name,
input,
if_not_exists,
or_replace,
constraints,
column_defaults,
temporary,
} = cmd;
} = cmd.into_fields();

let input = Arc::unwrap_or_clone(input);
let input = self.state().optimize(&input)?;
Expand Down Expand Up @@ -852,13 +852,13 @@ impl SessionContext {
}

async fn create_view(&self, cmd: CreateView) -> Result<DataFrame> {
let CreateView {
let CreateViewFields {
name,
input,
or_replace,
definition,
temporary,
} = cmd;
} = cmd.into_fields();

let view = self.table(name.clone()).await;

Expand Down
Loading

0 comments on commit 435f9c1

Please sign in to comment.