Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 62 additions & 21 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}

_query_statistics = std::make_shared<QueryStatistics>();
}

Expand All @@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}

if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}

Expand Down Expand Up @@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
Expand All @@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
return Status::OK();
}
Expand Down Expand Up @@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
Expand All @@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
auto& mutable_columns = mutable_block.mutable_columns();

if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}

for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));

return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,26 @@ class ExecNode {
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; }

// input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
// prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor

[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'intermediate_row_desc' can be made const [readability-make-member-function-const]

Suggested change
[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) const {

if (idx == 0) {
return intermediate_row_desc();
}
DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
return _intermediate_output_row_descriptor[idx - 1];
}

[[nodiscard]] const RowDescriptor& projections_row_desc() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'projections_row_desc' can be made static [readability-convert-member-functions-to-static]

Suggested change
[[nodiscard]] const RowDescriptor& projections_row_desc() const {
[[nodiscard]] static const RowDescriptor& projections_row_desc() {

if (_intermediate_output_row_descriptor.empty()) {
return intermediate_row_desc();
} else {
return _intermediate_output_row_descriptor.back();
}
}

int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
Expand Down Expand Up @@ -270,6 +290,10 @@ class ExecNode {
std::unique_ptr<RowDescriptor> _output_row_descriptor;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;

Expand Down
52 changes: 46 additions & 6 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <string>

#include "common/logging.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
Expand Down Expand Up @@ -123,19 +125,32 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
}

// create the projections expr

if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}
if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}

Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->prepare(state));
Expand All @@ -149,6 +164,9 @@ Status OperatorXBase::open(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->open(state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->open(state));
}
Expand All @@ -175,7 +193,22 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (const auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
Expand All @@ -198,15 +231,13 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
DCHECK(mutable_columns.size() == local_state->_projections.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
Expand Down Expand Up @@ -365,6 +396,15 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i]));
}
_intermediate_projections.resize(_parent->_intermediate_projections.size());
for (int i = 0; i < _parent->_intermediate_projections.size(); i++) {
_intermediate_projections[i].resize(_parent->_intermediate_projections[i].size());
for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++) {
RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone(
state, _intermediate_projections[i][j]));
}
}

_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1);
_blocks_returned_counter =
Expand Down
42 changes: 42 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class PipelineXLocalStateBase {
RuntimeState* _state = nullptr;
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

bool _closed = false;
vectorized::Block _origin_block;
};
Expand All @@ -155,6 +158,22 @@ class OperatorXBase : public OperatorBase {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(
tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}
}

OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
Expand Down Expand Up @@ -247,6 +266,25 @@ class OperatorXBase : public OperatorBase {
return _row_descriptor;
}

// input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
// prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor

[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'intermediate_row_desc' can be made const [readability-make-member-function-const]

Suggested change
[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) const {

if (idx == 0) {
return intermediate_row_desc();
}
DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
return _intermediate_output_row_descriptor[idx - 1];
}

[[nodiscard]] const RowDescriptor& projections_row_desc() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'projections_row_desc' can be made static [readability-convert-member-functions-to-static]

Suggested change
[[nodiscard]] const RowDescriptor& projections_row_desc() const {
[[nodiscard]] static const RowDescriptor& projections_row_desc() {

if (_intermediate_output_row_descriptor.empty()) {
return intermediate_row_desc();
} else {
return _intermediate_output_row_descriptor.back();
}
}

[[nodiscard]] std::string debug_string() const override { return ""; }

virtual std::string debug_string(int indentation_level = 0) const;
Expand Down Expand Up @@ -318,6 +356,10 @@ class OperatorXBase : public OperatorBase {
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;

Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,15 @@ void Block::swap(Block&& other) noexcept {
row_same_bit = std::move(other.row_same_bit);
}

void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
Container tmp_data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tmp_data(result_column_ids.size())

tmp_data.reserve(result_column_ids.size());
for (const int result_column_id : result_column_ids) {
tmp_data.push_back(data[result_column_id]);
}
swap(Block {tmp_data});
}

void Block::update_hash(SipHash& hash) const {
for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) {
for (const auto& col : data) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ class Block {
void swap(Block& other) noexcept;
void swap(Block&& other) noexcept;

// Shuffle columns in place based on the result_column_ids
void shuffle_columns(const std::vector<int>& result_column_ids);

// Default column size = -1 means clear all column in block
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int column_size = -1) noexcept;
Expand Down
Loading