Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]add vectorized vjson_scanner #9311

Merged
merged 9 commits into from
May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ Status BaseScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
}

if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
Expand Down Expand Up @@ -279,6 +278,58 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
return Status::OK();
}

Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = temp_block->rows();
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
_counter->num_rows_unselected += old_rows - temp_block->rows();
}
}
return Status::OK();
}

Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
// Do vectorized expr here
Status status;
if (!_dest_vexpr_ctx.empty()) {
*output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
_dest_vexpr_ctx, *temp_block, status);
if (UNLIKELY(output_block->rows() == 0)) {
return status;
}
}

return Status::OK();
}

Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns) {
if (columns.empty() || columns[0]->size() == 0) {
return Status::OK();
}

std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
auto n_columns = 0;
for (const auto slot_desc : _src_slot_descs) {
temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size()));

if (_dest_vexpr_ctx.empty()) {
*dest_block = *temp_block;
} else {
RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
}

return Status::OK();
}

void BaseScanner::fill_slots_of_columns_from_path(
int start, const std::vector<std::string>& columns_from_path) {
// values of columns from path can not be null
Expand Down
8 changes: 8 additions & 0 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "runtime/tuple.h"
#include "util/runtime_profile.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris {

Expand Down Expand Up @@ -75,11 +76,18 @@ class BaseScanner {
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

Status fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns);

void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);

void free_expr_local_allocations();

Status filter_block(vectorized::Block* temp_block, size_t slot_num);

Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);

protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
Expand Down
13 changes: 10 additions & 3 deletions be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "vec/exec/vbroker_scanner.h"
#include "vec/exec/vjson_scanner.h"

namespace doris {

Expand Down Expand Up @@ -234,9 +235,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
counter);
break;
case TFileFormatType::FORMAT_JSON:
scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs,
counter);
if (_vectorized) {
scan = new vectorized::VJsonScanner(
_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges,
scan_range.broker_addresses, _pre_filter_texprs, counter);
} else {
scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
}
break;
default:
if (_vectorized) {
Expand Down
61 changes: 39 additions & 22 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,17 @@ Status JsonScanner::open_next_reader() {
_scanner_eof = true;
return Status::OK();
}
RETURN_IF_ERROR(open_based_reader());
RETURN_IF_ERROR(open_json_reader());
_next_range++;
return Status::OK();
}

Status JsonScanner::open_based_reader() {
RETURN_IF_ERROR(open_file_reader());
if (_read_json_by_line) {
RETURN_IF_ERROR(open_line_reader());
}
RETURN_IF_ERROR(open_json_reader());
_next_range++;

return Status::OK();
}

Expand Down Expand Up @@ -215,6 +218,25 @@ Status JsonScanner::open_json_reader() {
bool num_as_string = false;
bool fuzzy_parse = false;

RETURN_IF_ERROR(
get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse));
if (_read_json_by_line) {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
} else {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, _cur_file_reader);
}

RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
return Status::OK();
}

Status JsonScanner::get_range_params(std::string& jsonpath, std::string& json_root,
bool& strip_outer_array, bool& num_as_string,
bool& fuzzy_parse) {
const TBrokerRangeDesc& range = _ranges[_next_range];

if (range.__isset.jsonpaths) {
Expand All @@ -232,17 +254,6 @@ Status JsonScanner::open_json_reader() {
if (range.__isset.fuzzy_parse) {
fuzzy_parse = range.fuzzy_parse;
}
if (_read_json_by_line) {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader);
} else {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
fuzzy_parse, &_scanner_eof, _cur_file_reader);
}

RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
return Status::OK();
}

Expand Down Expand Up @@ -308,14 +319,8 @@ JsonReader::~JsonReader() {
}

Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) {
// parse jsonpath
if (!jsonpath.empty()) {
Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths);
RETURN_IF_ERROR(st);
}
if (!json_root.empty()) {
JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
}
// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root));

//improve performance
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
Expand All @@ -330,6 +335,18 @@ Status JsonReader::init(const std::string& jsonpath, const std::string& json_roo
return Status::OK();
}

Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath,
const std::string& json_root) {
// parse jsonpath
if (!jsonpath.empty()) {
RETURN_IF_ERROR(_generate_json_paths(jsonpath, &_parsed_jsonpaths));
}
if (!json_root.empty()) {
JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
}
return Status::OK();
}

Status JsonReader::_generate_json_paths(const std::string& jsonpath,
std::vector<std::vector<JsonPath>>* vect) {
rapidjson::Document jsonpaths_doc;
Expand Down
13 changes: 9 additions & 4 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ class JsonScanner : public BaseScanner {
// Close this scanner
void close() override;

private:
protected:
Status open_file_reader();
Status open_line_reader();
Status open_json_reader();
Status open_next_reader();

private:
Status open_based_reader();
Status get_range_params(std::string& jsonpath, std::string& json_root, bool& strip_outer_array,
bool& num_as_string, bool& fuzzy_parse);

protected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can delete some redundant protected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will re-check it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can delete some redundant protected

In order to distinguish the function and variables , I prefer to use 2 'protected' key words.

const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

Expand Down Expand Up @@ -129,7 +133,7 @@ class JsonReader {
Status read_json_row(Tuple* tuple, const std::vector<SlotDescriptor*>& slot_descs,
MemPool* tuple_pool, bool* is_empty_row, bool* eof);

private:
protected:
Status (JsonReader::*_handle_json_callback)(Tuple* tuple,
const std::vector<SlotDescriptor*>& slot_descs,
MemPool* tuple_pool, bool* is_empty_row, bool* eof);
Expand Down Expand Up @@ -158,8 +162,9 @@ class JsonReader {
void _close();
Status _generate_json_paths(const std::string& jsonpath,
std::vector<std::vector<JsonPath>>* vect);
Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root);

private:
protected:
int _next_line;
int _total_lines;
RuntimeState* _state;
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ set(VEC_FILES
exec/vtable_function_node.cpp
exec/vbroker_scan_node.cpp
exec/vbroker_scanner.cpp
exec/vjson_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
Expand Down
Loading