Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For avg functions replacing it with (sum / count); count(*) #415

Merged
merged 13 commits into from
Jan 4, 2024
Merged
11 changes: 6 additions & 5 deletions src/executor/expression/expression_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ void ExpressionEvaluator::Execute(const SharedPtr<AggregateExpression> &expr,
// Create output chunk.
// TODO: Now output chunk is pre-allocate memory in expression state
// TODO: In the future, it can be implemented as on-demand allocation.
SharedPtr<ColumnVector> &child_output = child_state->OutputColumnVector();
Execute(child_expr, child_state, child_output);
SharedPtr<ColumnVector> &child_output_col = child_state->OutputColumnVector();
this->Execute(child_expr, child_state, child_output_col);

if (expr->aggregate_function_.argument_type_ != *child_output->data_type()) {
if (expr->aggregate_function_.argument_type_ != *child_output_col->data_type()) {
Error<ExecutorException>("Argument type isn't matched with the child expression output");
}
if (expr->aggregate_function_.return_type_ != *output_column_vector->data_type()) {
Expand All @@ -93,7 +93,7 @@ void ExpressionEvaluator::Execute(const SharedPtr<AggregateExpression> &expr,
expr->aggregate_function_.init_func_(expr->aggregate_function_.GetState());

// 2. Loop to fill the aggregate state
expr->aggregate_function_.update_func_(expr->aggregate_function_.GetState(), child_output);
expr->aggregate_function_.update_func_(expr->aggregate_function_.GetState(), child_output_col);

// 3. Get the aggregate result and append to output column vector.

Expand Down Expand Up @@ -157,7 +157,8 @@ void ExpressionEvaluator::Execute(const SharedPtr<ValueExpression> &expr,
SharedPtr<ExpressionState> &,
SharedPtr<ColumnVector> &output_column_vector) {
// memory copy here.
output_column_vector->SetValue(0, expr->GetValue());
auto value = expr->GetValue();
output_column_vector->SetValue(0, value);
output_column_vector->Finalize(1);
}

Expand Down
2 changes: 1 addition & 1 deletion src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ UniquePtr<PlanFragment> FragmentBuilder::BuildFragment(PhysicalOperator *phys_op
auto plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
plan_fragment->SetSinkNode(query_context_ptr_, SinkType::kResult, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
BuildFragments(phys_op, plan_fragment.get());
if (plan_fragment->GetSourceNode() != nullptr) {
if (plan_fragment->GetSourceNode() == nullptr) {
plan_fragment->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
}
return plan_fragment;
Expand Down
125 changes: 51 additions & 74 deletions src/executor/operator/physical_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <string>
#include <vector>
import stl;
import txn;
import query_context;
Expand Down Expand Up @@ -49,14 +50,15 @@ bool PhysicalAggregate::Execute(QueryContext *query_context, OperatorState *oper
Vector<SharedPtr<ColumnDef>> groupby_columns;
SizeT group_count = groups_.size();

if(group_count == 0) {
if (group_count == 0) {
// Aggregate without group by expression
// e.g. SELECT count(a) FROM table;
if (SimpleAggregate(this->output_, prev_op_state, aggregate_operator_state)) {
return true;
} else {
return false;
auto result = SimpleAggregateExecute(prev_op_state->data_block_array_, aggregate_operator_state->data_block_array_);
prev_op_state->data_block_array_.clear();
if (prev_op_state->Complete()) {
aggregate_operator_state->SetComplete();
}
return result;
}
#if 0
groupby_columns.reserve(group_count);
Expand Down Expand Up @@ -275,17 +277,20 @@ void PhysicalAggregate::GroupByInputTable(const SharedPtr<DataTable> &input_tabl
}
case kVarchar: {
Error<NotImplementException>("Varchar data shuffle isn't implemented.");
// VarcharT &dst_ref = ((VarcharT *)(output_datablock->column_vectors[column_id]->data()))[output_row_idx];
// VarcharT &src_ref = ((VarcharT *)(input_datablocks[input_block_id]->column_vectors[column_id]->data()))[input_offset];
// if (src_ref.IsInlined()) {
// Memcpy((char *)&dst_ref, (char *)&src_ref, sizeof(VarcharT));
// } else {
// dst_ref.length = src_ref.length;
// Memcpy(dst_ref.prefix, src_ref.prefix, VarcharT::PREFIX_LENGTH);
//
// dst_ref.ptr = output_datablock->column_vectors[column_id]->buffer_->fix_heap_mgr_->Allocate(src_ref.length);
// Memcpy(dst_ref.ptr, src_ref.ptr, src_ref.length);
// }
// VarcharT &dst_ref = ((VarcharT
// *)(output_datablock->column_vectors[column_id]->data()))[output_row_idx]; VarcharT &src_ref
// = ((VarcharT
// *)(input_datablocks[input_block_id]->column_vectors[column_id]->data()))[input_offset]; if
// (src_ref.IsInlined()) {
// Memcpy((char *)&dst_ref, (char *)&src_ref, sizeof(VarcharT));
// } else {
// dst_ref.length = src_ref.length;
// Memcpy(dst_ref.prefix, src_ref.prefix, VarcharT::PREFIX_LENGTH);
//
// dst_ref.ptr =
// output_datablock->column_vectors[column_id]->buffer_->fix_heap_mgr_->Allocate(src_ref.length);
// Memcpy(dst_ref.ptr, src_ref.ptr, src_ref.length);
// }
break;
}
case kDate: {
Expand Down Expand Up @@ -404,17 +409,19 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr<DataTable> &input_
}
case kVarchar: {
Error<NotImplementException>("Varchar data shuffle isn't implemented.");
// VarcharT &dst_ref = ((VarcharT *)(output_datablock->column_vectors[column_id]->data()))[block_row_idx];
// VarcharT &src_ref = ((VarcharT *)(input_datablocks[input_block_id]->column_vectors[column_id]->data()))[input_offset];
// if (src_ref.IsInlined()) {
// Memcpy((char *)&dst_ref, (char *)&src_ref, sizeof(VarcharT));
// } else {
// dst_ref.length = src_ref.length;
// Memcpy(dst_ref.prefix, src_ref.prefix, VarcharT::PREFIX_LENGTH);
//
// dst_ref.ptr = output_datablock->column_vectors[column_id]->buffer_->fix_heap_mgr_->Allocate(src_ref.length);
// Memcpy(dst_ref.ptr, src_ref.ptr, src_ref.length);
// }
// VarcharT &dst_ref = ((VarcharT *)(output_datablock->column_vectors[column_id]->data()))[block_row_idx];
// VarcharT &src_ref = ((VarcharT
// *)(input_datablocks[input_block_id]->column_vectors[column_id]->data()))[input_offset]; if
// (src_ref.IsInlined()) {
// Memcpy((char *)&dst_ref, (char *)&src_ref, sizeof(VarcharT));
// } else {
// dst_ref.length = src_ref.length;
// Memcpy(dst_ref.prefix, src_ref.prefix, VarcharT::PREFIX_LENGTH);
//
// dst_ref.ptr =
// output_datablock->column_vectors[column_id]->buffer_->fix_heap_mgr_->Allocate(src_ref.length);
// Memcpy(dst_ref.ptr, src_ref.ptr, src_ref.length);
// }
break;
}
case kDate: {
Expand Down Expand Up @@ -559,14 +566,20 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr<DataTable> &input_
#endif
}

bool PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state) {
bool PhysicalAggregate::SimpleAggregateExecute(const Vector<UniquePtr<DataBlock>> &input_blocks, Vector<UniquePtr<DataBlock>> &output_blocks) {
SizeT aggregates_count = aggregates_.size();
if (aggregates_count <= 0) {
Error<ExecutorException>("Simple Aggregate without aggregate expression.");
}

SizeT input_block_count = input_blocks.size();

if (input_block_count == 0) {
// No input data
LOG_TRACE("No input, no aggregate result");
return true;
}

// Prepare the output table columns
Vector<SharedPtr<ColumnDef>> aggregate_columns;
aggregate_columns.reserve(aggregates_count);
Expand All @@ -579,19 +592,14 @@ bool PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &output_table,
Vector<SharedPtr<DataType>> output_types;
output_types.reserve(aggregates_count);

SizeT input_block_count = pre_operator_state->data_block_array_.size();

for (i64 idx = 0; auto &expr: aggregates_) {
for (i64 idx = 0; auto &expr : aggregates_) {
// expression state
expr_states.emplace_back(ExpressionState::CreateState(expr));

SharedPtr<DataType> output_type = MakeShared<DataType>(expr->Type());

// column definition
SharedPtr<ColumnDef> col_def = MakeShared<ColumnDef>(idx,
output_type,
expr->Name(),
HashSet<ConstraintType>());
SharedPtr<ColumnDef> col_def = MakeShared<ColumnDef>(idx, output_type, expr->Name(), HashSet<ConstraintType>());
aggregate_columns.emplace_back(col_def);

// for output block
Expand All @@ -600,56 +608,25 @@ bool PhysicalAggregate::SimpleAggregate(SharedPtr<DataTable> &output_table,
++idx;
}

if (input_block_count == 0) {
// No input data
LOG_TRACE("No input, no aggregate result");
return true;
}
// Loop blocks

// ExpressionEvaluator evaluator;
// //evaluator.Init(input_table_->data_blocks_);
// for (SizeT expr_idx = 0; expr_idx < aggregates_count; ++expr_idx) {
//
// ExpressionEvaluator evaluator;
// evaluator.Init(aggregates_[])
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
// evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], blocks_column[expr_idx]);
// if(blocks_column[0].get() != output_data_block->column_vectors[expr_idx].get()) {
// // column vector in blocks column might be changed to the column vector from column reference.
// // This check and assignment is to make sure the right column vector are assign to output_data_block
// output_data_block->column_vectors[expr_idx] = blocks_column[0];
// }
// }
//
// output_data_block->Finalize();

for (SizeT block_idx = 0; block_idx < input_block_count; ++block_idx) {
DataBlock *input_data_block = pre_operator_state->data_block_array_[block_idx].get();
DataBlock *input_data_block = input_blocks[block_idx].get();

output_blocks.emplace_back(DataBlock::MakeUniquePtr());

aggregate_operator_state->data_block_array_.emplace_back(DataBlock::MakeUniquePtr());
DataBlock *output_data_block = aggregate_operator_state->data_block_array_.back().get();
DataBlock *output_data_block = output_blocks.back().get();
output_data_block->Init(*GetOutputTypes());

ExpressionEvaluator evaluator;
evaluator.Init(input_data_block);

SizeT expression_count = aggregates_count;
// Prepare the expression states

// calculate every columns value
for (SizeT expr_idx = 0; expr_idx < expression_count; ++expr_idx) {
// Vector<SharedPtr<ColumnVector>> blocks_column;
// blocks_column.emplace_back(output_data_block->column_vectors[expr_idx]);
LOG_TRACE("Physical aggregate Execute");
evaluator.Execute(aggregates_[expr_idx], expr_states[expr_idx], output_data_block->column_vectors[expr_idx]);
}
output_data_block->Finalize();
}

pre_operator_state->data_block_array_.clear();
if (pre_operator_state->Complete()) {
aggregate_operator_state->SetComplete();
}
return true;
}

Expand Down
9 changes: 4 additions & 5 deletions src/executor/operator/physical_aggregate.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import hash_table;
import base_expression;
import load_meta;
import infinity_exception;
import data_block;

export module physical_aggregate;

Expand All @@ -44,8 +45,8 @@ public:
Vector<SharedPtr<BaseExpression>> aggregates,
u64 aggregate_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kAggregate, Move(left), nullptr, id, load_metas), groups_(Move(groups)), aggregates_(Move(aggregates)),
groupby_index_(groupby_index), aggregate_index_(aggregate_index) {}
: PhysicalOperator(PhysicalOperatorType::kAggregate, Move(left), nullptr, id, load_metas), groups_(Move(groups)),
aggregates_(Move(aggregates)), groupby_index_(groupby_index), aggregate_index_(aggregate_index) {}

~PhysicalAggregate() override = default;

Expand All @@ -66,9 +67,7 @@ public:
Vector<SharedPtr<BaseExpression>> aggregates_{};
HashTable hash_table_;

bool SimpleAggregate(SharedPtr<DataTable> &output_table,
OperatorState *pre_operator_state,
AggregateOperatorState *aggregate_operator_state);
bool SimpleAggregateExecute(const Vector<UniquePtr<DataBlock>> &input_blocks, Vector<UniquePtr<DataBlock>> &output_blocks);

inline u64 GroupTableIndex() const { return groupby_index_; }

Expand Down
Loading
Loading