Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
JackTan25 committed Oct 11, 2023
2 parents f911e63 + 52163fa commit 2d6f76a
Show file tree
Hide file tree
Showing 46 changed files with 4,231 additions and 3,181 deletions.
44 changes: 22 additions & 22 deletions src/query/functions/src/scalars/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,28 +652,6 @@ pub fn register(registry: &mut FunctionRegistry) {
}

fn register_array_aggr(registry: &mut FunctionRegistry) {
fn eval_aggr_return_type(name: &str, args_type: &[DataType]) -> Option<DataType> {
if args_type.len() != 1 {
return None;
}
let arg_type = args_type[0].remove_nullable();
if arg_type == DataType::EmptyArray {
if name == "count" {
return Some(DataType::Number(NumberDataType::UInt64));
}
return Some(DataType::Null);
}
let array_type = arg_type.as_array()?;
let factory = AggregateFunctionFactory::instance();
let func = factory.get(name, vec![], vec![*array_type.clone()]).ok()?;
let return_type = func.return_type().ok()?;
if args_type[0].is_nullable() {
Some(return_type.wrap_nullable())
} else {
Some(return_type)
}
}

fn eval_array_aggr(
name: &str,
args: &[ValueRef<AnyType>],
Expand Down Expand Up @@ -728,6 +706,28 @@ fn register_array_aggr(registry: &mut FunctionRegistry) {
}
}

fn eval_aggr_return_type(name: &str, args_type: &[DataType]) -> Option<DataType> {
if args_type.len() != 1 {
return None;
}
let arg_type = args_type[0].remove_nullable();
if arg_type == DataType::EmptyArray {
if name == "count" {
return Some(DataType::Number(NumberDataType::UInt64));
}
return Some(DataType::Null);
}
let array_type = arg_type.as_array()?;
let factory = AggregateFunctionFactory::instance();
let func = factory.get(name, vec![], vec![*array_type.clone()]).ok()?;
let return_type = func.return_type().ok()?;
if args_type[0].is_nullable() {
Some(return_type.wrap_nullable())
} else {
Some(return_type)
}
}

for (fn_name, name) in ARRAY_AGGREGATE_FUNCTIONS {
registry.register_function_factory(fn_name, |_, args_type| {
let return_type = eval_aggr_return_type(name, args_type)?;
Expand Down
62 changes: 31 additions & 31 deletions src/query/sql/src/executor/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,38 @@ use common_profile::SharedProcessorProfiles;
use itertools::Itertools;

use crate::executor::explain::PlanStatsInfo;
use crate::executor::AggregateExpand;
use crate::executor::AggregateFinal;
use crate::executor::AggregateFunctionDesc;
use crate::executor::AggregatePartial;
use crate::executor::CommitSink;
use crate::executor::ConstantTableScan;
use crate::executor::CopyIntoTablePhysicalPlan;
use crate::executor::CteScan;
use crate::executor::DeletePartial;
use crate::executor::DistributedInsertSelect;
use crate::executor::EvalScalar;
use crate::executor::Exchange;
use crate::executor::ExchangeSink;
use crate::executor::ExchangeSource;
use crate::executor::Filter;
use crate::executor::FragmentKind;
use crate::executor::HashJoin;
use crate::executor::Lambda;
use crate::executor::Limit;
use crate::executor::MaterializedCte;
use crate::executor::physical_plans::common::AggregateFunctionDesc;
use crate::executor::physical_plans::common::FragmentKind;
use crate::executor::physical_plans::physical_aggregate_expand::AggregateExpand;
use crate::executor::physical_plans::physical_aggregate_final::AggregateFinal;
use crate::executor::physical_plans::physical_aggregate_partial::AggregatePartial;
use crate::executor::physical_plans::physical_commit_sink::CommitSink;
use crate::executor::physical_plans::physical_constant_table_scan::ConstantTableScan;
use crate::executor::physical_plans::physical_copy_into::CopyIntoTablePhysicalPlan;
use crate::executor::physical_plans::physical_cte_scan::CteScan;
use crate::executor::physical_plans::physical_delete_partial::DeletePartial;
use crate::executor::physical_plans::physical_distributed_insert_select::DistributedInsertSelect;
use crate::executor::physical_plans::physical_eval_scalar::EvalScalar;
use crate::executor::physical_plans::physical_exchange::Exchange;
use crate::executor::physical_plans::physical_exchange_sink::ExchangeSink;
use crate::executor::physical_plans::physical_exchange_source::ExchangeSource;
use crate::executor::physical_plans::physical_filter::Filter;
use crate::executor::physical_plans::physical_hash_join::HashJoin;
use crate::executor::physical_plans::physical_lambda::Lambda;
use crate::executor::physical_plans::physical_limit::Limit;
use crate::executor::physical_plans::physical_materialized_cte::MaterializedCte;
use crate::executor::physical_plans::physical_project::Project;
use crate::executor::physical_plans::physical_project_set::ProjectSet;
use crate::executor::physical_plans::physical_range_join::RangeJoin;
use crate::executor::physical_plans::physical_range_join::RangeJoinType;
use crate::executor::physical_plans::physical_row_fetch::RowFetch;
use crate::executor::physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
use crate::executor::physical_plans::physical_sort::Sort;
use crate::executor::physical_plans::physical_table_scan::TableScan;
use crate::executor::physical_plans::physical_union_all::UnionAll;
use crate::executor::physical_plans::physical_window::Window;
use crate::executor::physical_plans::physical_window::WindowFunction;
use crate::executor::PhysicalPlan;
use crate::executor::Project;
use crate::executor::ProjectSet;
use crate::executor::RangeJoin;
use crate::executor::RangeJoinType;
use crate::executor::RowFetch;
use crate::executor::RuntimeFilterSource;
use crate::executor::Sort;
use crate::executor::TableScan;
use crate::executor::UnionAll;
use crate::executor::Window;
use crate::executor::WindowFunction;
use crate::planner::Metadata;
use crate::planner::MetadataRef;
use crate::planner::DUMMY_TABLE_INDEX;
Expand Down
57 changes: 48 additions & 9 deletions src/query/sql/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,63 @@

mod explain;
mod format;
mod physical_join;
mod physical_plan;
mod physical_plan_builder;
mod physical_plan_display;
mod physical_plan_visitor;
mod physical_plans;
mod profile;
pub mod table_read_plan;
mod util;

pub use physical_join::hash_join;
pub use physical_join::physical_join;
pub use physical_join::range_join;
pub use physical_join::PhysicalJoinType;
pub use physical_plan::Exchange;
pub use physical_plan::MutationKind;
pub use physical_plan::*;
pub use physical_plan::PhysicalPlan;
pub use physical_plan_builder::PhysicalPlanBuilder;
pub use physical_plan_builder::RangeJoinCondition;
pub use physical_plan_visitor::PhysicalPlanReplacer;
pub use physical_plans::common::AggregateFunctionDesc;
pub use physical_plans::common::FragmentKind;
pub use physical_plans::common::MutationKind;
pub use physical_plans::common::OnConflictField;
pub use physical_plans::physical_aggregate_expand::AggregateExpand;
pub use physical_plans::physical_aggregate_final::AggregateFinal;
pub use physical_plans::physical_aggregate_partial::AggregatePartial;
pub use physical_plans::physical_async_source::AsyncSourcerPlan;
pub use physical_plans::physical_commit_sink::CommitSink;
pub use physical_plans::physical_compact_partial::CompactPartial;
pub use physical_plans::physical_constant_table_scan::ConstantTableScan;
pub use physical_plans::physical_copy_into::CopyIntoTablePhysicalPlan;
pub use physical_plans::physical_copy_into::CopyIntoTableSource;
pub use physical_plans::physical_copy_into::QuerySource;
pub use physical_plans::physical_cte_scan::CteScan;
pub use physical_plans::physical_deduplicate::Deduplicate;
pub use physical_plans::physical_deduplicate::SelectCtx;
pub use physical_plans::physical_delete_partial::DeletePartial;
pub use physical_plans::physical_distributed_insert_select::DistributedInsertSelect;
pub use physical_plans::physical_eval_scalar::EvalScalar;
pub use physical_plans::physical_exchange::Exchange;
pub use physical_plans::physical_exchange_sink::ExchangeSink;
pub use physical_plans::physical_exchange_source::ExchangeSource;
pub use physical_plans::physical_filter::Filter;
pub use physical_plans::physical_hash_join::HashJoin;
pub use physical_plans::physical_lambda::Lambda;
pub use physical_plans::physical_lambda::LambdaFunctionDesc;
pub use physical_plans::physical_limit::Limit;
pub use physical_plans::physical_materialized_cte::MaterializedCte;
pub use physical_plans::physical_merge_into::MatchExpr;
pub use physical_plans::physical_merge_into::MergeInto;
pub use physical_plans::physical_merge_into::MergeIntoSource;
pub use physical_plans::physical_project::Project;
pub use physical_plans::physical_project_set::ProjectSet;
pub use physical_plans::physical_range_join::RangeJoin;
pub use physical_plans::physical_range_join::RangeJoinCondition;
pub use physical_plans::physical_range_join::RangeJoinType;
pub use physical_plans::physical_replace_into::ReplaceInto;
pub use physical_plans::physical_row_fetch::RowFetch;
pub use physical_plans::physical_runtime_filter_source::RuntimeFilterSource;
pub use physical_plans::physical_sort::Sort;
pub use physical_plans::physical_table_scan::TableScan;
pub use physical_plans::physical_union_all::UnionAll;
pub use physical_plans::physical_window::LagLeadDefault;
pub use physical_plans::physical_window::Window;
pub use physical_plans::physical_window::WindowFunction;
pub use profile::*;
pub use util::*;
Loading

0 comments on commit 2d6f76a

Please sign in to comment.