Skip to content

Commit

Permalink
add vectorized vjson_scanner and apply vexpr
Browse files Browse the repository at this point in the history
  • Loading branch information
hucheng01 committed May 9, 2022
1 parent fa836c3 commit 3fbc11e
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 52 deletions.
31 changes: 28 additions & 3 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
}

Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
// filter src tuple by preceding filter first
auto old_rows = temp_block->rows();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = temp_block->rows();
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
_counter->num_rows_unselected += old_rows - temp_block->rows();
Expand All @@ -292,7 +292,7 @@ Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num)
}

Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
// Do vectorized expr here to speed up load
// Do vectorized expr here
Status status;
if (!_dest_vexpr_ctx.empty()) {
*output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
Expand All @@ -305,6 +305,31 @@ Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::B
return Status::OK();
}

// Builds the destination block from the source columns produced by a scanner.
//
// The columns are wrapped, in `_src_slot_descs` order, into a temporary block
// that is first passed through filter_block(). The filtered block is then
// either handed to `dest_block` as-is (when `_dest_vexpr_ctx` is empty) or
// used as the input of execute_exprs(), which writes `dest_block`.
//
// @param dest_block  receives the resulting block; left untouched when there
//                    is nothing to convert.
// @param columns     one column per source slot, in slot order; each consumed
//                    entry is handed over with std::move.
// @return Status::OK() on success (including the empty-input case), an error
//         status when fewer columns than source slots are supplied, or the
//         error reported by filter_block() / execute_exprs().
Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
                                    std::vector<vectorized::MutableColumnPtr>& columns) {
    // Nothing was materialized in this round: leave dest_block as it is.
    if (columns.empty() || columns[0]->size() == 0) {
        return Status::OK();
    }

    // Each source slot consumes exactly one column. Indexing past the end of
    // `columns` would be undefined behavior, so reject the mismatch up front.
    if (columns.size() < _src_slot_descs.size()) {
        return Status::InternalError("fill_dest_block: fewer columns than source slots");
    }

    // A stack-allocated block is enough here; it never outlives this call.
    vectorized::Block temp_block;
    size_t column_idx = 0;
    for (const auto& slot_desc : _src_slot_descs) {
        temp_block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[column_idx++]),
                                                            slot_desc->get_data_type_ptr(),
                                                            slot_desc->col_name()));
    }

    RETURN_IF_ERROR(BaseScanner::filter_block(&temp_block, _src_slot_descs.size()));

    if (_dest_vexpr_ctx.empty()) {
        // No destination expressions: the filtered source block is the result.
        // temp_block is not used afterwards, so move instead of copying.
        *dest_block = std::move(temp_block);
    } else {
        RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, &temp_block));
    }

    return Status::OK();
}

void BaseScanner::fill_slots_of_columns_from_path(
int start, const std::vector<std::string>& columns_from_path) {
// values of columns from path can not be null
Expand Down
9 changes: 3 additions & 6 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ class BaseScanner {
return Status::NotSupported("Not Implemented get block");
}

// Default implementation of the get_next overload that takes the output block
// by reference: it ignores its arguments and returns a NotSupported status
// with the message "Not Implemented get block".
virtual Status get_next(vectorized::Block& output_block, bool* eof) {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

Status fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns);

void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);

Expand Down Expand Up @@ -123,8 +122,6 @@ class BaseScanner {
// and will be converted to `_pre_filter_ctxs` when scanner is open.
const std::vector<TExpr> _pre_filter_texprs;
std::vector<ExprContext*> _pre_filter_ctxs;
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;

bool _strict_mode;

Expand Down
3 changes: 0 additions & 3 deletions be/src/vec/exec/vbroker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
return Status::OK();
}

// get block
RETURN_IF_ERROR(scanner->get_next(*(block.get()), &scanner_eof));

std::shared_ptr<vectorized::Block> block(new vectorized::Block());
RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
if (block->rows() == 0) {
Expand Down
42 changes: 7 additions & 35 deletions be/src/vec/exec/vjson_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,12 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile,
: JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_vjson_reader(nullptr) {}

// Destructor: calls close() when the scanner is destroyed.
VJsonScanner::~VJsonScanner() {
close();
}

// Opens the scanner by delegating to BaseScanner::open().
// Returns Status::OK() once the base call has gone through RETURN_IF_ERROR
// (presumably an early return of the base error status — confirm against the
// macro definition).
Status VJsonScanner::open() {
RETURN_IF_ERROR(BaseScanner::open());
return Status::OK();
}

// Closes the scanner by delegating to BaseScanner::close(); no additional
// work is done here.
void VJsonScanner::close() {
BaseScanner::close();
}
// Destructor with an empty body: performs no explicit work of its own.
VJsonScanner::~VJsonScanner() {}

Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) {
Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
SCOPED_TIMER(_read_timer);
Status status;
const int batch_size = _state->batch_size();
size_t slot_num = _src_slot_descs.size();
std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
std::vector<vectorized::MutableColumnPtr> columns(slot_num);
auto string_type = make_nullable(std::make_shared<DataTypeString>());
for (int i = 0; i < slot_num; i++) {
Expand Down Expand Up @@ -94,25 +81,10 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) {
continue;
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
}

if (columns[0]->size() > 0) {
auto n_columns = 0;
for (const auto slot_desc : _src_slot_descs) {
temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), slot_num));

