Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): support logical filter #122

Merged
merged 4 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/common/cast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use arrow::array::{Array, BooleanArray};

use crate::function::FunctionError;

/// Downcast an Arrow Array to a concrete type
macro_rules! downcast_value {
($Value:expr, $Type:ident) => {{
use std::any::type_name;
$Value.as_any().downcast_ref::<$Type>().ok_or_else(|| {
FunctionError::CastError(format!("could not cast value to {}", type_name::<$Type>()))
})?
}};
}

/// Downcast ArrayRef to BooleanArray
pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray, FunctionError> {
Ok(downcast_value!(array, BooleanArray))
}
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod cast;
mod create_info;

pub use cast::*;
pub use create_info::*;
4 changes: 3 additions & 1 deletion src/execution/expression_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ impl ExpressionExecutor {
) -> Result<ArrayRef, ExecutorError> {
Ok(match expr {
BoundExpression::BoundColumnRefExpression(_) => todo!(),
BoundExpression::BoundConstantExpression(e) => e.value.to_array(),
BoundExpression::BoundConstantExpression(e) => {
e.value.to_array_of_size(input.num_rows())
}
BoundExpression::BoundReferenceExpression(e) => input.column(e.index).clone(),
BoundExpression::BoundCastExpression(e) => {
let child_result = Self::execute_internal(&e.child, input)?;
Expand Down
10 changes: 7 additions & 3 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod physical_create_table;
mod physical_dummy_scan;
mod physical_explain;
mod physical_expression_scan;
mod physical_filter;
mod physical_insert;
mod physical_projection;
mod physical_table_scan;
Expand All @@ -13,17 +14,18 @@ pub use physical_create_table::*;
pub use physical_dummy_scan::*;
pub use physical_explain::*;
pub use physical_expression_scan::*;
pub use physical_filter::*;
pub use physical_insert::*;
pub use physical_projection::*;
pub use physical_table_scan::*;

use crate::types_v2::LogicalType;
use crate::planner_v2::BoundExpression;

#[derive(new, Default, Clone)]
pub struct PhysicalOperatorBase {
pub(crate) children: Vec<PhysicalOperator>,
/// The types returned by this physical operator
pub(crate) types: Vec<LogicalType>,
// The set of expressions contained within the operator, if any
pub(crate) expressioins: Vec<BoundExpression>,
}

#[derive(Clone)]
Expand All @@ -35,6 +37,7 @@ pub enum PhysicalOperator {
PhysicalTableScan(PhysicalTableScan),
PhysicalProjection(PhysicalProjection),
PhysicalColumnDataScan(PhysicalColumnDataScan),
PhysicalFilter(PhysicalFilter),
}

impl PhysicalOperator {
Expand All @@ -47,6 +50,7 @@ impl PhysicalOperator {
PhysicalOperator::PhysicalProjection(op) => &op.base.children,
PhysicalOperator::PhysicalDummyScan(op) => &op.base.children,
PhysicalOperator::PhysicalColumnDataScan(op) => &op.base.children,
PhysicalOperator::PhysicalFilter(op) => &op.base.children,
}
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_dummy_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct PhysicalDummyScan {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_dummy_scan(&self, op: LogicalDummyScan) -> PhysicalOperator {
let base = PhysicalOperatorBase::new(vec![], op.base.types);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalDummyScan(PhysicalDummyScan::new(base))
}
}
4 changes: 2 additions & 2 deletions src/execution/physical_plan/physical_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;

use super::{PhysicalColumnDataScan, PhysicalOperator, PhysicalOperatorBase};
use super::{PhysicalColumnDataScan, PhysicalOperator};
use crate::execution::{PhysicalPlanGenerator, SchemaUtil};
use crate::planner_v2::LogicalExplain;
use crate::util::tree_render::TreeRender;
Expand All @@ -19,7 +19,7 @@ impl PhysicalPlanGenerator {
// physical plan explain string
let physical_plan_string = TreeRender::physical_plan_tree(&physical_child);

let base = PhysicalOperatorBase::new(vec![], types.clone());
let base = self.create_physical_operator_base(op.base);

let schema = SchemaUtil::new_schema_ref(&["type".to_string(), "plan".to_string()], &types);
let types_column = Arc::new(StringArray::from(vec![
Expand Down
4 changes: 3 additions & 1 deletion src/execution/physical_plan/physical_expression_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::types_v2::LogicalType;
/// The PhysicalExpressionScan scans a set of expressions
#[derive(new, Clone)]
pub struct PhysicalExpressionScan {
#[new(default)]
pub(crate) base: PhysicalOperatorBase,
/// The types of the expressions
pub(crate) expr_types: Vec<LogicalType>,
Expand All @@ -21,7 +20,10 @@ impl PhysicalPlanGenerator {
&self,
op: LogicalExpressionGet,
) -> PhysicalOperator {
assert!(op.base.children.len() == 1);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalExpressionScan(PhysicalExpressionScan::new(
base,
op.expr_types,
op.expressions,
))
Expand Down
25 changes: 25 additions & 0 deletions src/execution/physical_plan/physical_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::{BoundConjunctionExpression, LogicalFilter};

#[derive(Clone)]
pub struct PhysicalFilter {
pub(crate) base: PhysicalOperatorBase,
}

impl PhysicalFilter {
pub fn new(mut base: PhysicalOperatorBase) -> Self {
let expression =
BoundConjunctionExpression::try_build_and_conjunction_expression(base.expressioins);
base.expressioins = vec![expression];
Self { base }
}
}

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_filter(&self, op: LogicalFilter) -> PhysicalOperator {
assert!(op.base.children.len() == 1);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalFilter(PhysicalFilter::new(base))
}
}
8 changes: 1 addition & 7 deletions src/execution/physical_plan/physical_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ impl PhysicalInsert {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_insert(&self, op: LogicalInsert) -> PhysicalOperator {
let new_children = op
.base
.children
.into_iter()
.map(|op| self.create_plan_internal(op))
.collect::<Vec<_>>();
let base = PhysicalOperatorBase::new(new_children, op.base.types);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalInsert(PhysicalInsert::new(
base,
op.column_index_list,
Expand Down
14 changes: 3 additions & 11 deletions src/execution/physical_plan/physical_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@ use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::{BoundExpression, LogicalProjection};
use crate::planner_v2::LogicalProjection;

#[derive(new, Clone)]
pub struct PhysicalProjection {
pub(crate) base: PhysicalOperatorBase,
pub(crate) select_list: Vec<BoundExpression>,
}

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_projection(&self, op: LogicalProjection) -> PhysicalOperator {
let new_children = op
.base
.children
.into_iter()
.map(|p| self.create_plan_internal(p))
.collect::<Vec<_>>();
let types = op.base.types;
let base = PhysicalOperatorBase::new(new_children, types);
PhysicalOperator::PhysicalProjection(PhysicalProjection::new(base, op.base.expressioins))
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalProjection(PhysicalProjection::new(base))
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct PhysicalTableScan {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_table_scan(&self, op: LogicalGet) -> PhysicalOperator {
let base = PhysicalOperatorBase::new(vec![], op.base.types);
let base = self.create_physical_operator_base(op.base);
let plan =
PhysicalTableScan::new(base, op.function, op.bind_data, op.returned_types, op.names);
PhysicalOperator::PhysicalTableScan(plan)
Expand Down
17 changes: 15 additions & 2 deletions src/execution/physical_plan_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::sync::Arc;
use derive_new::new;
use log::debug;

use super::{ColumnBindingResolver, PhysicalOperator};
use super::{ColumnBindingResolver, PhysicalOperator, PhysicalOperatorBase};
use crate::execution::LOGGING_TARGET;
use crate::main_entry::ClientContext;
use crate::planner_v2::{LogicalOperator, LogicalOperatorVisitor};
use crate::planner_v2::{LogicalOperator, LogicalOperatorBase, LogicalOperatorVisitor};
use crate::util::tree_render::TreeRender;

#[derive(new)]
Expand Down Expand Up @@ -42,6 +42,19 @@ impl PhysicalPlanGenerator {
LogicalOperator::LogicalProjection(op) => self.create_physical_projection(op),
LogicalOperator::LogicalDummyScan(op) => self.create_physical_dummy_scan(op),
LogicalOperator::LogicalExplain(op) => self.create_physical_explain(op),
LogicalOperator::LogicalFilter(op) => self.create_physical_filter(op),
}
}

pub(crate) fn create_physical_operator_base(
&self,
base: LogicalOperatorBase,
) -> PhysicalOperatorBase {
let children = base
.children
.iter()
.map(|op| self.create_plan_internal(op.clone()))
.collect::<Vec<_>>();
PhysicalOperatorBase::new(children, base.expressioins)
}
}
17 changes: 6 additions & 11 deletions src/execution/volcano_executor/dummy_scan.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{ExecutionContext, ExecutorError, PhysicalDummyScan};
use crate::types_v2::ScalarValue;

#[derive(new)]
pub struct DummyScan {
pub(crate) plan: PhysicalDummyScan,
pub(crate) _plan: PhysicalDummyScan,
}

impl DummyScan {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
let mut fields = vec![];
for (idx, ty) in self.plan.base.types.iter().enumerate() {
fields.push(Field::new(
format!("col{}", idx).as_str(),
ty.clone().into(),
true,
));
}
let fields = vec![Field::new("dummy", DataType::Boolean, true)];
let schema = SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()));
yield RecordBatch::new_empty(schema.clone());
let array = ScalarValue::Boolean(Some(true)).to_array();
yield RecordBatch::try_new(schema.clone(), vec![array])?;
}
}
15 changes: 10 additions & 5 deletions src/execution/volcano_executor/expression_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{
ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalExpressionScan,
BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalExpressionScan,
};

#[derive(new)]
pub struct ExpressionScan {
pub(crate) plan: PhysicalExpressionScan,
pub(crate) child: BoxedExecutor,
}

impl ExpressionScan {
Expand All @@ -27,10 +28,14 @@ impl ExpressionScan {
));
}
let schema = SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()));
let input = RecordBatch::new_empty(schema.clone());
for exprs in self.plan.expressions.iter() {
let columns = ExpressionExecutor::execute(exprs, &input)?;
yield RecordBatch::try_new(schema.clone(), columns)?;

#[for_await]
for batch in self.child {
let input = batch?;
for exprs in self.plan.expressions.iter() {
let columns = ExpressionExecutor::execute(exprs, &input)?;
yield RecordBatch::try_new(schema.clone(), columns)?;
}
}
}
}
32 changes: 32 additions & 0 deletions src/execution/volcano_executor/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::Arc;

use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::common::as_boolean_array;
use crate::execution::{
BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalFilter,
};

#[derive(new)]
pub struct Filter {
pub(crate) plan: PhysicalFilter,
pub(crate) child: BoxedExecutor,
}

impl Filter {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
let exprs = self.plan.base.expressioins;

#[for_await]
for batch in self.child {
let batch = batch?;
let eval_mask = ExpressionExecutor::execute(&exprs, &batch)?;
let predicate = as_boolean_array(&eval_mask[0])?;
yield filter_record_batch(&batch, predicate)?;
}
}
}
11 changes: 10 additions & 1 deletion src/execution/volcano_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod column_data_scan;
mod create_table;
mod dummy_scan;
mod expression_scan;
mod filter;
mod insert;
mod projection;
mod table_scan;
Expand All @@ -12,6 +13,7 @@ pub use column_data_scan::*;
pub use create_table::*;
pub use dummy_scan::*;
pub use expression_scan::*;
pub use filter::*;
use futures::stream::BoxStream;
use futures::TryStreamExt;
pub use insert::*;
Expand All @@ -34,7 +36,9 @@ impl VolcanoExecutor {
match plan {
PhysicalOperator::PhysicalCreateTable(op) => CreateTable::new(op).execute(context),
PhysicalOperator::PhysicalExpressionScan(op) => {
ExpressionScan::new(op).execute(context)
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
ExpressionScan::new(op, child_executor).execute(context)
}
PhysicalOperator::PhysicalInsert(op) => {
let child = op.base.children.first().unwrap().clone();
Expand All @@ -51,6 +55,11 @@ impl VolcanoExecutor {
PhysicalOperator::PhysicalColumnDataScan(op) => {
ColumnDataScan::new(op).execute(context)
}
PhysicalOperator::PhysicalFilter(op) => {
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
Filter::new(op, child_executor).execute(context)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano_executor/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Projection {
impl Projection {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
let exprs = self.plan.select_list;
let exprs = self.plan.base.expressioins;
let schema = SchemaUtil::new_schema_ref_from_exprs(&exprs);

#[for_await]
Expand Down
Loading