Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] (vec) instead of converting line to src tuple for stream load in vectorized. #9314

Merged
merged 4 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,18 @@ Status BaseScanner::init_expr_ctxes() {

// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
RETURN_IF_ERROR(
Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
if (_state->enable_vectorized_exec()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
_state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
_mem_tracker));
RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
} else {
RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
&_pre_filter_ctxs));
RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
}
}

// Construct dest slots information
Expand All @@ -133,11 +141,22 @@ Status BaseScanner::init_expr_ctxes() {
<< ", name=" << slot_desc->col_name();
return Status::InternalError(ss.str());
}
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);

if (_state->enable_vectorized_exec()) {
vectorized::VExprContext* ctx = nullptr;
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_vexpr_ctx.emplace_back(ctx);
} else {
ExprContext* ctx = nullptr;
RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else {}


if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
Expand Down Expand Up @@ -284,6 +303,10 @@ void BaseScanner::close() {
if (!_pre_filter_ctxs.empty()) {
Expr::close(_pre_filter_ctxs, _state);
}

if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
vectorized::VExpr::close(_vpre_filter_ctxs, _state);
}
}

} // namespace doris
15 changes: 12 additions & 3 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "common/status.h"
#include "exprs/expr.h"
#include "vec/exprs/vexpr.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"

Expand Down Expand Up @@ -52,7 +53,12 @@ class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
virtual ~BaseScanner() {
Expr::close(_dest_expr_ctx, _state);
if (_state->enable_vectorized_exec()) {
vectorized::VExpr::close(_dest_vexpr_ctx, _state);
}
};

virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
Expand All @@ -62,8 +68,8 @@ class BaseScanner {
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) = 0;

// Get next block
virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) {
return Status::NotSupported("Not Implemented get next");
virtual Status get_next(vectorized::Block* block, bool* eof) {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
Expand Down Expand Up @@ -95,6 +101,9 @@ class BaseScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// for vectorized
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
Expand Down
18 changes: 14 additions & 4 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,7 @@ Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool*
return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}

// Convert one row to this tuple
Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
Status BrokerScanner::_line_split_to_values(const Slice& line) {
bool is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
if (!is_proto_format && !validate_utf8(line.data, line.size)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
Expand Down Expand Up @@ -546,6 +545,17 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
}
}

_success = true;
return Status::OK();
}

// Convert one row to this tuple
Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
RETURN_IF_ERROR(_line_split_to_values(line));
if (!_success) {
return Status::OK();
}

for (int i = 0; i < _split_values.size(); ++i) {
auto slot_desc = _src_slot_descs[i];
const Slice& value = _split_values[i];
Expand All @@ -560,11 +570,11 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
str_slot->len = value.size;
}

const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);
fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
}