if (_dest_vexpr_ctx.empty()) {
output_block = *(temp_block.get());
} else {
RETURN_IF_ERROR(BaseScanner::execute_exprs(&output_block, temp_block.get()));
}
}
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns));

*eof = _scanner_eof;
return Status::OK();
Expand Down Expand Up @@ -296,7 +268,7 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue,
} else { // not found
if (slot_desc->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
nullable_column->insert_data(nullptr, 0);
nullable_column->insert_default();
nullcount++;
} else {
RETURN_IF_ERROR(_append_error_msg(
Expand Down Expand Up @@ -360,7 +332,7 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v
case rapidjson::Type::kNullType:
if (slot_desc->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
nullable_column->insert_data(nullptr, 0);
nullable_column->insert_default();
} else {
RETURN_IF_ERROR(_append_error_msg(
*value, "Json value is null, but the column `{}` is not nullable.",
Expand Down Expand Up @@ -463,7 +435,7 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
// not match in jsondata.
if (slot_descs[i]->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
nullable_column->insert_data(nullptr, 0);
nullable_column->insert_default();
nullcount++;
} else {
RETURN_IF_ERROR(_append_error_msg(
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/exec/vjson_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ class VJsonScanner : public JsonScanner {

~VJsonScanner();

Status open() override;

void close() override;

Status get_next(vectorized::Block& output_block, bool* eof) override;
Status get_next(vectorized::Block* output_block, bool* eof) override;

private:
Status open_vjson_reader();
Expand Down
166 changes: 166 additions & 0 deletions be/test/vec/exec/vjson_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,5 +592,171 @@ TEST_F(VJsonScannerTest, simple_array_json) {
ASSERT_TRUE(eof);
}

// Scans test_simple2.json through a VBrokerScanNode with explicit jsonpaths
// (strip_outer_array enabled, read_json_by_line left unset).
// Expects a first block of 2 rows x 6 columns whose first three columns hold
// the expected string values, then an empty block together with eof.
TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) {
    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
    scan_node.init(_tnode);
    auto status = scan_node.prepare(&_runtime_state);
    EXPECT_TRUE(status.ok());

    // set scan range
    std::vector<TScanRangeParams> scan_ranges;
    {
        TScanRangeParams scan_range_params;

        TBrokerScanRange broker_scan_range;
        broker_scan_range.params = _params;
        TBrokerRangeDesc range;
        range.start_offset = 0;
        range.size = -1;
        range.format_type = TFileFormatType::FORMAT_JSON;
        range.strip_outer_array = true;
        range.__isset.strip_outer_array = true;
        range.splittable = true;
        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
        range.file_type = TFileType::FILE_LOCAL;
        range.jsonpaths =
                "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", "
                "\"$.decimal\"]";
        range.__isset.jsonpaths = true;
        broker_scan_range.ranges.push_back(range);
        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
        scan_ranges.push_back(scan_range_params);
    }

    scan_node.set_scan_ranges(scan_ranges);
    status = scan_node.open(&_runtime_state);
    EXPECT_TRUE(status.ok());

    // First batch: both records of the file, one column per jsonpath.
    bool eof = false;
    vectorized::Block block;
    status = scan_node.get_next(&_runtime_state, &block, &eof);
    EXPECT_TRUE(status.ok());
    EXPECT_EQ(2, block.rows());
    EXPECT_EQ(6, block.columns());

    auto columns = block.get_columns_with_type_and_name();
    ASSERT_EQ(columns.size(), 6);
    ASSERT_EQ(columns[0].to_string(0), "reference");
    ASSERT_EQ(columns[0].to_string(1), "fiction");
    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");

    // Second batch: the input is exhausted. The returned status is checked as
    // well, so a failure while draining the node does not go unnoticed.
    block.clear();
    status = scan_node.get_next(&_runtime_state, &block, &eof);
    EXPECT_TRUE(status.ok());
    ASSERT_EQ(0, block.rows());
    ASSERT_TRUE(eof);
}

// Same scenario as the jsonpaths file-reader case, but with read_json_by_line
// enabled on the range. Expects a first block of 2 rows x 6 columns whose
// first three columns hold the expected string values, then an empty block
// together with eof.
TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) {
    VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
    scan_node.init(_tnode);
    auto status = scan_node.prepare(&_runtime_state);
    EXPECT_TRUE(status.ok());

    // set scan range
    std::vector<TScanRangeParams> scan_ranges;
    {
        TScanRangeParams scan_range_params;

        TBrokerScanRange broker_scan_range;
        broker_scan_range.params = _params;
        TBrokerRangeDesc range;
        range.start_offset = 0;
        range.size = -1;
        range.format_type = TFileFormatType::FORMAT_JSON;
        range.splittable = true;
        range.strip_outer_array = true;
        range.__isset.strip_outer_array = true;
        range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
        range.file_type = TFileType::FILE_LOCAL;
        range.jsonpaths =
                "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", "
                "\"$.decimal\"]";
        range.__isset.jsonpaths = true;
        // The only difference from the file-reader case: read line by line.
        range.read_json_by_line = true;
        range.__isset.read_json_by_line = true;
        broker_scan_range.ranges.push_back(range);
        scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range);
        scan_ranges.push_back(scan_range_params);
    }

    scan_node.set_scan_ranges(scan_ranges);
    status = scan_node.open(&_runtime_state);
    EXPECT_TRUE(status.ok());

    // First batch: both records of the file, one column per jsonpath.
    bool eof = false;
    vectorized::Block block;
    status = scan_node.get_next(&_runtime_state, &block, &eof);
    EXPECT_TRUE(status.ok());
    EXPECT_EQ(2, block.rows());
    EXPECT_EQ(6, block.columns());

    auto columns = block.get_columns_with_type_and_name();
    ASSERT_EQ(columns.size(), 6);
    ASSERT_EQ(columns[0].to_string(0), "reference");
    ASSERT_EQ(columns[0].to_string(1), "fiction");
    ASSERT_EQ(columns[1].to_string(0), "NigelRees");
    ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh");
    ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury");
    ASSERT_EQ(columns[2].to_string(1), "SwordofHonour");

    // Second batch: the input is exhausted. The returned status is checked as
    // well, so a failure while draining the node does not go unnoticed.
    block.clear();
    status = scan_node.get_next(&_runtime_state, &block, &eof);
    EXPECT_TRUE(status.ok());
    ASSERT_EQ(0, block.rows());
    ASSERT_TRUE(eof);
}

// Scans test_simple2.json with jsonpaths ($.k1 .. $.k6) that match nothing in
// the data. The block is still expected to be 2 rows x 6 columns, and the
// first three columns must render every row as "\N".
TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) {
    VBrokerScanNode node(&_obj_pool, _tnode, *_desc_tbl);
    node.init(_tnode);
    auto st = node.prepare(&_runtime_state);
    EXPECT_TRUE(st.ok());

    // Describe the single JSON range to be scanned.
    TBrokerRangeDesc range_desc;
    range_desc.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
    range_desc.file_type = TFileType::FILE_LOCAL;
    range_desc.format_type = TFileFormatType::FORMAT_JSON;
    range_desc.start_offset = 0;
    range_desc.size = -1;
    range_desc.splittable = true;
    range_desc.strip_outer_array = true;
    range_desc.__isset.strip_outer_array = true;
    range_desc.jsonpaths = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\", \"$.k5\", \"$.k6\"]";
    range_desc.__isset.jsonpaths = true;

    // Wrap the range into the scan-range parameters handed to the node.
    TBrokerScanRange broker_range;
    broker_range.params = _params;
    broker_range.ranges.push_back(range_desc);

    TScanRangeParams range_params;
    range_params.scan_range.__set_broker_scan_range(broker_range);

    std::vector<TScanRangeParams> ranges_to_scan;
    ranges_to_scan.push_back(range_params);

    node.set_scan_ranges(ranges_to_scan);
    st = node.open(&_runtime_state);
    EXPECT_TRUE(st.ok());

    bool reached_eof = false;
    vectorized::Block result;
    st = node.get_next(&_runtime_state, &result, &reached_eof);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(2, result.rows());
    EXPECT_EQ(6, result.columns());

    auto columns = result.get_columns_with_type_and_name();
    ASSERT_EQ(columns.size(), 6);
    // Columns 0..2, rows 0..1, checked in column-major order.
    for (size_t col = 0; col < 3; ++col) {
        for (size_t row = 0; row < 2; ++row) {
            ASSERT_EQ(columns[col].to_string(row), "\\N");
        }
    }
    result.clear();
}

} // namespace vectorized
} // namespace doris

0 comments on commit 3fbc11e

Please sign in to comment.