Skip to content

Commit

Permalink
add vectorized union node (apache#22)
Browse files Browse the repository at this point in the history
* add vectorized union node
  • Loading branch information
yangzhg authored and HappenLee committed Jul 1, 2021
1 parent f57ae0b commit 7185e6e
Show file tree
Hide file tree
Showing 39 changed files with 1,081 additions and 29 deletions.
18 changes: 11 additions & 7 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"

#include "vec/core/block.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/volap_scan_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exec/volap_scan_node.h"
#include "vec/exec/vsort_node.h"
#include "vec/exec/vunion_node.h"
#include "vec/exprs/vexpr.h"

namespace doris {
Expand Down Expand Up @@ -134,8 +134,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_counter(NULL),
_rows_returned_rate(NULL),
_memory_used_counter(NULL),
_is_closed(false) {
}
_is_closed(false) {}

ExecNode::~ExecNode() {}

Expand Down Expand Up @@ -165,8 +164,9 @@ void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>*
}

Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
init_runtime_profile(state->enable_vectorized_exec()? "V" + print_plan_node_type(tnode.node_type) :
print_plan_node_type(tnode.node_type));
init_runtime_profile(state->enable_vectorized_exec()
? "V" + print_plan_node_type(tnode.node_type)
: print_plan_node_type(tnode.node_type));

if (tnode.__isset.vconjunct) {
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
Expand Down Expand Up @@ -456,7 +456,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::UNION_NODE:
*node = pool->add(new UnionNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
*node = pool->add(new vectorized::VUnionNode(pool, tnode, descs));
} else {
*node = pool->add(new UnionNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::INTERSECT_NODE:
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
_collect_query_statistics();
Status status;
{
boost::lock_guard<boost::mutex> l(_status_lock);
std::lock_guard<std::mutex> l(_status_lock);
status = _status;
}
status = _sink->close(runtime_state(), status);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ set(VEC_FILES
exec/vsort_exec_exprs.cpp
exec/volap_scanner.cpp
exec/vexchange_node.cpp
exec/vset_operation_node.cpp
exec/vunion_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
exprs/vexpr.cpp
Expand Down
112 changes: 104 additions & 8 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#include "vec/core/block.h"

#include <iomanip>
#include <iterator>
#include <memory>

#include "fmt/format.h"
#include "gen_cpp/data.pb.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -381,12 +383,51 @@ std::string Block::dumpNames() const {
return out.str();
}

std::string Block::dumpData() const {
// WriteBufferFromOwnString out;
std::stringstream out;
std::string Block::dumpData(size_t row_limit) const {
if (rows() == 0) {
return "empty block.";
}
std::vector<std::string> headers;
std::vector<size_t> headers_size;
for (auto it = data.begin(); it != data.end(); ++it) {
if (it != data.begin()) out << ", ";
out << it->name;
std::string s = fmt::format("{}({})", it->name, it->type->getName());
headers_size.push_back(s.size() > 15 ? s.size() : 15);
headers.emplace_back(s);
}

std::stringstream out;
// header upper line
auto line = [&]() {
for (size_t i = 0; i < columns(); ++i) {
out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
}
out << std::setw(1) << "+" << std::endl;
};
line();
// header text
for (size_t i = 0; i < columns(); ++i) {
out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
<< headers[i];
}
out << std::setw(1) << "|" << std::endl;
// header bottom line
line();
// content
for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) {
for (size_t i = 0; i < columns(); ++i) {
std::string s = data[i].to_string(row_num);
if (s.length() > headers_size[i]) {
s = s.substr(0, headers_size[i] - 3) + "...";
}
out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
<< std::right << s;
}
out << std::setw(1) << "|" << std::endl;
}
// bottom line
line();
if (row_limit < rows()) {
out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
}
return out.str();
}
Expand Down Expand Up @@ -658,6 +699,12 @@ void Block::swap(Block& other) noexcept {
index_by_name.swap(other.index_by_name);
}

void Block::swap(Block&& other) noexcept {
clear();
data = std::move(other.data);
initializeIndexByName();
}

void Block::updateHash(SipHash& hash) const {
for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no)
for (const auto& col : data) col.column->updateHashWithValue(row_no, hash);
Expand Down Expand Up @@ -696,9 +743,11 @@ void Block::serialize(PBlock* pblock) const {
}
}

int MutableBlock::rows() {
size_t MutableBlock::rows() const {
for (const auto& column : _columns)
if (column) return column->size();
if (column) {
return column->size();
}

return 0;
}
Expand All @@ -715,7 +764,54 @@ Block MutableBlock::to_block() {
for (int i = 0; i < _columns.size(); ++i) {
columns_with_schema.emplace_back(std::move(_columns[i]), _data_types[i], "");
}
return columns_with_schema;
return {columns_with_schema};
}
std::string MutableBlock::dumpData(size_t row_limit) const {
if (rows() == 0) {
return "empty block.";
}
std::vector<std::string> headers;
std::vector<size_t> headers_size;
for (size_t i = 0; i < columns(); ++i) {
std::string s = _data_types[i]->getName();
headers_size.push_back(s.size() > 15 ? s.size() : 15);
headers.emplace_back(s);
}

std::stringstream out;
// header upper line
auto line = [&]() {
for (size_t i = 0; i < columns(); ++i) {
out << std::setfill('-') << std::setw(1) << "+" << std::setw(headers_size[i]) << "-";
}
out << std::setw(1) << "+" << std::endl;
};
line();
// header text
for (size_t i = 0; i < columns(); ++i) {
out << std::setfill(' ') << std::setw(1) << "|" << std::left << std::setw(headers_size[i])
<< headers[i];
}
out << std::setw(1) << "|" << std::endl;
// header bottom line
line();
// content
for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) {
for (size_t i = 0; i < columns(); ++i) {
std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num);
if (s.length() > headers_size[i]) {
s = s.substr(0, headers_size[i] - 3) + "...";
}
out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i])
<< std::right << s;
}
out << std::setw(1) << "|" << std::endl;
}
// bottom line
line();
if (row_limit < rows()) {
out << rows() << " rows in block, only show first " << row_limit << " rows." << std::endl;
}
return out.str();
}
} // namespace doris::vectorized
54 changes: 52 additions & 2 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class Block {

void clear();
void swap(Block& other) noexcept;
void swap(Block&& other) noexcept;

/** Updates SipHash of the Block, using update method of columns.
* Returns hash for block, that could be used to differentiate blocks
Expand All @@ -155,7 +156,7 @@ class Block {
void updateHash(SipHash& hash) const;

/** Get block data in string. */
std::string dumpData() const;
std::string dumpData(size_t row_limit = 100) const;

static void filter_block(Block* block, int filter_conlumn_id, int column_to_keep);
// serialize block to PRowBatch
Expand Down Expand Up @@ -191,18 +192,67 @@ class MutableBlock {

MutableBlock(MutableColumns&& columns, DataTypes&& data_types)
: _columns(std::move(columns)), _data_types(std::move(data_types)) {}
MutableBlock(Block* block)
: _columns(block->mutateColumns()), _data_types(block->getDataTypes()) {}

int rows();
size_t rows() const;
size_t columns() const { return _columns.size(); }

bool empty() { return rows() == 0; }

MutableColumns& mutable_columns() { return _columns; }

DataTypes& data_types() { return _data_types; }

void merge(Block&& block) {
if (_columns.size() == 0 && _data_types.size() == 0) {
_data_types = std::move(block.getDataTypes());
_columns.resize(block.columns());
for (size_t i = 0; i < block.columns(); ++i) {
if (block.getByPosition(i).column) {
_columns[i] =
(*std::move(
block.getByPosition(i).column->convertToFullColumnIfConst()))
.mutate();
} else {
_columns[i] = _data_types[i]->createColumn();
}
}
} else {
for (int i = 0; i < _columns.size(); ++i) {
_columns[i]->insertRangeFrom(
*block.getByPosition(i).column->convertToFullColumnIfConst().get(), 0,
block.rows());
}
}
}
void merge(Block& block) {
if (_columns.size() == 0 && _data_types.size() == 0) {
_data_types = block.getDataTypes();
_columns.resize(block.columns());
for (size_t i = 0; i < block.columns(); ++i) {
if (block.getByPosition(i).column) {
_columns[i] =
(*std::move(
block.getByPosition(i).column->convertToFullColumnIfConst()))
.mutate();
} else {
_columns[i] = _data_types[i]->createColumn();
}
}
} else {
for (int i = 0; i < _columns.size(); ++i) {
_columns[i]->insertRangeFrom(
*block.getByPosition(i).column->convertToFullColumnIfConst().get(), 0,
block.rows());
}
}
}

Block to_block();

void add_row(const Block* block, int row);
std::string dumpData(size_t row_limit = 100) const;

void clear() {
_columns.clear();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/core/column_with_type_and_name.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@ String ColumnWithTypeAndName::dumpStructure() const {
dumpStructure(out);
return out.str();
}
std::string ColumnWithTypeAndName::to_string(size_t row_num) const {
return type->to_string(*column.get(), row_num);
}

} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/core/column_with_type_and_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ColumnWithTypeAndName {

void dumpStructure(std::ostream& out) const;
String dumpStructure() const;
std::string to_string(size_t row_num) const;
};

} // namespace doris::vectorized
5 changes: 5 additions & 0 deletions be/src/vec/data_types/data_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ void IDataType::to_string(const IColumn& column, size_t row_num, BufferWritable&
ErrorCodes::NOT_IMPLEMENTED);
}

