Skip to content

Commit

Permalink
Add CREATE VIEW (#2279)
Browse files Browse the repository at this point in the history
* Initial commit

* First passing test

* Add OR REPLACE and more tests

* Update doc comment

* More tests

* Add CreateView to Ballista

* Include Q15 for TPCH

* Ignore q15

* Delete view physical plan
  • Loading branch information
matthewmturner authored May 11, 2022
1 parent 1ded769 commit 19d937a
Show file tree
Hide file tree
Showing 23 changed files with 547 additions and 36 deletions.
7 changes: 7 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ message LogicalPlanNode {
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
}
}

Expand Down Expand Up @@ -171,6 +172,12 @@ message CreateCatalogNode {
datafusion.DfSchema schema = 3;
}

message CreateViewNode {
string name = 1;
LogicalPlanNode input = 2;
bool or_replace = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
Expand Down
33 changes: 31 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
};
use datafusion::logical_plan::{
source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition,
TableScan, Values,
CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder,
Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -334,6 +334,19 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
}))
}
LogicalPlanType::CreateView(create_view) => {
let plan = create_view
.input.clone().ok_or_else(|| BallistaError::General(String::from(
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
)))?
.try_into_logical_plan(ctx, extension_codec)?;

Ok(LogicalPlan::CreateView(CreateView {
name: create_view.name.clone(),
input: Arc::new(plan),
or_replace: create_view.or_replace,
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
Expand Down Expand Up @@ -851,6 +864,22 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
}
LogicalPlan::CreateView(CreateView {
name,
input,
or_replace,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
protobuf::CreateViewNode {
name: name.clone(),
input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
input,
extension_codec,
)?)),
or_replace: *or_replace,
},
))),
}),
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
Expand Down
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
Expand Down Expand Up @@ -165,6 +165,10 @@ impl TableProvider for CustomDataSource {
]))
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
Expand All @@ -39,9 +39,7 @@ pub trait TableProvider: Sync + Send {
fn schema(&self) -> SchemaRef;

/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType {
TableType::Base
}
fn table_type(&self) -> TableType;

/// Create an ExecutionPlan that will scan the table.
/// The table provider will be usually responsible of grouping
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::*;
use async_trait::async_trait;

use crate::datasource::TableProvider;
use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::project_schema;
Expand Down Expand Up @@ -51,6 +51,10 @@ impl TableProvider for EmptyTable {
self.schema.clone()
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::datasource::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
FileFormat,
},
get_statistics_with_limit, TableProvider,
get_statistics_with_limit, TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
Expand Down Expand Up @@ -298,6 +298,10 @@ impl TableProvider for ListingTable {
Arc::clone(&self.table_schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;

use crate::datasource::TableProvider;
use crate::datasource::{TableProvider, TableType};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::logical_plan::Expr;
Expand Down Expand Up @@ -127,6 +127,10 @@ impl TableProvider for MemTable {
self.schema.clone()
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ pub mod file_format;
pub mod listing;
pub mod memory;
pub mod object_store_registry;
pub mod view;

use futures::Stream;

pub use self::datasource::TableProvider;
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
pub use self::view::ViewTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
pub use crate::logical_expr::TableType;
Expand Down
Loading

0 comments on commit 19d937a

Please sign in to comment.