diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 1e7901403828f..e7821dc108a7c 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -53,6 +53,7 @@ message LogicalPlanNode { UnionNode union = 19; CreateCatalogNode create_catalog = 20; SubqueryAliasNode subquery_alias = 21; + CreateViewNode create_view = 22; } } @@ -171,6 +172,12 @@ message CreateCatalogNode { datafusion.DfSchema schema = 3; } +message CreateViewNode { + string name = 1; + LogicalPlanNode input = 2; + bool or_replace = 3; +} + // a node containing data for defining values list. unlike in SQL where it's two dimensional, here // the list is flattened, and with the field n_cols it can be parsed and partitioned into rows message ValuesNode { diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 5307aff653e45..e6d9e6289740f 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{ }; use datafusion::logical_plan::{ source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, - CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, - TableScan, Values, + CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, + Repartition, TableScan, Values, }; use datafusion::prelude::SessionContext; @@ -334,6 +334,19 @@ impl AsLogicalPlan for LogicalPlanNode { if_not_exists: create_extern_table.if_not_exists, })) } + LogicalPlanType::CreateView(create_view) => { + let plan = create_view + .input.clone().ok_or_else(|| BallistaError::General(String::from( + "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.", + )))? + .try_into_logical_plan(ctx, extension_codec)?; + + Ok(LogicalPlan::CreateView(CreateView { + name: create_view.name.clone(), + input: Arc::new(plan), + or_replace: create_view.or_replace, + })) + } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { BallistaError::General(String::from( @@ -851,6 +864,22 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } + LogicalPlan::CreateView(CreateView { + name, + input, + or_replace, + }) => Ok(protobuf::LogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::CreateView(Box::new( + protobuf::CreateViewNode { + name: name.clone(), + input: Some(Box::new(LogicalPlanNode::try_from_logical_plan( + input, + extension_codec, + )?)), + or_replace: *or_replace, + }, + ))), + }), LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema_name, if_not_exists, diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index d8a908986839e..a9a8ef7aa22ac 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -20,7 +20,7 @@ use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::dataframe::DataFrame; -use datafusion::datasource::TableProvider; +use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::TaskContext; use datafusion::logical_plan::{Expr, LogicalPlanBuilder}; @@ -165,6 +165,10 @@ impl TableProvider for CustomDataSource { ])) } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, projection: &Option>, diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 3ff4e780e481f..8ab254525acb1 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::sync::Arc; use async_trait::async_trait; -use datafusion_expr::{TableProviderFilterPushDown, TableType}; +pub use datafusion_expr::{TableProviderFilterPushDown, TableType}; use crate::arrow::datatypes::SchemaRef; use crate::error::Result; diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index b6d47308e920e..2bb3b687b40eb 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! View data source which uses a LogicalPlan as it's input. + use std::{any::Any, sync::Arc}; use arrow::datatypes::SchemaRef; diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index 81e4706deb655..f1356f7d44318 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -24,7 +24,10 @@ use datafusion::from_slice::FromSlice; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::scalar::ScalarValue; -use datafusion::{datasource::TableProvider, physical_plan::collect}; +use datafusion::{ + datasource::{TableProvider, TableType}, + physical_plan::collect, +}; use datafusion::{error::Result, physical_plan::DisplayFormatType}; use datafusion::execution::context::{SessionContext, TaskContext}; @@ -192,6 +195,10 @@ impl TableProvider for CustomTableProvider { TEST_CUSTOM_SCHEMA_REF!() } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, projection: &Option>, diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index 49cd70143b99b..c8fe483ea9f46 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -19,7 +19,7 @@ use arrow::array::{as_primitive_array, Int32Builder, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion::datasource::datasource::TableProvider; +use datafusion::datasource::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; @@ -132,6 +132,10 @@ impl TableProvider for CustomProvider { self.zero_batch.schema() } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, _: &Option>, diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs index 0315067047903..99b53a62d8eee 100644 --- a/datafusion/core/tests/statistics.rs +++ b/datafusion/core/tests/statistics.rs @@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::{ - datasource::TableProvider, + datasource::{TableProvider, TableType}, error::Result, logical_plan::Expr, physical_plan::{ @@ -68,6 +68,10 @@ impl TableProvider for StatisticsValidation { Arc::clone(&self.schema) } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, projection: &Option>,