_success = true;
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class BrokerScanner : public BaseScanner {
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
bool* fill_tuple) override;

Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) override {
return Status::NotSupported("Not Implemented get columns");
Status get_next(vectorized::Block* block, bool* eof) override {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
Expand All @@ -78,6 +78,8 @@ class BrokerScanner : public BaseScanner {

Status _line_to_src_tuple(const Slice& line);

Status _line_split_to_values(const Slice& line);

private:
Status open_file_reader();
Status create_decompressor(TFileFormatType::type type);
Expand Down
188 changes: 96 additions & 92 deletions be/src/vec/exec/vbroker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
return Status::OK();
}

std::shared_ptr<vectorized::Block> scanner_block;
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_runtime_state->is_cancelled() &&
_num_running_scanners > 0 && _block_queue.empty()) {
SCOPED_TIMER(_wait_scanner_timer);
_queue_reader_cond.wait_for(l, std::chrono::seconds(1));
}
if (!_process_status.ok()) {
// Some scanner process failed.
return _process_status;
const int batch_size = _runtime_state->batch_size();
while (true) {
std::shared_ptr<vectorized::Block> scanner_block;
{
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_runtime_state->is_cancelled() &&
_num_running_scanners > 0 && _block_queue.empty()) {
SCOPED_TIMER(_wait_scanner_timer);
_queue_reader_cond.wait_for(l, std::chrono::seconds(1));
}
if (!_process_status.ok()) {
// Some scanner process failed.
return _process_status;
}
if (_runtime_state->is_cancelled()) {
if (update_status(Status::Cancelled("Cancelled"))) {
_queue_writer_cond.notify_all();
}
return _process_status;
}
if (!_block_queue.empty()) {
scanner_block = _block_queue.front();
_block_queue.pop_front();
}
}
if (_runtime_state->is_cancelled()) {
if (update_status(Status::Cancelled("Cancelled"))) {
_queue_writer_cond.notify_all();

// All scanner has been finished, and all cached batch has been read
if (!scanner_block) {
if (_mutable_block && !_mutable_block->empty()) {
*block = _mutable_block->to_block();
reached_limit(block, eos);
LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
}
return _process_status;
_scan_finished.store(true);
*eos = true;
return Status::OK();
}
if (!_block_queue.empty()) {
scanner_block = _block_queue.front();
_block_queue.pop_front();
// notify one scanner
_queue_writer_cond.notify_one();

if (UNLIKELY(!_mutable_block)) {
_mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
}
}

// All scanner has been finished, and all cached batch has been read
if (scanner_block == nullptr) {
_scan_finished.store(true);
*eos = true;
return Status::OK();
if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
// merge scanner_block into _mutable_block
_mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
continue;
} else {
if (_mutable_block->empty()) {
// directly use scanner_block
*block = *scanner_block;
} else {
// copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
*block = _mutable_block->to_block();
_mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
_mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
}
break;
}
}

// notify one scanner
_queue_writer_cond.notify_one();

reached_limit(scanner_block.get(), eos);
*block = *scanner_block;

reached_limit(block, eos);
if (*eos) {
_scan_finished.store(true);
_queue_writer_cond.notify_all();
Expand All @@ -120,75 +146,53 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
RETURN_IF_ERROR(scanner->open());
bool scanner_eof = false;

const int batch_size = _runtime_state->batch_size();
size_t slot_num = _tuple_desc->slots().size();

while (!scanner_eof) {
std::shared_ptr<vectorized::Block> block(new vectorized::Block());
std::vector<vectorized::MutableColumnPtr> columns(slot_num);
for (int i = 0; i < slot_num; i++) {
columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
RETURN_IF_CANCELLED(_runtime_state);
// If we have finished all works
if (_scan_finished.load() || !_process_status.ok()) {
return Status::OK();
}

while (columns[0]->size() < batch_size && !scanner_eof) {
RETURN_IF_CANCELLED(_runtime_state);
// If we have finished all works
if (_scan_finished.load()) {
return Status::OK();
}

RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
if (scanner_eof) {
break;
}
std::shared_ptr<vectorized::Block> block(new vectorized::Block());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why here is shared_ptr ? not the unique_ptr?

RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
if (block->rows() == 0) {
continue;
}
auto old_rows = block->rows();
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));
counter->num_rows_unselected += old_rows - block->rows();
if (block->rows() == 0) {
continue;
}

if (!columns[0]->empty()) {
auto n_columns = 0;
for (const auto slot_desc : _tuple_desc->slots()) {
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

auto old_rows = block->rows();

RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
_tuple_desc->slots().size()));

counter->num_rows_unselected += old_rows - block->rows();

std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_scan_finished.load() &&
!_runtime_state->is_cancelled() &&
// stop pushing more batch if
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_block_queue.size() >= _max_buffered_batches ||
(mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
// Process already set failed, so we just return OK
if (!_process_status.ok()) {
return Status::OK();
}
// Scan already finished, just return
if (_scan_finished.load()) {
return Status::OK();
}
// Runtime state is canceled, just return cancel
if (_runtime_state->is_cancelled()) {
return Status::Cancelled("Cancelled");
}
// Queue size Must be smaller than _max_buffered_batches
_block_queue.push_back(block);

// Notify reader to
_queue_reader_cond.notify_one();
std::unique_lock<std::mutex> l(_batch_queue_lock);
while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
// stop pushing more batch if
// 1. too many batches in queue, or
// 2. at least one batch in queue and memory exceed limit.
(_block_queue.size() >= _max_buffered_batches ||
(mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
_queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
}
// Process already set failed, so we just return OK
if (!_process_status.ok()) {
return Status::OK();
}
// Scan already finished, just return
if (_scan_finished.load()) {
return Status::OK();
}
// Runtime state is canceled, just return cancel
if (_runtime_state->is_cancelled()) {
return Status::Cancelled("Cancelled");
}
// Queue size Must be smaller than _max_buffered_batches
_block_queue.push_back(block);

// Notify reader to
_queue_reader_cond.notify_one();
}
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/vbroker_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class VBrokerScanNode final : public BrokerScanNode {
Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* counter);

std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
std::unique_ptr<MutableBlock> _mutable_block;
};
} // namespace vectorized
} // namespace doris
Loading