Skip to content

Commit

Permalink
Ballista
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Mar 28, 2022
1 parent a9fc1b1 commit 2aae820
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
7 changes: 7 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message LogicalPlanNode {
ValuesNode values = 16;
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
CreateCatalogNode create_catalog_schema = 18;
}
}

Expand Down Expand Up @@ -153,6 +154,12 @@ message CreateCatalogSchemaNode {
datafusion.DfSchema schema = 3;
}

message CreateCatalogNode {
string catalog_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
Expand Down
31 changes: 29 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint,
Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -340,6 +341,19 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: pb_schema.try_into()?,
}))
}
LogicalPlanType::CreateCatalog(create_catalog) => {
let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
"Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
))
})?;

Ok(LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name: create_catalog.schema_name.clone(),
if_not_exists: create_catalog.if_not_exists,
schema: pb_schema.try_into()?,
}))
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan =
into_logical_plan!(analyze.input, &ctx, extension_codec)?;
Expand Down Expand Up @@ -787,6 +801,19 @@ impl AsLogicalPlan for LogicalPlanNode {
},
)),
}),
LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name,
if_not_exists,
schema: df_schema,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateCatalog(
protobuf::CreateCatalogNode {
catalog_name: catalog_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
},
)),
}),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
Expand Down

0 comments on commit 2aae820

Please sign in to comment.