Skip to content

Commit

Permalink
Add CreateView to Ballista
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed May 10, 2022
1 parent 807e967 commit 335be74
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 7 deletions.
7 changes: 7 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ message LogicalPlanNode {
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
}
}

Expand Down Expand Up @@ -171,6 +172,12 @@ message CreateCatalogNode {
datafusion.DfSchema schema = 3;
}

message CreateViewNode {
string name = 1;
LogicalPlanNode input = 2;
bool or_replace = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
Expand Down
33 changes: 31 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
};
use datafusion::logical_plan::{
source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition,
TableScan, Values,
CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder,
Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -334,6 +334,19 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
}))
}
LogicalPlanType::CreateView(create_view) => {
let plan = create_view
.input.clone().ok_or_else(|| BallistaError::General(String::from(
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
)))?
.try_into_logical_plan(ctx, extension_codec)?;

Ok(LogicalPlan::CreateView(CreateView {
name: create_view.name.clone(),
input: Arc::new(plan),
or_replace: create_view.or_replace,
}))
}
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
Expand Down Expand Up @@ -851,6 +864,22 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
}
LogicalPlan::CreateView(CreateView {
name,
input,
or_replace,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
protobuf::CreateViewNode {
name: name.clone(),
input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
input,
extension_codec,
)?)),
or_replace: *or_replace,
},
))),
}),
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
Expand Down
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
Expand Down Expand Up @@ -165,6 +165,10 @@ impl TableProvider for CustomDataSource {
]))
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! View data source which uses a LogicalPlan as it's input.
use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
Expand Down
9 changes: 8 additions & 1 deletion datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::scalar::ScalarValue;
use datafusion::{datasource::TableProvider, physical_plan::collect};
use datafusion::{
datasource::{TableProvider, TableType},
physical_plan::collect,
};
use datafusion::{error::Result, physical_plan::DisplayFormatType};

use datafusion::execution::context::{SessionContext, TaskContext};
Expand Down Expand Up @@ -192,6 +195,10 @@ impl TableProvider for CustomTableProvider {
TEST_CUSTOM_SCHEMA_REF!()
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/tests/provider_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use arrow::array::{as_primitive_array, Int32Builder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
Expand Down Expand Up @@ -132,6 +132,10 @@ impl TableProvider for CustomProvider {
self.zero_batch.schema()
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
_: &Option<Vec<usize>>,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/tests/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
datasource::TableProvider,
datasource::{TableProvider, TableType},
error::Result,
logical_plan::Expr,
physical_plan::{
Expand Down Expand Up @@ -68,6 +68,10 @@ impl TableProvider for StatisticsValidation {
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
TableType::Base
}

async fn scan(
&self,
projection: &Option<Vec<usize>>,
Expand Down

0 comments on commit 335be74

Please sign in to comment.