Skip to content

Commit

Permalink
Merge pull request #122 from Fedomn/logical-filter
Browse files Browse the repository at this point in the history
feat(planner): support logical filter
  • Loading branch information
mergify[bot] authored Dec 26, 2022
2 parents ee32753 + e9ca60c commit 2fe54f5
Show file tree
Hide file tree
Showing 38 changed files with 576 additions and 93 deletions.
18 changes: 18 additions & 0 deletions src/common/cast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use arrow::array::{Array, BooleanArray};

use crate::function::FunctionError;

/// Downcast a value exposing `as_any()` (e.g. an Arrow array) to the concrete type `$Type`.
///
/// Expands to an expression that evaluates to `&$Type` on success. On a type
/// mismatch it propagates a `FunctionError::CastError` naming the requested
/// type through `?`, so it can only be used inside a function whose error type
/// can be built from `FunctionError`.
macro_rules! downcast_value {
    ($Value:expr, $Type:ident) => {{
        let concrete: Option<&$Type> = $Value.as_any().downcast_ref::<$Type>();
        concrete.ok_or_else(|| {
            // Only build the message on the failure path.
            let target = ::std::any::type_name::<$Type>();
            FunctionError::CastError(format!("could not cast value to {}", target))
        })?
    }};
}

/// Borrow a dynamically typed Arrow array as a [`BooleanArray`].
///
/// # Errors
///
/// Returns `FunctionError::CastError` when `array` is not a `BooleanArray`.
pub fn as_boolean_array(array: &dyn Array) -> Result<&BooleanArray, FunctionError> {
    let booleans: &BooleanArray = downcast_value!(array, BooleanArray);
    Ok(booleans)
}
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod cast;
mod create_info;

