Skip to content

Commit

Permalink
[Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/…
Browse files Browse the repository at this point in the history
…vparquet/vbroker scanner

1. fix bug of vjson scanner not support `range_from_file_path`
2. fix bug of vjson/vbrocker scanner core dump by src/dest slot nullable is different
3. fix bug of vparquest filter_block reference of column in not 1
4. refactor code to simple all the code
  • Loading branch information
lihaopeng committed May 19, 2022
1 parent 1cc9653 commit b3ca1e6
Show file tree
Hide file tree
Showing 22 changed files with 268 additions and 358 deletions.
184 changes: 153 additions & 31 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {

BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: _state(state),
_params(params),
_ranges(ranges),
_broker_addresses(broker_addresses),
_next_range(0),
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
Expand Down Expand Up @@ -71,6 +77,21 @@ Status BaseScanner::open() {
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
_materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");

const auto& range = _ranges[0];
_num_of_columns_from_file = range.__isset.num_of_columns_from_file
? implicit_cast<int>(range.num_of_columns_from_file)
: implicit_cast<int>(_src_slot_descs.size());

// check consistency
if (range.__isset.num_of_columns_from_file) {
int size = range.columns_from_path.size();
for (const auto& r : _ranges) {
if (r.columns_from_path.size() != size) {
return Status::InternalError("ranges have different number of columns.");
}
}
}
return Status::OK();
}

Expand Down Expand Up @@ -272,59 +293,136 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
}
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
continue;
}
_success = true;
return Status::OK();
}

Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = temp_block->rows();
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
_counter->num_rows_unselected += old_rows - temp_block->rows();
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
}
}
return Status::OK();
}

Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// Do vectorized expr here
Status status;
if (!_dest_vexpr_ctx.empty()) {
*output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
_dest_vexpr_ctx, *temp_block, status);
if (UNLIKELY(output_block->rows() == 0)) {
return status;
int ctx_idx = 0;
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();

for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;

auto* ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = 0;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
auto column_ptr = _src_block.get_by_position(result_column_id).column;

// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
if (LIKELY(column_ptr->is_nullable())) {
auto nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block.get_by_position(dest_index).column->is_null_at(i)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
// Type of the slot is must be Varchar in _temp_block.
auto raw_value =
_src_block.get_by_position(ctx_idx).column->get_data_at(
i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) value is incorrect while strict "
"mode is {}, "
"src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
} else if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) values is null while columns is not "
"nullable",
slot_desc->col_name());
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
}
}
}
if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
} else if (slot_desc->is_nullable()) {
column_ptr = vectorized::make_nullable(column_ptr);
}
dest_block->insert(vectorized::ColumnWithTypeAndName(
std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

// after do the dest block insert operation, clear _src_block to remove the reference of origin column
_src_block.clear();

size_t dest_size = dest_block->columns();
// do filter
dest_block->insert(vectorized::ColumnWithTypeAndName(
std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
_counter->num_rows_filtered += rows - dest_block->rows();

return Status::OK();
}

Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns) {
if (columns.empty() || columns[0]->size() == 0) {
return Status::OK();
}

std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
auto n_columns = 0;
for (const auto slot_desc : _src_slot_descs) {
temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
// TODO: opt the reuse of src_block or dest_block column. some case we have to
// shallow copy the column of src_block to dest block
Status BaseScanner::_init_src_block() {
DCHECK(_src_block.columns() == 0);
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
auto data_type = slot_desc->get_data_type_ptr();
_src_block.insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size()));
return Status::OK();
}

if (_dest_vexpr_ctx.empty()) {
*dest_block = *temp_block;
} else {
RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
*eof = _scanner_eof;
_fill_columns_from_path();
if (LIKELY(_src_block.rows() > 0)) {
RETURN_IF_ERROR(BaseScanner::_filter_src_block());
RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block));
}

return Status::OK();
Expand All @@ -337,7 +435,7 @@ void BaseScanner::fill_slots_of_columns_from_path(
auto slot_desc = _src_slot_descs.at(i + start);
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
auto* str_slot = reinterpret_cast<StringValue*>(slot);
const std::string& column_from_path = columns_from_path[i];
str_slot->ptr = const_cast<char*>(column_from_path.c_str());
str_slot->len = column_from_path.size();
Expand All @@ -360,4 +458,28 @@ void BaseScanner::close() {
}
}

void BaseScanner::_fill_columns_from_path() {
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
size_t start = range.num_of_columns_from_file;
size_t rows = _src_block.rows();

for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
auto slot_desc = _src_slot_descs.at(i + start);
if (slot_desc == nullptr) continue;
auto is_nullable = slot_desc->is_nullable();
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
is_nullable);
auto data_column = data_type->create_column();
const std::string& column_from_path = range.columns_from_path[i];
for (size_t j = 0; j < rows; ++j) {
data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
column_from_path.size());
}
_src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type,
slot_desc->col_name()));
}
}
}

} // namespace doris
30 changes: 20 additions & 10 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ struct ScannerCounter {
class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);

virtual ~BaseScanner() {
Expr::close(_dest_expr_ctx, _state);
if (_state->enable_vectorized_exec()) {
Expand All @@ -77,21 +80,22 @@ class BaseScanner {
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

Status fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns);

void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);

void free_expr_local_allocations();

Status filter_block(vectorized::Block* temp_block, size_t slot_num);

Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);

protected:
Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
virtual Status _init_src_block();

RuntimeState* _state;
const TBrokerScanRangeParams& _params;

//const TBrokerScanRangeParams& _params;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
int _next_range;
// used for process stat
ScannerCounter* _counter;

Expand All @@ -109,9 +113,6 @@ class BaseScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// for vectorized
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
Expand All @@ -135,7 +136,16 @@ class BaseScanner {
bool _success = false;
bool _scanner_eof = false;

// for vectorized load
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
vectorized::Block _src_block;
int _num_of_columns_from_file;

private:
Status _filter_src_block();
void _fill_columns_from_path();
Status _materialize_dest_block(vectorized::Block* output_block);
Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};

Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
_next_range(0),
_cur_line_reader_eof(false),
_skip_lines(0) {
if (params.__isset.column_separator_length && params.column_separator_length > 1) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ class BrokerScanner : public BaseScanner {
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);

protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

std::string _value_separator;
std::string _line_delimiter;
TFileFormatType::type _file_format_type;
Expand All @@ -113,7 +110,6 @@ class BrokerScanner : public BaseScanner {
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
Decompressor* _cur_decompressor;
int _next_range;
bool _cur_line_reader_eof;

// When we fetch range start from 0, header_type="csv_with_names" skip first line
Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_json_reader(nullptr),
_next_range(0),
_cur_reader_eof(false),
_read_json_by_line(false) {
if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class JsonScanner : public BaseScanner {
bool& num_as_string, bool& fuzzy_parse);

protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

std::string _jsonpath;
std::string _jsonpath_file;

Expand All @@ -91,7 +88,6 @@ class JsonScanner : public BaseScanner {
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
JsonReader* _cur_json_reader;
int _next_range;
bool _cur_reader_eof;
bool _read_json_by_line;

Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/orc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,8 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
// _splittable(params.splittable),
_next_range(0),
_cur_file_eof(true),
_total_groups(0),
_current_group(0),
Expand Down
Loading

0 comments on commit b3ca1e6

Please sign in to comment.