From 7185e6ecb3b70beb47accf8430b5e0b0cec9dadb Mon Sep 17 00:00:00 2001 From: Zhengguo Yang Date: Fri, 30 Apr 2021 10:58:08 +0800 Subject: [PATCH] add vectorized union node (#22) * add vectorized union node --- be/src/exec/exec_node.cpp | 18 +- be/src/runtime/plan_fragment_executor.cpp | 2 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/core/block.cpp | 112 +++++++++- be/src/vec/core/block.h | 54 ++++- be/src/vec/core/column_with_type_and_name.cpp | 3 + be/src/vec/core/column_with_type_and_name.h | 1 + be/src/vec/data_types/data_type.cpp | 5 + be/src/vec/data_types/data_type.h | 1 + be/src/vec/data_types/data_type_nullable.cpp | 14 +- be/src/vec/data_types/data_type_nullable.h | 1 + .../vec/data_types/data_type_number_base.cpp | 15 +- be/src/vec/data_types/data_type_number_base.h | 1 + be/src/vec/data_types/data_type_string.cpp | 10 +- be/src/vec/data_types/data_type_string.h | 1 + be/src/vec/data_types/data_types_decimal.cpp | 13 +- be/src/vec/data_types/data_types_decimal.h | 1 + be/src/vec/exec/vexcept_node.cpp | 65 ++++++ be/src/vec/exec/vexcept_node.h | 35 +++ be/src/vec/exec/vintersect_node.cpp | 65 ++++++ be/src/vec/exec/vintersect_node.h | 35 +++ be/src/vec/exec/volap_scan_node.cpp | 1 - be/src/vec/exec/vset_operation_node.cpp | 104 +++++++++ be/src/vec/exec/vset_operation_node.h | 71 ++++++ be/src/vec/exec/vunion_node.cpp | 203 ++++++++++++++++++ be/src/vec/exec/vunion_node.h | 83 +++++++ be/src/vec/exprs/vectorized_agg_fn.cpp | 17 ++ be/src/vec/exprs/vectorized_agg_fn.h | 2 + be/src/vec/exprs/vectorized_fn_call.cpp | 19 ++ be/src/vec/exprs/vectorized_fn_call.h | 2 + be/src/vec/exprs/vexpr.cpp | 42 +++- be/src/vec/exprs/vexpr.h | 13 +- be/src/vec/exprs/vliteral.cpp | 3 + be/src/vec/exprs/vslot_ref.cpp | 6 +- be/src/vec/exprs/vslot_ref.h | 1 + be/src/vec/io/io_helper.h | 39 ++++ be/src/vec/runtime/vdata_stream_recvr.cpp | 5 + be/src/vec/runtime/vdata_stream_recvr.h | 1 + be/test/vec/core/block_test.cpp | 44 ++++ 39 files changed, 1081 insertions(+), 29 deletions(-) create mode 100644 be/src/vec/exec/vexcept_node.cpp create mode 100644 be/src/vec/exec/vexcept_node.h create mode 100644 be/src/vec/exec/vintersect_node.cpp create mode 100644 be/src/vec/exec/vintersect_node.h create mode 100644 be/src/vec/exec/vset_operation_node.cpp create mode 100644 be/src/vec/exec/vset_operation_node.h create mode 100644 be/src/vec/exec/vunion_node.cpp create mode 100644 be/src/vec/exec/vunion_node.h diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index d2906c1f4bf8db..a0bd5c008871db 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -61,12 +61,12 @@ #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "util/runtime_profile.h" - #include "vec/core/block.h" #include "vec/exec/vaggregation_node.h" -#include "vec/exec/volap_scan_node.h" #include "vec/exec/vexchange_node.h" +#include "vec/exec/volap_scan_node.h" #include "vec/exec/vsort_node.h" +#include "vec/exec/vunion_node.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -134,8 +134,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _rows_returned_counter(NULL), _rows_returned_rate(NULL), _memory_used_counter(NULL), - _is_closed(false) { -} + _is_closed(false) {} ExecNode::~ExecNode() {} @@ -165,8 +164,9 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list* } Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { - init_runtime_profile(state->enable_vectorized_exec()? "V" + print_plan_node_type(tnode.node_type) : - print_plan_node_type(tnode.node_type)); + init_runtime_profile(state->enable_vectorized_exec() + ? "V" + print_plan_node_type(tnode.node_type) + : print_plan_node_type(tnode.node_type)); if (tnode.__isset.vconjunct) { _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); @@ -456,7 +456,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::UNION_NODE: - *node = pool->add(new UnionNode(pool, tnode, descs)); + if (state->enable_vectorized_exec()) { + *node = pool->add(new vectorized::VUnionNode(pool, tnode, descs)); + } else { + *node = pool->add(new UnionNode(pool, tnode, descs)); + } return Status::OK(); case TPlanNodeType::INTERSECT_NODE: diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 988fa87caa4387..46e6690032dca6 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -323,7 +323,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() { _collect_query_statistics(); Status status; { - boost::lock_guard l(_status_lock); + std::lock_guard l(_status_lock); status = _status; } status = _sink->close(runtime_state(), status); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index d83b1a394e6388..a5d591a5aa68e7 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -61,6 +61,8 @@ set(VEC_FILES exec/vsort_exec_exprs.cpp exec/volap_scanner.cpp exec/vexchange_node.cpp + exec/vset_operation_node.cpp + exec/vunion_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp exprs/vexpr.cpp diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 00f494364a35f6..098bcdfe58b36c 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -17,9 +17,11 @@ #include "vec/core/block.h" +#include #include #include +#include "fmt/format.h" #include "gen_cpp/data.pb.h" #include "vec/columns/column_const.h" #include "vec/columns/column_nullable.h" @@ -381,12 +383,51 @@ std::string Block::dumpNames() const { return out.str(); } -std::string Block::dumpData() const { - // WriteBufferFromOwnString out; - std::stringstream out; +std::string Block::dumpData(size_t row_limit) const { + if (rows() == 0) { + return "empty block."; + } + std::vector headers; + std::vector headers_size; for (auto it = data.begin(); it != data.end(); ++it) { - if (it != data.begin()) out << ", "; - out << it->name; + std::string s = fmt::format("{}({})", it->name, it->type->getName()); + headers_size.push_back(s.size() > 15 ? s.size() : 15); + headers.emplace_back(s); + } + + std::stringstream out; + // header upper line + auto line = [&]() { + for (size_t i = 0; i < columns(); ++i) { + out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; + } + out << std::setw(1) << "+" << std::endl; + }; + line(); + // header text + for (size_t i = 0; i < columns(); ++i) { + out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) + << headers[i]; + } + out << std::setw(1) << "|" << std::endl; + // header bottom line + line(); + // content + for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { + for (size_t i = 0; i < columns(); ++i) { + std::string s = data[i].to_string(row_num); + if (s.length() > headers_size[i]) { + s = s.substr(0, headers_size[i] - 3) + "..."; + } + out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) + << std::right << s; + } + out << std::setw(1) << "|" << std::endl; + } + // bottom line + line(); + if (row_limit < rows()) { + out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; } return out.str(); } @@ -658,6 +699,12 @@ void Block::swap(Block& other) noexcept { index_by_name.swap(other.index_by_name); } +void Block::swap(Block&& other) noexcept { + clear(); + data = std::move(other.data); + initializeIndexByName(); +} + void Block::updateHash(SipHash& hash) const { for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) for (const auto& col : data) col.column->updateHashWithValue(row_no, hash); @@ -696,9 +743,11 @@ void Block::serialize(PBlock* pblock) const { } } -int MutableBlock::rows() { +size_t MutableBlock::rows() const { for (const auto& column : _columns) - if (column) return column->size(); + if (column) { + return column->size(); + } return 0; } @@ -715,7 +764,54 @@ Block MutableBlock::to_block() { for (int i = 0; i < _columns.size(); ++i) { columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], ""); } - return columns_with_schema; + return {columns_with_schema}; } +std::string MutableBlock::dumpData(size_t row_limit) const { + if (rows() == 0) { + return "empty block."; + } + std::vector headers; + std::vector headers_size; + for (size_t i = 0; i < columns(); ++i) { + std::string s = _data_types[i]->getName(); + headers_size.push_back(s.size() > 15 ? s.size() : 15); + headers.emplace_back(s); + } + std::stringstream out; + // header upper line + auto line = [&]() { + for (size_t i = 0; i < columns(); ++i) { + out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-"; + } + out << std::setw(1) << "+" << std::endl; + }; + line(); + // header text + for (size_t i = 0; i < columns(); ++i) { + out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i]) + << headers[i]; + } + out << std::setw(1) << "|" << std::endl; + // header bottom line + line(); + // content + for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { + for (size_t i = 0; i < columns(); ++i) { + std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num); + if (s.length() > headers_size[i]) { + s = s.substr(0, headers_size[i] - 3) + "..."; + } + out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) + << std::right << s; + } + out << std::setw(1) << "|" << std::endl; + } + // bottom line + line(); + if (row_limit < rows()) { + out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl; + } + return out.str(); +} } // namespace doris::vectorized diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 779f57cae41266..cd0a7fc56405be 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -147,6 +147,7 @@ class Block { void clear(); void swap(Block& other) noexcept; + void swap(Block&& other) noexcept; /** Updates SipHash of the Block, using update method of columns. * Returns hash for block, that could be used to differentiate blocks @@ -155,7 +156,7 @@ class Block { void updateHash(SipHash& hash) const; /** Get block data in string. */ - std::string dumpData() const; + std::string dumpData(size_t row_limit = 100) const; static void filter_block(Block* block, int filter_conlumn_id, int column_to_keep); // serialize block to PRowBatch @@ -191,8 +192,11 @@ class MutableBlock { MutableBlock(MutableColumns&& columns, DataTypes&& data_types) : _columns(std::move(columns)), _data_types(std::move(data_types)) {} + MutableBlock(Block* block) + : _columns(block->mutateColumns()), _data_types(block->getDataTypes()) {} - int rows(); + size_t rows() const; + size_t columns() const { return _columns.size(); } bool empty() { return rows() == 0; } @@ -200,9 +204,55 @@ class MutableBlock { DataTypes& data_types() { return _data_types; } + void merge(Block&& block) { + if (_columns.size() == 0 && _data_types.size() == 0) { + _data_types = std::move(block.getDataTypes()); + _columns.resize(block.columns()); + for (size_t i = 0; i < block.columns(); ++i) { + if (block.getByPosition(i).column) { + _columns[i] = + (*std::move( + block.getByPosition(i).column->convertToFullColumnIfConst())) + .mutate(); + } else { + _columns[i] = _data_types[i]->createColumn(); + } + } + } else { + for (int i = 0; i < _columns.size(); ++i) { + _columns[i]->insertRangeFrom( + *block.getByPosition(i).column->convertToFullColumnIfConst().get(), 0, + block.rows()); + } + } + } + void merge(Block& block) { + if (_columns.size() == 0 && _data_types.size() == 0) { + _data_types = block.getDataTypes(); + _columns.resize(block.columns()); + for (size_t i = 0; i < block.columns(); ++i) { + if (block.getByPosition(i).column) { + _columns[i] = + (*std::move( + block.getByPosition(i).column->convertToFullColumnIfConst())) + .mutate(); + } else { + _columns[i] = _data_types[i]->createColumn(); + } + } + } else { + for (int i = 0; i < _columns.size(); ++i) { + _columns[i]->insertRangeFrom( + *block.getByPosition(i).column->convertToFullColumnIfConst().get(), 0, + block.rows()); + } + } + } + Block to_block(); void add_row(const Block* block, int row); + std::string dumpData(size_t row_limit = 100) const; void clear() { _columns.clear(); diff --git a/be/src/vec/core/column_with_type_and_name.cpp b/be/src/vec/core/column_with_type_and_name.cpp index 0f867d68fda2e3..4b2a4cedd760b1 100644 --- a/be/src/vec/core/column_with_type_and_name.cpp +++ b/be/src/vec/core/column_with_type_and_name.cpp @@ -62,5 +62,8 @@ String ColumnWithTypeAndName::dumpStructure() const { dumpStructure(out); return out.str(); } +std::string ColumnWithTypeAndName::to_string(size_t row_num) const { + return type->to_string(*column.get(), row_num); +} } // namespace doris::vectorized diff --git a/be/src/vec/core/column_with_type_and_name.h b/be/src/vec/core/column_with_type_and_name.h index df88333001a189..fbd6da82d13ac8 100644 --- a/be/src/vec/core/column_with_type_and_name.h +++ b/be/src/vec/core/column_with_type_and_name.h @@ -47,6 +47,7 @@ struct ColumnWithTypeAndName { void dumpStructure(std::ostream& out) const; String dumpStructure() const; + std::string to_string(size_t row_num) const; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type.cpp b/be/src/vec/data_types/data_type.cpp index 207234a4995b72..6ba83d20789309 100644 --- a/be/src/vec/data_types/data_type.cpp +++ b/be/src/vec/data_types/data_type.cpp @@ -127,6 +127,11 @@ void IDataType::to_string(const IColumn& column, size_t row_num, BufferWritable& ErrorCodes::NOT_IMPLEMENTED); } +std::string IDataType::to_string(const IColumn& column, size_t row_num) const { + throw Exception("Data type " + getName() + "to_string not implement.", + ErrorCodes::NOT_IMPLEMENTED); +} + void IDataType::insertDefaultInto(IColumn& column) const { column.insertDefault(); } diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 07161c26f60eb9..145765663a008c 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -75,6 +75,7 @@ class IDataType : private boost::noncopyable { virtual TypeIndex getTypeId() const = 0; virtual void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const; + virtual std::string to_string(const IColumn& column, size_t row_num) const; /** Binary serialization for range of values in column - for writing to disk/network, etc. * diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index 43afe2b6576498..c6ff526cb18f11 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -44,6 +44,17 @@ bool DataTypeNullable::onlyNull() const { return typeid_cast(nested_data_type.get()); } +std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) const { + const ColumnNullable& col = + assert_cast(*column.convertToFullColumnIfConst().get()); + + if (col.isNullAt(row_num)) { + return "\\N"; + } else { + return nested_data_type->to_string(col.getNestedColumn(), row_num); + } +} + // void DataTypeNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const // { // path.push_back(Substream::NullMap); @@ -477,7 +488,8 @@ bool DataTypeNullable::onlyNull() const { // } void DataTypeNullable::serialize(const IColumn& column, PColumn* pcolumn) const { - const ColumnNullable& col = assert_cast(column); + const ColumnNullable& col = + assert_cast(*column.convertToFullColumnIfConst().get()); for (size_t i = 0; i < column.size(); ++i) { bool is_null = col.isNullAt(i); pcolumn->add_is_null(is_null); diff --git a/be/src/vec/data_types/data_type_nullable.h b/be/src/vec/data_types/data_type_nullable.h index 3f800c2b1ed522..887633538e52f7 100644 --- a/be/src/vec/data_types/data_type_nullable.h +++ b/be/src/vec/data_types/data_type_nullable.h @@ -132,6 +132,7 @@ class DataTypeNullable final : public IDataType { bool canBeInsideLowCardinality() const override { return nested_data_type->canBeInsideLowCardinality(); } + std::string to_string(const IColumn& column, size_t row_num) const; const DataTypePtr& getNestedType() const { return nested_data_type; } diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp index 37162dd28abf8b..e5652f2b7ca4cb 100644 --- a/be/src/vec/data_types/data_type_number_base.cpp +++ b/be/src/vec/data_types/data_type_number_base.cpp @@ -39,6 +39,17 @@ void DataTypeNumberBase::to_string(const IColumn& column, size_t row_num, } } +template +std::string DataTypeNumberBase::to_string(const IColumn& column, size_t row_num) const { + if constexpr (std::is_same::value || std::is_same::value) { + // write int 128 + } else if constexpr (std::is_integral::value || std::numeric_limits::is_iec559) { + return std::to_string( + assert_cast&>(*column.convertToFullColumnIfConst().get()) + .getData()[row_num]); + } +} + // template // void DataTypeNumberBase::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const // { @@ -256,7 +267,9 @@ template void DataTypeNumberBase::serialize(const IColumn& column, PColumn* pcolumn) const { std::ostringstream buf; for (size_t i = 0; i < column.size(); ++i) { - const FieldType& x = assert_cast&>(column).getData()[i]; + const FieldType& x = + assert_cast&>(*column.convertToFullColumnIfConst().get()) + .getData()[i]; writeBinary(x, buf); } write_binary(buf, pcolumn); diff --git a/be/src/vec/data_types/data_type_number_base.h b/be/src/vec/data_types/data_type_number_base.h index 033c4f8ab6ab7b..51c84b7dc4f843 100644 --- a/be/src/vec/data_types/data_type_number_base.h +++ b/be/src/vec/data_types/data_type_number_base.h @@ -74,6 +74,7 @@ class DataTypeNumberBase : public IDataType { bool canBeInsideLowCardinality() const override { return true; } void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const; + std::string to_string(const IColumn& column, size_t row_num) const; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp index 5e74d5fee0b0fb..d39ea9535e6a33 100644 --- a/be/src/vec/data_types/data_type_string.cpp +++ b/be/src/vec/data_types/data_type_string.cpp @@ -247,10 +247,18 @@ static inline void read(IColumn& column, Reader&& reader) { } } +std::string DataTypeString::to_string(const IColumn& column, size_t row_num) const { + const StringRef& s = + assert_cast(*column.convertToFullColumnIfConst().get()) + .getDataAt(row_num); + return s.toString(); +} + void DataTypeString::serialize(const IColumn& column, PColumn* pcolumn) const { std::ostringstream buf; for (size_t i = 0; i < column.size(); ++i) { - const auto& s = assert_cast(column).getDataAt(i); + const auto& s = assert_cast(*column.convertToFullColumnIfConst().get()) + .getDataAt(i); writeStringBinary(s, buf); } write_binary(buf, pcolumn); diff --git a/be/src/vec/data_types/data_type_string.h b/be/src/vec/data_types/data_type_string.h index 2bea631e7a6f90..fd0bef02edb7f8 100644 --- a/be/src/vec/data_types/data_type_string.h +++ b/be/src/vec/data_types/data_type_string.h @@ -76,6 +76,7 @@ class DataTypeString final : public IDataType { bool isCategorial() const override { return true; } bool canBeInsideNullable() const override { return true; } bool canBeInsideLowCardinality() const override { return true; } + std::string to_string(const IColumn& column, size_t row_num) const; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_types_decimal.cpp b/be/src/vec/data_types/data_types_decimal.cpp index b2d657b9b36d40..9cbd858fec61c8 100644 --- a/be/src/vec/data_types/data_types_decimal.cpp +++ b/be/src/vec/data_types/data_types_decimal.cpp @@ -52,6 +52,15 @@ bool DataTypeDecimal::equals(const IDataType& rhs) const { return false; } +template +std::string DataTypeDecimal::to_string(const IColumn& column, size_t row_num) const { + T value = assert_cast(*column.convertToFullColumnIfConst().get()) + .getData()[row_num]; + std::ostringstream buf; + writeText(value, scale, buf); + return buf.str(); +} + //template //void DataTypeDecimal::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const //{ @@ -190,7 +199,9 @@ template void DataTypeDecimal::serialize(const IColumn& column, PColumn* pcolumn) const { std::ostringstream buf; for (size_t i = 0; i < column.size(); ++i) { - const FieldType& x = assert_cast(column).getElement(i); + const FieldType& x = + assert_cast(*column.convertToFullColumnIfConst().get()) + .getElement(i); writeBinary(x, buf); } diff --git a/be/src/vec/data_types/data_types_decimal.h b/be/src/vec/data_types/data_types_decimal.h index 766997cf58e9e0..a6cf5e08e44915 100644 --- a/be/src/vec/data_types/data_types_decimal.h +++ b/be/src/vec/data_types/data_types_decimal.h @@ -151,6 +151,7 @@ class DataTypeDecimal final : public IDataType { bool isSummable() const override { return true; } bool canBeUsedInBooleanContext() const override { return true; } bool canBeInsideNullable() const override { return true; } + std::string to_string(const IColumn& column, size_t row_num) const; /// Decimal specific diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp new file mode 100644 index 00000000000000..ec028477306aab --- /dev/null +++ b/be/src/vec/exec/vexcept_node.cpp @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vexcept_node.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +namespace vectorized { +VExceptNode::VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : VSetOperationNode(pool, tnode, descs) {} + +Status VExceptNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VSetOperationNode::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + return Status::OK(); +} + +Status VExceptNode::prepare(RuntimeState* state) { + return VSetOperationNode::prepare(state); +} + +Status VExceptNode::open(RuntimeState* state) { + return VSetOperationNode::open(state); +} + +Status VExceptNode::get_next(RuntimeState* state, Block* block, bool* eos) { + return Status::NotSupported("Not Implemented."); +} + +Status VExceptNode::close(RuntimeState* state) { + return VSetOperationNode::close(state); +} + +void VExceptNode::debug_string(int indentation_level, std::stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << " _child_expr_lists=["; + for (int i = 0; i < _child_expr_lists.size(); ++i) { + *out << VExpr::debug_string(_child_expr_lists[i]) << ", "; + } + *out << "] \n"; + ExecNode::debug_string(indentation_level, out); + *out << ")" << std::endl; +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vexcept_node.h b/be/src/vec/exec/vexcept_node.h new file mode 100644 index 00000000000000..7d7fb1bf0515c7 --- /dev/null +++ b/be/src/vec/exec/vexcept_node.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/exec/vset_operation_node.h" + +namespace doris { +namespace vectorized { + +class VExceptNode : public VSetOperationNode { +public: + VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); + virtual Status close(RuntimeState* state); +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp new file mode 100644 index 00000000000000..734d1f933dd693 --- /dev/null +++ b/be/src/vec/exec/vintersect_node.cpp @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vintersect_node.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { +namespace vectorized { + +VIntersectNode::VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : VSetOperationNode(pool, tnode, descs) {} +Status VIntersectNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VSetOperationNode::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + return Status::OK(); +} + +Status VIntersectNode::prepare(RuntimeState* state) { + return VSetOperationNode::prepare(state); +} + +Status VIntersectNode::open(RuntimeState* state) { + return VSetOperationNode::open(state); +} + +Status VIntersectNode::get_next(RuntimeState* state, Block* block, bool* eos) { + return Status::NotSupported("Not Implemented."); +} + +Status VIntersectNode::close(RuntimeState* state) { + return VSetOperationNode::close(state); +} + +void VIntersectNode::debug_string(int indentation_level, std::stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << " _child_expr_lists=["; + for (int i = 0; i < _child_expr_lists.size(); ++i) { + *out << VExpr::debug_string(_child_expr_lists[i]) << ", "; + } + *out << "] \n"; + ExecNode::debug_string(indentation_level, out); + *out << ")" << std::endl; +} +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vintersect_node.h b/be/src/vec/exec/vintersect_node.h new file mode 100644 index 00000000000000..6a8dedb33da383 --- /dev/null +++ b/be/src/vec/exec/vintersect_node.h @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/exec/vset_operation_node.h" + +namespace doris { +namespace vectorized { + +class VIntersectNode : public VSetOperationNode { +public: + VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); + virtual Status close(RuntimeState* state); +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 580b8a6182b315..25f6734a9a2400 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -489,7 +489,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { // reach scan node limit if (reached_limit()) { int num_rows_over = _num_rows_returned - _limit; - // TODO (yangzhg) impliments limit block->set_num_rows(block->rows() - num_rows_over); _num_rows_returned -= num_rows_over; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp new file mode 100644 index 00000000000000..0502fa8e9812c3 --- /dev/null +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vset_operation_node.h" + +#include "vec/exprs/vexpr.h" + +namespace doris { + +namespace vectorized { + +VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), + _const_expr_list_idx(0), + _child_idx(0), + _child_row_idx(0), + _child_eos(false), + _to_close_child_idx(-1) {} + +Status VSetOperationNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + for (auto& exprs : _const_expr_lists) { + VExpr::close(exprs, state); + } + for (auto& exprs : _child_expr_lists) { + VExpr::close(exprs, state); + } + return ExecNode::close(state); +} +Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + DCHECK_EQ(_conjunct_ctxs.size(), 0); + // Create const_expr_ctx_lists_ from thrift exprs. + auto& const_texpr_lists = tnode.union_node.const_expr_lists; + for (auto& texprs : const_texpr_lists) { + std::vector ctxs; + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, texprs, &ctxs)); + _const_expr_lists.push_back(ctxs); + } + // Create result_expr_ctx_lists_ from thrift exprs. + auto& result_texpr_lists = tnode.union_node.result_expr_lists; + for (auto& texprs : result_texpr_lists) { + std::vector ctxs; + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, texprs, &ctxs)); + _child_expr_lists.push_back(ctxs); + } + return Status::OK(); +} + +Status VSetOperationNode::open(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + // open const expr lists. + for (const std::vector& exprs : _const_expr_lists) { + RETURN_IF_ERROR(VExpr::open(exprs, state)); + } + // open result expr lists. + for (const std::vector& exprs : _child_expr_lists) { + RETURN_IF_ERROR(VExpr::open(exprs, state)); + } + + // Ensures that rows are available for clients to fetch after this open() has + // succeeded. + if (!_children.empty()) RETURN_IF_ERROR(child(_child_idx)->open(state)); + + return Status::OK(); +} +Status VSetOperationNode::prepare(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::prepare(state)); + _materialize_exprs_evaluate_timer = + ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer"); + // Prepare const expr lists. + for (const std::vector& exprs : _const_expr_lists) { + RETURN_IF_ERROR(VExpr::prepare(exprs, state, row_desc(), expr_mem_tracker())); + } + + // Prepare result expr lists. + for (int i = 0; i < _child_expr_lists.size(); ++i) { + RETURN_IF_ERROR(VExpr::prepare(_child_expr_lists[i], state, child(i)->row_desc(), + expr_mem_tracker())); + } + return Status::OK(); +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h new file mode 100644 index 00000000000000..1f81c58f88e016 --- /dev/null +++ b/be/src/vec/exec/vset_operation_node.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "codegen/doris_ir.h" +#include "exec/exec_node.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" + +namespace doris { + +namespace vectorized { + +class VSetOperationNode : public ExecNode { +public: + VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { + return Status::NotSupported("Not Implemented get RowBatch in vecorized execution."); + } + virtual Status close(RuntimeState* state); + virtual void debug_string(int indentation_level, std::stringstream* out) const {}; + +protected: + /// Const exprs materialized by this node. These exprs don't refer to any children. + /// Only materialized by the first fragment instance to avoid duplication. + std::vector> _const_expr_lists; + + /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. + std::vector> _child_expr_lists; + + /// Index of current const result expr list. + int _const_expr_list_idx; + + /// Index of current child. + int _child_idx; + + /// Index of current row in child_row_block_. + int _child_row_idx; + + /// Saved from the last to GetNext() on the current child. + bool _child_eos; + + /// Index of the child that needs to be closed on the next GetNext() call. Should be set + /// to -1 if no child needs to be closed. + int _to_close_child_idx; + + // Time spent to evaluates exprs and materializes the results + RuntimeProfile::Counter* _materialize_exprs_evaluate_timer = nullptr; +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp new file mode 100644 index 00000000000000..310e160f75b5f5 --- /dev/null +++ b/be/src/vec/exec/vunion_node.cpp @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vunion_node.h" + +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris { + +namespace vectorized { + +VUnionNode::VUnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : VSetOperationNode(pool, tnode, descs), + _first_materialized_child_idx(tnode.union_node.first_materialized_child_idx) {} + +Status VUnionNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VSetOperationNode::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + return Status::OK(); +} + +Status VUnionNode::prepare(RuntimeState* state) { + return VSetOperationNode::prepare(state); +} + +Status VUnionNode::open(RuntimeState* state) { + return VSetOperationNode::open(state); +} + +Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) { + DCHECK(!reached_limit()); + DCHECK(!is_in_subplan()); + DCHECK_LT(_child_idx, _children.size()); + DCHECK(is_child_passthrough(_child_idx)); + if (_child_eos) { + RETURN_IF_ERROR(child(_child_idx)->open(state)); + _child_eos = false; + } + DCHECK_EQ(block->rows(), 0); + RETURN_IF_ERROR(child(_child_idx)->get_next(state, block, &_child_eos)); + if (_child_eos) { + // Even though the child is at eos, it's not OK to close() it here. Once we close + // the child, the row batches that it produced are invalid. Marking the batch as + // needing a deep copy let's us safely close the child in the next get_next() call. + // TODO: Remove this as part of IMPALA-4179. + _to_close_child_idx = _child_idx; + ++_child_idx; + } + return Status::OK(); +} + +Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { + // Fetch from children, evaluate corresponding exprs and materialize. + DCHECK(!reached_limit()); + DCHECK_LT(_child_idx, _children.size()); + MutableBlock mblock; + while (has_more_materialized() && mblock.rows() <= state->batch_size()) { + // The loop runs until we are either done iterating over the children that require + // materialization, or the row batch is at capacity. + DCHECK(!is_child_passthrough(_child_idx)); + // Child row batch was either never set or we're moving on to a different child. + DCHECK_LT(_child_idx, _children.size()); + Block child_block; + // open the current child unless it's the first child, which was already opened in + // VUnionNode::open(). + if (_child_eos) { + RETURN_IF_ERROR(child(_child_idx)->open(state)); + _child_eos = false; + _child_row_idx = 0; + } + // The first batch from each child is always fetched here. + RETURN_IF_ERROR(child(_child_idx)->get_next(state, &child_block, &_child_eos)); + SCOPED_TIMER(_materialize_exprs_evaluate_timer); + if (child_block.rows() > 0) { + mblock.merge(materialize_block(&child_block)); + } + // It shouldn't be the case that we reached the limit because we shouldn't have + // incremented '_num_rows_returned' yet. + DCHECK(!reached_limit()); + if (_child_eos && _child_row_idx == mblock.rows()) { + // Unless we are inside a subplan expecting to call open()/get_next() on the child + // again, the child can be closed at this point. + if (!is_in_subplan()) { + child(_child_idx)->close(state); + } + ++_child_idx; + } + } + block->swap(mblock.to_block()); + + DCHECK_LE(_child_idx, _children.size()); + return Status::OK(); +} + +Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { + DCHECK_EQ(state->per_fragment_instance_idx(), 0); + DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size()); + MutableBlock mblock; + for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) { + Block tmp_block; + for (size_t i = 0; i < _const_expr_lists[_const_expr_list_idx].size(); ++i) { + int result_column_num = -1; + _const_expr_lists[_const_expr_list_idx][i]->execute(&tmp_block, &result_column_num); + } + mblock.merge(tmp_block); + } + block->swap(mblock.to_block()); + return Status::OK(); +} + +Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + // TODO(zc) + // RETURN_IF_ERROR(QueryMaintenance(state)); + + if (_to_close_child_idx != -1) { + // The previous child needs to be closed if passthrough was enabled for it. In the non + // passthrough case, the child was already closed in the previous call to get_next(). + DCHECK(is_child_passthrough(_to_close_child_idx)); + DCHECK(!is_in_subplan()); + child(_to_close_child_idx)->close(state); + _to_close_child_idx = -1; + } + + // Save the number of rows in case get_next() is called with a non-empty batch, which can + // happen in a subplan. + int num_rows_before = block->rows(); + + if (has_more_passthrough()) { + RETURN_IF_ERROR(get_next_pass_through(state, block)); + } else if (has_more_materialized()) { + RETURN_IF_ERROR(get_next_materialized(state, block)); + } else if (has_more_const(state)) { + RETURN_IF_ERROR(get_next_const(state, block)); + } + + int num_rows_added = block->rows() - num_rows_before; + DCHECK_GE(num_rows_added, 0); + if (_limit != -1 && _num_rows_returned + num_rows_added > _limit) { + // Truncate the row batch if we went over the limit. + num_rows_added = _limit - _num_rows_returned; + block->set_num_rows(num_rows_before + num_rows_added); + DCHECK_GE(num_rows_added, 0); + } + _num_rows_returned += num_rows_added; + + *eos = reached_limit() || + (!has_more_passthrough() && !has_more_materialized() && !has_more_const(state)); + + COUNTER_SET(_rows_returned_counter, _num_rows_returned); + return Status::OK(); +} + +Status VUnionNode::close(RuntimeState* state) { + return VSetOperationNode::close(state); +} + +void VUnionNode::debug_string(int indentation_level, std::stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "_union(_first_materialized_child_idx=" << _first_materialized_child_idx + << " _child_expr_lists=["; + for (int i = 0; i < _child_expr_lists.size(); ++i) { + *out << VExpr::debug_string(_child_expr_lists[i]) << ", "; + } + *out << "] \n"; + ExecNode::debug_string(indentation_level, out); + *out << ")" << std::endl; +} +Block VUnionNode::materialize_block(Block* src_block) { + const std::vector& child_exprs = _child_expr_lists[_child_idx]; + ColumnsWithTypeAndName colunms; + for (size_t i = 0; i < child_exprs.size(); ++i) { + int result_column_id = -1; + child_exprs[i]->execute(src_block, &result_column_id); + colunms.emplace_back(src_block->getByPosition(result_column_id)); + } + _child_row_idx += src_block->rows(); + return {colunms}; +} + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h new file mode 100644 index 00000000000000..d7f3c5e0029041 --- /dev/null +++ b/be/src/vec/exec/vunion_node.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/exec/vset_operation_node.h" + +namespace doris { +namespace vectorized { + +class VUnionNode : public VSetOperationNode { +public: + VUnionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); + virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state); + virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); + virtual Status close(RuntimeState* state); + +private: + /// Index of the first non-passthrough child; i.e. a child that needs materialization. + /// 0 when all children are materialized, '_children.size()' when no children are + /// materialized. + const int _first_materialized_child_idx; + + /// GetNext() for the passthrough case. We pass 'block' directly into the GetNext() + /// call on the child. + Status get_next_pass_through(RuntimeState* state, Block* block); + + /// GetNext() for the materialized case. Materializes and evaluates rows from each + /// non-passthrough child. + Status get_next_materialized(RuntimeState* state, Block* block); + + /// GetNext() for the constant expression case. + Status get_next_const(RuntimeState* state, Block* block); + + /// Evaluates exprs for the current child and materializes the results into 'tuple_buf', + /// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows + /// have been consumed from the current child block. Updates '_child_row_idx'. + Block materialize_block(Block* dst_block); + + Status get_error_msg(const std::vector& exprs); + + /// Returns true if the child at 'child_idx' can be passed through. + bool is_child_passthrough(int child_idx) const { + DCHECK_LT(child_idx, _children.size()); + return child_idx < _first_materialized_child_idx; + } + + /// Returns true if there are still rows to be returned from passthrough children. + bool has_more_passthrough() const { return _child_idx < _first_materialized_child_idx; } + + /// Returns true if there are still rows to be returned from children that need + /// materialization. + bool has_more_materialized() const { + return _first_materialized_child_idx != _children.size() && _child_idx < _children.size(); + } + + /// Returns true if there are still rows to be returned from constant expressions. + bool has_more_const(const RuntimeState* state) const { + return state->per_fragment_instance_idx() == 0 && + _const_expr_list_idx < _const_expr_lists.size(); + } + + virtual void debug_string(int indentation_level, std::stringstream* out) const; +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index d80ca04a541fdc..b4281ddcbcf546 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -131,5 +131,22 @@ void AggFnEvaluator::execute_single_merge(AggregateDataPtr place, ConstAggregate void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) { _function->insertResultInto(place, *column); } +std::string AggFnEvaluator::debug_string(const std::vector& exprs) { + std::stringstream out; + out << "["; + for (int i = 0; i < exprs.size(); ++i) { + out << (i == 0 ? "" : " ") << exprs[i]->debug_string(); + } + + out << "]"; + return out.str(); +} + +std::string AggFnEvaluator::debug_string() const { + std::stringstream out; + out << "AggFnEvaluator("; + out << ")"; + return out.str(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index a900b98a7f294d..4da46e18381a7b 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -57,6 +57,8 @@ class AggFnEvaluator { DataTypePtr& data_type() { return _data_type; } const AggregateFunctionPtr& function() { return _function; } + static std::string debug_string(const std::vector& exprs); + std::string debug_string() const; private: const TFunction _fn; diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index ce04e50b557251..26fe84f2eca772 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -78,4 +78,23 @@ doris::Status VectorizedFnCall::execute(doris::vectorized::Block* block, int* re const std::string& VectorizedFnCall::expr_name() const { return _expr_name; } +std::string VectorizedFnCall::debug_string() const { + std::stringstream out; + out << "VAggFn("; + for (VExpr* input_expr : children()) { + out << " " << input_expr->debug_string() << ")"; + } + out << ")"; + return out.str(); +} + +std::string VectorizedFnCall::debug_string(const std::vector& agg_fns) { + std::stringstream out; + out << "["; + for (int i = 0; i < agg_fns.size(); ++i) { + out << (i == 0 ? "" : " ") << agg_fns[i]->debug_string(); + } + out << "]"; + return out.str(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vectorized_fn_call.h b/be/src/vec/exprs/vectorized_fn_call.h index 043720e035c96b..5419d89f1fadd4 100644 --- a/be/src/vec/exprs/vectorized_fn_call.h +++ b/be/src/vec/exprs/vectorized_fn_call.h @@ -33,6 +33,8 @@ class VectorizedFnCall final : public VExpr { return pool->add(new VectorizedFnCall(*this)); } virtual const std::string& expr_name() const override; + virtual std::string debug_string() const; + static std::string debug_string(const std::vector& exprs); private: FunctionBasePtr _function; diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 31c72cab40ff43..9ba5912288a6ab 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -187,14 +187,14 @@ Status VExpr::open(const std::vector& ctxs, RuntimeState* state) } Status VExpr::clone_if_not_exists(const std::vector& ctxs, RuntimeState* state, - std::vector* new_ctxs) { + std::vector* new_ctxs) { DCHECK(new_ctxs != NULL); if (!new_ctxs->empty()) { // 'ctxs' was already cloned into '*new_ctxs', nothing to do. DCHECK_EQ(new_ctxs->size(), ctxs.size()); -// for (int i = 0; i < new_ctxs->size(); ++i) { -// DCHECK((*new_ctxs)[i]->_is_clone); -// } + // for (int i = 0; i < new_ctxs->size(); ++i) { + // DCHECK((*new_ctxs)[i]->_is_clone); + // } return Status::OK(); } new_ctxs->resize(ctxs.size()); @@ -203,4 +203,38 @@ Status VExpr::clone_if_not_exists(const std::vector& ctxs, Runtim } return Status::OK(); } +std::string VExpr::debug_string() const { + // TODO: implement partial debug string for member vars + std::stringstream out; + out << " type=" << _type.debug_string(); + out << " codegen=" + << "false"; + + if (!_children.empty()) { + out << " children=" << debug_string(_children); + } + + return out.str(); +} + +std::string VExpr::debug_string(const std::vector& exprs) { + std::stringstream out; + out << "["; + + for (int i = 0; i < exprs.size(); ++i) { + out << (i == 0 ? "" : " ") << exprs[i]->debug_string(); + } + + out << "]"; + return out.str(); +} + +std::string VExpr::debug_string(const std::vector& ctxs) { + std::vector exprs; + for (int i = 0; i < ctxs.size(); ++i) { + exprs.push_back(ctxs[i]->root()); + } + return debug_string(exprs); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index d231c2d2db743c..075b8095e62057 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -62,7 +62,7 @@ class VExpr { static Status open(const std::vector& ctxs, RuntimeState* state); static Status clone_if_not_exists(const std::vector& ctxs, RuntimeState* state, - std::vector* new_ctxs); + std::vector* new_ctxs); static void close(const std::vector& ctxs, RuntimeState* state); @@ -75,8 +75,19 @@ class VExpr { static Status create_tree_from_thrift(ObjectPool* pool, const std::vector& nodes, VExpr* parent, int* node_idx, VExpr** root_expr, VExprContext** ctx); + const std::vector& children() const { return _children; } + virtual std::string debug_string() const; + static std::string debug_string(const std::vector& exprs); + static std::string debug_string(const std::vector& ctxs); protected: + /// Simple debug string that provides no expr subclass-specific information + std::string debug_string(const std::string& expr_name) const { + std::stringstream out; + out << expr_name << "(" << VExpr::debug_string() << ")"; + return out.str(); + } + TExprNodeType::type _node_type; TypeDescriptor _type; DataTypePtr _data_type; diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index 3d183db1d01aa7..b37870e360f7b1 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -123,6 +123,9 @@ VLiteral::~VLiteral() {} Status VLiteral::execute(vectorized::Block* block, int* result_column_id) { int rows = block->rows(); + if (rows < 1) { + rows = 1; + } size_t res = block->columns(); block->insert({_column_ptr->cloneResized(rows), _data_type, _expr_name}); *result_column_id = res; diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp index a3cb80ddd9d298..d3cb712ea3ba9c 100644 --- a/be/src/vec/exprs/vslot_ref.cpp +++ b/be/src/vec/exprs/vslot_ref.cpp @@ -67,5 +67,9 @@ Status VSlotRef::execute(Block* block, int* result_column_id) { const std::string& VSlotRef::expr_name() const { return *_column_name; } - +std::string VSlotRef::debug_string() const { + std::stringstream out; + out << "SlotRef(slot_id=" << _slot_id << VExpr::debug_string() << ")"; + return out.str(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index df5ddb05d16b97..d63b17bdae9538 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -35,6 +35,7 @@ class VSlotRef final : public VExpr { } virtual const std::string& expr_name() const override; + virtual std::string debug_string() const; private: FunctionPtr _function; diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h index b4d481bf288dbc..0caabbdf87dc87 100644 --- a/be/src/vec/io/io_helper.h +++ b/be/src/vec/io/io_helper.h @@ -30,8 +30,47 @@ #include "vec/io/var_int.h" #define DEFAULT_MAX_STRING_SIZE (1ULL << 30) +#define WRITE_HELPERS_MAX_INT_WIDTH 40U namespace doris::vectorized { + +template +inline T decimalScaleMultiplier(UInt32 scale); +template <> +inline Int32 decimalScaleMultiplier(UInt32 scale) { + return common::exp10_i32(scale); +} +template <> +inline Int64 decimalScaleMultiplier(UInt32 scale) { + return common::exp10_i64(scale); +} +template <> +inline Int128 decimalScaleMultiplier(UInt32 scale) { + return common::exp10_i128(scale); +} + +template +void writeText(Decimal value, UInt32 scale, std::ostream& ostr) { + if (value < Decimal(0)) { + value *= Decimal(-1); + ostr << '-'; + } + + T whole_part = value; + if (scale) whole_part = value / decimalScaleMultiplier(scale); + if constexpr (std::is_same::value || std::is_same::value) { + // int128 + } else { + ostr << whole_part; + } + if (scale) { + ostr << '.'; + String str_fractional(scale, '0'); + for (Int32 pos = scale - 1; pos >= 0; --pos, value /= Decimal(10)) + str_fractional[pos] += value % Decimal(10); + ostr.write(str_fractional.data(), scale); + } +} /// Methods for output in binary format. /// Write POD-type in native format. It's recommended to use only with packed (dense) data types. diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 487039746f2c62..bf431ba1af1276 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -193,6 +193,7 @@ VDataStreamRecvr::VDataStreamRecvr( _total_buffer_limit(total_buffer_limit), _row_desc(row_desc), _is_merging(is_merging), + _is_closed(false), _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { @@ -274,6 +275,10 @@ void VDataStreamRecvr::cancel_stream() { } void VDataStreamRecvr::close() { + if (_is_closed) { + return; + } + _is_closed = true; for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->close(); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e5dc3a097d8c0e..d8b8943867da67 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -90,6 +90,7 @@ class VDataStreamRecvr { // True if this reciver merges incoming rows from different senders. Per-sender // row batch queues are maintained in this case. bool _is_merging; + bool _is_closed; std::atomic _num_buffered_bytes; std::shared_ptr _mem_tracker; diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index ea747ece87569e..65dd3a8f9ab451 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -236,6 +236,50 @@ TEST(BlockTest, SerializeAndDeserializeBlock) { } } +TEST(BlockTest, DumpData) { + auto vec = vectorized::ColumnVector::create(); + auto& int32_data = vec->getData(); + for (int i = 0; i < 1024; ++i) { + int32_data.push_back(i); + } + vectorized::DataTypePtr int32_type(std::make_shared()); + vectorized::ColumnWithTypeAndName test_int(vec->getPtr(), int32_type, "test_int"); + + auto strcol = vectorized::ColumnString::create(); + for (int i = 0; i < 1024; ++i) { + std::string is = std::to_string(i); + strcol->insertData(is.c_str(), is.size()); + } + vectorized::DataTypePtr string_type(std::make_shared()); + vectorized::ColumnWithTypeAndName test_string(strcol->getPtr(), string_type, "test_string"); + + vectorized::DataTypePtr decimal_data_type(doris::vectorized::createDecimal(27, 9)); + auto decimal_column = decimal_data_type->createColumn(); + auto& decimal_data = ((vectorized::ColumnDecimal>*) + decimal_column.get()) + ->getData(); + for (int i = 0; i < 1024; ++i) { + __int128_t value = i; + for (int j = 0; j < 9; ++j) { + value *= 10; + } + decimal_data.push_back(value); + } + vectorized::ColumnWithTypeAndName test_decimal(decimal_column->getPtr(), decimal_data_type, + "test_decimal"); + + auto column_vector_int32 = vectorized::ColumnVector::create(); + auto column_nullable_vector = makeNullable(std::move(column_vector_int32)); + auto mutable_nullable_vector = std::move(*column_nullable_vector).mutate(); + for (int i = 0; i < 4096; i++) { + mutable_nullable_vector->insert(vectorized::castToNearestFieldType(i)); + } + auto nint32_type = makeNullable(std::make_shared()); + vectorized::ColumnWithTypeAndName test_nullable_int32(mutable_nullable_vector->getPtr(), + nint32_type, "test_nullable_int32"); + vectorized::Block block({test_int, test_string, test_decimal, test_nullable_int32}); + EXPECT_GT(block.dumpData().size(), 1); +} } // namespace doris int main(int argc, char** argv) {