pub use cast::*;
pub use create_info::*;
4 changes: 3 additions & 1 deletion src/execution/expression_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ impl ExpressionExecutor {
) -> Result<ArrayRef, ExecutorError> {
Ok(match expr {
BoundExpression::BoundColumnRefExpression(_) => todo!(),
BoundExpression::BoundConstantExpression(e) => e.value.to_array(),
BoundExpression::BoundConstantExpression(e) => {
e.value.to_array_of_size(input.num_rows())
}
BoundExpression::BoundReferenceExpression(e) => input.column(e.index).clone(),
BoundExpression::BoundCastExpression(e) => {
let child_result = Self::execute_internal(&e.child, input)?;
Expand Down
10 changes: 7 additions & 3 deletions src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod physical_create_table;
mod physical_dummy_scan;
mod physical_explain;
mod physical_expression_scan;
mod physical_filter;
mod physical_insert;
mod physical_projection;
mod physical_table_scan;
Expand All @@ -13,17 +14,18 @@ pub use physical_create_table::*;
pub use physical_dummy_scan::*;
pub use physical_explain::*;
pub use physical_expression_scan::*;
pub use physical_filter::*;
pub use physical_insert::*;
pub use physical_projection::*;
pub use physical_table_scan::*;

use crate::types_v2::LogicalType;
use crate::planner_v2::BoundExpression;

#[derive(new, Default, Clone)]
pub struct PhysicalOperatorBase {
pub(crate) children: Vec<PhysicalOperator>,
/// The types returned by this physical operator
pub(crate) types: Vec<LogicalType>,
// The set of expressions contained within the operator, if any
pub(crate) expressioins: Vec<BoundExpression>,
}

#[derive(Clone)]
Expand All @@ -35,6 +37,7 @@ pub enum PhysicalOperator {
PhysicalTableScan(PhysicalTableScan),
PhysicalProjection(PhysicalProjection),
PhysicalColumnDataScan(PhysicalColumnDataScan),
PhysicalFilter(PhysicalFilter),
}

impl PhysicalOperator {
Expand All @@ -47,6 +50,7 @@ impl PhysicalOperator {
PhysicalOperator::PhysicalProjection(op) => &op.base.children,
PhysicalOperator::PhysicalDummyScan(op) => &op.base.children,
PhysicalOperator::PhysicalColumnDataScan(op) => &op.base.children,
PhysicalOperator::PhysicalFilter(op) => &op.base.children,
}
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_dummy_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct PhysicalDummyScan {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_dummy_scan(&self, op: LogicalDummyScan) -> PhysicalOperator {
let base = PhysicalOperatorBase::new(vec![], op.base.types);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalDummyScan(PhysicalDummyScan::new(base))
}
}
4 changes: 2 additions & 2 deletions src/execution/physical_plan/physical_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;

use super::{PhysicalColumnDataScan, PhysicalOperator, PhysicalOperatorBase};
use super::{PhysicalColumnDataScan, PhysicalOperator};
use crate::execution::{PhysicalPlanGenerator, SchemaUtil};
use crate::planner_v2::LogicalExplain;
use crate::util::tree_render::TreeRender;
Expand All @@ -19,7 +19,7 @@ impl PhysicalPlanGenerator {
// physical plan explain string
let physical_plan_string = TreeRender::physical_plan_tree(&physical_child);

let base = PhysicalOperatorBase::new(vec![], types.clone());
let base = self.create_physical_operator_base(op.base);

let schema = SchemaUtil::new_schema_ref(&["type".to_string(), "plan".to_string()], &types);
let types_column = Arc::new(StringArray::from(vec![
Expand Down
4 changes: 3 additions & 1 deletion src/execution/physical_plan/physical_expression_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::types_v2::LogicalType;
/// The PhysicalExpressionScan scans a set of expressions
#[derive(new, Clone)]
pub struct PhysicalExpressionScan {
#[new(default)]
pub(crate) base: PhysicalOperatorBase,
/// The types of the expressions
pub(crate) expr_types: Vec<LogicalType>,
Expand All @@ -21,7 +20,10 @@ impl PhysicalPlanGenerator {
&self,
op: LogicalExpressionGet,
) -> PhysicalOperator {
assert!(op.base.children.len() == 1);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalExpressionScan(PhysicalExpressionScan::new(
base,
op.expr_types,
op.expressions,
))
Expand Down
25 changes: 25 additions & 0 deletions src/execution/physical_plan/physical_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::{BoundConjunctionExpression, LogicalFilter};

/// Physical plan node for a filter.
///
/// After construction, `base.expressioins` holds exactly one expression: the
/// result of combining all incoming filter expressions.
#[derive(Clone)]
pub struct PhysicalFilter {
    pub(crate) base: PhysicalOperatorBase,
}

impl PhysicalFilter {
    /// Build a filter node from `base`, replacing its expression list with the
    /// single expression returned by
    /// `BoundConjunctionExpression::try_build_and_conjunction_expression`
    /// (presumably an AND over all predicates — confirm against that helper).
    pub fn new(mut base: PhysicalOperatorBase) -> Self {
        // Take ownership of the predicates, leaving an empty Vec that is
        // overwritten immediately below.
        let predicates = std::mem::take(&mut base.expressioins);
        let conjunction =
            BoundConjunctionExpression::try_build_and_conjunction_expression(predicates);
        base.expressioins = vec![conjunction];
        Self { base }
    }
}

impl PhysicalPlanGenerator {
    /// Lower a `LogicalFilter` into a `PhysicalOperator::PhysicalFilter`.
    ///
    /// # Panics
    ///
    /// Panics if the logical filter does not have exactly one child.
    pub(crate) fn create_physical_filter(&self, op: LogicalFilter) -> PhysicalOperator {
        // A filter always sits on top of exactly one input operator.
        assert!(op.base.children.len() == 1);
        let filter = PhysicalFilter::new(self.create_physical_operator_base(op.base));
        PhysicalOperator::PhysicalFilter(filter)
    }
}
8 changes: 1 addition & 7 deletions src/execution/physical_plan/physical_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ impl PhysicalInsert {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_insert(&self, op: LogicalInsert) -> PhysicalOperator {
let new_children = op
.base
.children
.into_iter()
.map(|op| self.create_plan_internal(op))
.collect::<Vec<_>>();
let base = PhysicalOperatorBase::new(new_children, op.base.types);
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalInsert(PhysicalInsert::new(
base,
op.column_index_list,
Expand Down
14 changes: 3 additions & 11 deletions src/execution/physical_plan/physical_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@ use derive_new::new;

use super::{PhysicalOperator, PhysicalOperatorBase};
use crate::execution::PhysicalPlanGenerator;
use crate::planner_v2::{BoundExpression, LogicalProjection};
use crate::planner_v2::LogicalProjection;

#[derive(new, Clone)]
pub struct PhysicalProjection {
pub(crate) base: PhysicalOperatorBase,
pub(crate) select_list: Vec<BoundExpression>,
}

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_projection(&self, op: LogicalProjection) -> PhysicalOperator {
let new_children = op
.base
.children
.into_iter()
.map(|p| self.create_plan_internal(p))
.collect::<Vec<_>>();
let types = op.base.types;
let base = PhysicalOperatorBase::new(new_children, types);
PhysicalOperator::PhysicalProjection(PhysicalProjection::new(base, op.base.expressioins))
let base = self.create_physical_operator_base(op.base);
PhysicalOperator::PhysicalProjection(PhysicalProjection::new(base))
}
}
2 changes: 1 addition & 1 deletion src/execution/physical_plan/physical_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct PhysicalTableScan {

impl PhysicalPlanGenerator {
pub(crate) fn create_physical_table_scan(&self, op: LogicalGet) -> PhysicalOperator {
let base = PhysicalOperatorBase::new(vec![], op.base.types);
let base = self.create_physical_operator_base(op.base);
let plan =
PhysicalTableScan::new(base, op.function, op.bind_data, op.returned_types, op.names);
PhysicalOperator::PhysicalTableScan(plan)
Expand Down
17 changes: 15 additions & 2 deletions src/execution/physical_plan_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::sync::Arc;
use derive_new::new;
use log::debug;

use super::{ColumnBindingResolver, PhysicalOperator};
use super::{ColumnBindingResolver, PhysicalOperator, PhysicalOperatorBase};
use crate::execution::LOGGING_TARGET;
use crate::main_entry::ClientContext;
use crate::planner_v2::{LogicalOperator, LogicalOperatorVisitor};
use crate::planner_v2::{LogicalOperator, LogicalOperatorBase, LogicalOperatorVisitor};
use crate::util::tree_render::TreeRender;

#[derive(new)]
Expand Down Expand Up @@ -42,6 +42,19 @@ impl PhysicalPlanGenerator {
LogicalOperator::LogicalProjection(op) => self.create_physical_projection(op),
LogicalOperator::LogicalDummyScan(op) => self.create_physical_dummy_scan(op),
LogicalOperator::LogicalExplain(op) => self.create_physical_explain(op),
LogicalOperator::LogicalFilter(op) => self.create_physical_filter(op),
}
}

/// Convert a logical operator base into its physical counterpart.
///
/// Every logical child is lowered via `create_plan_internal`, in order, and
/// the operator's expressions are moved over unchanged.
pub(crate) fn create_physical_operator_base(
    &self,
    base: LogicalOperatorBase,
) -> PhysicalOperatorBase {
    // `base` is owned, so consume the children instead of cloning each
    // logical subtree just to hand it to `create_plan_internal`.
    let children = base
        .children
        .into_iter()
        .map(|child| self.create_plan_internal(child))
        .collect::<Vec<_>>();
    PhysicalOperatorBase::new(children, base.expressioins)
}
}
17 changes: 6 additions & 11 deletions src/execution/volcano_executor/dummy_scan.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{ExecutionContext, ExecutorError, PhysicalDummyScan};
use crate::types_v2::ScalarValue;

#[derive(new)]
pub struct DummyScan {
pub(crate) plan: PhysicalDummyScan,
pub(crate) _plan: PhysicalDummyScan,
}

impl DummyScan {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
let mut fields = vec![];
for (idx, ty) in self.plan.base.types.iter().enumerate() {
fields.push(Field::new(
format!("col{}", idx).as_str(),
ty.clone().into(),
true,
));
}
let fields = vec![Field::new("dummy", DataType::Boolean, true)];
let schema = SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()));
yield RecordBatch::new_empty(schema.clone());
let array = ScalarValue::Boolean(Some(true)).to_array();
yield RecordBatch::try_new(schema.clone(), vec![array])?;
}
}
15 changes: 10 additions & 5 deletions src/execution/volcano_executor/expression_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use derive_new::new;
use futures_async_stream::try_stream;

use crate::execution::{
ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalExpressionScan,
BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalExpressionScan,
};

#[derive(new)]
pub struct ExpressionScan {
pub(crate) plan: PhysicalExpressionScan,
pub(crate) child: BoxedExecutor,
}

impl ExpressionScan {
Expand All @@ -27,10 +28,14 @@ impl ExpressionScan {
));
}
let schema = SchemaRef::new(Schema::new_with_metadata(fields, HashMap::new()));
let input = RecordBatch::new_empty(schema.clone());
for exprs in self.plan.expressions.iter() {
let columns = ExpressionExecutor::execute(exprs, &input)?;
yield RecordBatch::try_new(schema.clone(), columns)?;

#[for_await]
for batch in self.child {
let input = batch?;
for exprs in self.plan.expressions.iter() {
let columns = ExpressionExecutor::execute(exprs, &input)?;
yield RecordBatch::try_new(schema.clone(), columns)?;
}
}
}
}
32 changes: 32 additions & 0 deletions src/execution/volcano_executor/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::Arc;

use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;
use derive_new::new;
use futures_async_stream::try_stream;

use crate::common::as_boolean_array;
use crate::execution::{
BoxedExecutor, ExecutionContext, ExecutorError, ExpressionExecutor, PhysicalFilter,
};

/// Volcano-style executor that applies a `PhysicalFilter` to its child's output.
#[derive(new)]
pub struct Filter {
    pub(crate) plan: PhysicalFilter,
    pub(crate) child: BoxedExecutor,
}

impl Filter {
    /// Stream the child's record batches, keeping only the rows selected by
    /// the filter predicate.
    ///
    /// For each input batch the plan's expressions are evaluated, the first
    /// result is interpreted as a boolean mask, and the batch is filtered with
    /// `filter_record_batch`. Errors from the child, from expression
    /// evaluation, from the boolean downcast, or from Arrow are propagated.
    #[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
    pub async fn execute(self, _context: Arc<ExecutionContext>) {
        let predicates = self.plan.base.expressioins;

        #[for_await]
        for maybe_batch in self.child {
            let input = maybe_batch?;
            let evaluated = ExpressionExecutor::execute(&predicates, &input)?;
            // `PhysicalFilter::new` stores a single combined predicate, so only
            // the first evaluated column is used as the mask.
            let mask = as_boolean_array(&evaluated[0])?;
            yield filter_record_batch(&input, mask)?;
        }
    }
}
11 changes: 10 additions & 1 deletion src/execution/volcano_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod column_data_scan;
mod create_table;
mod dummy_scan;
mod expression_scan;
mod filter;
mod insert;
mod projection;
mod table_scan;
Expand All @@ -12,6 +13,7 @@ pub use column_data_scan::*;
pub use create_table::*;
pub use dummy_scan::*;
pub use expression_scan::*;
pub use filter::*;
use futures::stream::BoxStream;
use futures::TryStreamExt;
pub use insert::*;
Expand All @@ -34,7 +36,9 @@ impl VolcanoExecutor {
match plan {
PhysicalOperator::PhysicalCreateTable(op) => CreateTable::new(op).execute(context),
PhysicalOperator::PhysicalExpressionScan(op) => {
ExpressionScan::new(op).execute(context)
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
ExpressionScan::new(op, child_executor).execute(context)
}
PhysicalOperator::PhysicalInsert(op) => {
let child = op.base.children.first().unwrap().clone();
Expand All @@ -51,6 +55,11 @@ impl VolcanoExecutor {
PhysicalOperator::PhysicalColumnDataScan(op) => {
ColumnDataScan::new(op).execute(context)
}
PhysicalOperator::PhysicalFilter(op) => {
let child = op.base.children.first().unwrap().clone();
let child_executor = self.build(child, context.clone());
Filter::new(op, child_executor).execute(context)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/volcano_executor/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Projection {
impl Projection {
#[try_stream(boxed, ok = RecordBatch, error = ExecutorError)]
pub async fn execute(self, _context: Arc<ExecutionContext>) {
let exprs = self.plan.select_list;
let exprs = self.plan.base.expressioins;
let schema = SchemaUtil::new_schema_ref_from_exprs(&exprs);

#[for_await]
Expand Down
Loading

0 comments on commit 2fe54f5

Please sign in to comment.