std::string IDataType::to_string(const IColumn& column, size_t row_num) const {
throw Exception("Data type " + getName() + "to_string not implement.",
ErrorCodes::NOT_IMPLEMENTED);
}

void IDataType::insertDefaultInto(IColumn& column) const {
column.insertDefault();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/data_types/data_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class IDataType : private boost::noncopyable {
virtual TypeIndex getTypeId() const = 0;

virtual void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const;
virtual std::string to_string(const IColumn& column, size_t row_num) const;

/** Binary serialization for range of values in column - for writing to disk/network, etc.
*
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/data_types/data_type_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ bool DataTypeNullable::onlyNull() const {
return typeid_cast<const DataTypeNothing*>(nested_data_type.get());
}

std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) const {
const ColumnNullable& col =
assert_cast<const ColumnNullable&>(*column.convertToFullColumnIfConst().get());

if (col.isNullAt(row_num)) {
return "\\N";
} else {
return nested_data_type->to_string(col.getNestedColumn(), row_num);
}
}

// void DataTypeNullable::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
// {
// path.push_back(Substream::NullMap);
Expand Down Expand Up @@ -477,7 +488,8 @@ bool DataTypeNullable::onlyNull() const {
// }

void DataTypeNullable::serialize(const IColumn& column, PColumn* pcolumn) const {
const ColumnNullable& col = assert_cast<const ColumnNullable&>(column);
const ColumnNullable& col =
assert_cast<const ColumnNullable&>(*column.convertToFullColumnIfConst().get());
for (size_t i = 0; i < column.size(); ++i) {
bool is_null = col.isNullAt(i);
pcolumn->add_is_null(is_null);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/data_types/data_type_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class DataTypeNullable final : public IDataType {
bool canBeInsideLowCardinality() const override {
return nested_data_type->canBeInsideLowCardinality();
}
std::string to_string(const IColumn& column, size_t row_num) const;

const DataTypePtr& getNestedType() const { return nested_data_type; }

Expand Down
Loading

0 comments on commit 7185e6e

Please sign in to comment.