Skip to content

Commit

Permalink
[Vectorized](stream-load-vec) Support stream load in vectorized engine (
Browse files Browse the repository at this point in the history
apache#8709) (apache#9280)

Implement vectorized stream load.
Added fe configuration option `enable_vectorized_load` to enable vectorized stream load.

    Co-authored-by: tengjp@outlook.com
    Co-authored-by: mrhhsg@gmail.com
    Co-authored-by: minghong.zhou@163.com
    Co-authored-by: HappenLee <happenlee@hotmail.com>
    Co-authored-by: zhoubintao <35688959+zbtzbtzbt@users.noreply.github.com>
  • Loading branch information
HappenLee authored and minghong.zhou committed May 6, 2022
1 parent d1da5cb commit 1618d7f
Show file tree
Hide file tree
Showing 65 changed files with 5,146 additions and 620 deletions.
10 changes: 10 additions & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class MemTracker;
class RuntimeState;
class ExprContext;

namespace vectorized {
class IColumn;
using MutableColumnPtr = IColumn::MutablePtr;
}

// The counter will be passed to each scanner.
// Note that this struct is not thread safe.
// So if we support concurrent scan in the future, we need to modify this struct.
Expand All @@ -56,6 +61,11 @@ class BaseScanner {
// Get next tuple
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool *fill_tuple) = 0;

// Get next block
virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
Expand Down
14 changes: 10 additions & 4 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <sstream>

#include "common/object_pool.h"
#include "exec/broker_scanner.h"
#include "vec/exec/vbroker_scanner.h"
#include "exec/json_scanner.h"
#include "exec/orc_scanner.h"
#include "exec/parquet_scanner.h"
Expand Down Expand Up @@ -238,9 +238,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
_pre_filter_texprs, counter);
break;
default:
scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
if (_vectorized) {
scan = new vectorized::VBrokerScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
} else {
scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
}
}
std::unique_ptr<BaseScanner> scanner(scan);
return scanner;
Expand Down
13 changes: 7 additions & 6 deletions be/src/exec/broker_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class BrokerScanNode : public ScanNode {
// Write debug string of this into out.
virtual void debug_string(int indentation_level, std::stringstream* out) const override;

private:
// Update process status to one failed status,
// NOTE: Must hold the mutex of this scan node
bool update_status(const Status& new_status) {
Expand All @@ -76,8 +75,12 @@ class BrokerScanNode : public ScanNode {
return false;
}

std::unique_ptr<BaseScanner> create_scanner(const TBrokerScanRange& scan_range,
ScannerCounter* counter);

private:
// Create scanners to do scan job
Status start_scanners();
virtual Status start_scanners();

// One scanner worker, This scanner will handle 'length' ranges start from start_idx
void scanner_worker(int start_idx, int length);
Expand All @@ -86,10 +89,8 @@ class BrokerScanNode : public ScanNode {
Status scanner_scan(const TBrokerScanRange& scan_range,
const std::vector<ExprContext*>& conjunct_ctxs, ScannerCounter* counter);

std::unique_ptr<BaseScanner> create_scanner(const TBrokerScanRange& scan_range,
ScannerCounter* counter);

private:
protected:
bool _vectorized = false;
TupleId _tuple_id;
RuntimeState* _runtime_state;
TupleDescriptor* _tuple_desc;
Expand Down
8 changes: 1 addition & 7 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
#include "exec/plain_binary_line_reader.h"
#include "exec/plain_text_line_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "exprs/expr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -82,10 +80,6 @@ BrokerScanner::~BrokerScanner() {

Status BrokerScanner::open() {
RETURN_IF_ERROR(BaseScanner::open()); // base default function
_text_converter.reset(new (std::nothrow) TextConverter('\\'));
if (_text_converter == nullptr) {
return Status::InternalError("No memory error.");
}
return Status::OK();
}

Expand Down Expand Up @@ -272,7 +266,7 @@ Status BrokerScanner::open_line_reader() {
return Status::InternalError(ss.str());
}
size += 1;
// not first range will always skip one line
// not first range will always skip one line
_skip_lines = 1;
}

Expand Down
16 changes: 8 additions & 8 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class BrokerScanner : public BaseScanner {
const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
~BrokerScanner();
virtual ~BrokerScanner();

// Open this scanner, will initialize information need to
Status open() override;
Expand All @@ -67,12 +67,16 @@ class BrokerScanner : public BaseScanner {
// Close this scanner
void close() override;

protected:
// Read next buffer from reader
Status open_next_reader();

Status _line_to_src_tuple(const Slice& line);

private:
Status open_file_reader();
Status create_decompressor(TFileFormatType::type type);
Status open_line_reader();
// Read next buffer from reader
Status open_next_reader();

// Split one text line to values
void split_line(const Slice& line);
Expand All @@ -88,14 +92,10 @@ class BrokerScanner : public BaseScanner {
// output is tuple
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);

Status _line_to_src_tuple(const Slice& line);

private:
protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

std::unique_ptr<TextConverter> _text_converter;

std::string _value_separator;
std::string _line_delimiter;
TFileFormatType::type _file_format_type;
Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
#include "vec/exec/vunion_node.h"
#include "vec/exec/meta_scan_node.h"
#include "vec/exec/decode_node.h"
#include "vec/exec/vbroker_scan_node.h"
#include "vec/exprs/vexpr.h"

namespace doris {
Expand Down Expand Up @@ -394,6 +395,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
case TPlanNodeType::SELECT_NODE:
case TPlanNodeType::REPEAT_NODE:
case TPlanNodeType::TABLE_FUNCTION_NODE:
case TPlanNodeType::BROKER_SCAN_NODE:
break;
default: {
const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
Expand Down Expand Up @@ -557,7 +559,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

case TPlanNodeType::BROKER_SCAN_NODE:
*node = pool->add(new BrokerScanNode(pool, tnode, descs));
if (state->enable_vectorized_exec()) {
*node = pool->add(new vectorized::VBrokerScanNode(pool, tnode, descs));
} else {
*node = pool->add(new BrokerScanNode(pool, tnode, descs));
}
return Status::OK();

case TPlanNodeType::REPEAT_NODE:
Expand Down
Loading

0 comments on commit 1618d7f

Please sign in to comment.