From 8698554cf42d8d885e0010a7cb60255978489829 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Tue, 26 Apr 2022 20:53:18 +0800 Subject: [PATCH 1/9] add vectorized vjson_scanner --- be/src/exec/base_scanner.cpp | 1 - be/src/exec/base_scanner.h | 7 + be/src/exec/broker_scan_node.cpp | 2 + be/src/exec/json_scanner.h | 15 +- be/src/vec/CMakeLists.txt | 1 + be/src/vec/exec/vbroker_scan_node.cpp | 3 + be/src/vec/exec/vjson_scanner.cpp | 729 ++++++++++++++++++++++++++ be/src/vec/exec/vjson_scanner.h | 158 ++++++ 8 files changed, 912 insertions(+), 4 deletions(-) create mode 100644 be/src/vec/exec/vjson_scanner.cpp create mode 100644 be/src/vec/exec/vjson_scanner.h diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 8621cf75f83de7..d63d8addd69767 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -156,7 +156,6 @@ Status BaseScanner::init_expr_ctxes() { RETURN_IF_ERROR(ctx->open(_state)); _dest_expr_ctx.emplace_back(ctx); } - if (has_slot_id_map) { auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 9c4179874e0923..1c98548fc68a57 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -21,6 +21,7 @@ #include "exprs/expr.h" #include "runtime/tuple.h" #include "util/runtime_profile.h" +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -70,6 +71,10 @@ class BaseScanner { virtual Status get_next(vectorized::Block* block, bool* eof) { return Status::NotSupported("Not Implemented get block"); } + + virtual Status get_next(vectorized::Block& output_block, bool* eof) { + return Status::NotSupported("Not Implemented get block"); + } // Close this scanner virtual void close() = 0; @@ -112,6 +117,8 @@ class BaseScanner { // and will be converted to `_pre_filter_ctxs` when scanner is open. const std::vector _pre_filter_texprs; std::vector _pre_filter_ctxs; + std::vector _dest_vexpr_ctx; + std::vector _vpre_filter_ctxs; bool _strict_mode; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 513e653fb760a1..504e75d916aa39 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -21,6 +21,8 @@ #include #include "common/object_pool.h" +#include "vec/exec/vbroker_scanner.h" +#include "vec/exec/vjson_scanner.h" #include "exec/json_scanner.h" #include "exec/orc_scanner.h" #include "exec/parquet_scanner.h" diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index b12c96f3961976..7db81ec0fdb751 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -67,11 +67,17 @@ class JsonScanner : public BaseScanner { // Close this scanner void close() override; -private: +protected: Status open_file_reader(); Status open_line_reader(); Status open_json_reader(); Status open_next_reader(); + + bool get_cur_reader_eof() { return _cur_reader_eof; } + bool get_read_json_by_line() { return _read_json_by_line; } + bool get_skip_next_line() { return _skip_next_line; } + FileReader* get_cur_file_reader() { return _cur_file_reader; } + LineReader* get_cur_line_reader() { return _cur_line_reader; } private: const std::vector& _ranges; @@ -129,7 +135,7 @@ class JsonReader { Status read_json_row(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* is_empty_row, bool* eof); -private: +protected: Status (JsonReader::*_handle_json_callback)(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* is_empty_row, bool* eof); @@ -158,7 +164,10 @@ class JsonReader { void _close(); Status _generate_json_paths(const std::string& jsonpath, std::vector>* vect); - + + rapidjson::Value* get_json_doc() { return _json_doc; } + rapidjson::Document* get_origin_json_doc() { return &_origin_json_doc; } + private: int _next_line; int _total_lines; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 7555e9d0ca9222..22fa489f463dae 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -101,6 +101,7 @@ set(VEC_FILES exec/vtable_function_node.cpp exec/vbroker_scan_node.cpp exec/vbroker_scanner.cpp + exec/vjson_scanner.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index 0386eb6a100076..1299484bce2893 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -152,6 +152,9 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner if (_scan_finished.load() || !_process_status.ok()) { return Status::OK(); } + + // get block + RETURN_IF_ERROR(scanner->get_next(*(block.get()), &scanner_eof)); std::shared_ptr block(new vectorized::Block()); RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp new file mode 100644 index 00000000000000..5eb16e76d399b0 --- /dev/null +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -0,0 +1,729 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vjson_scanner.h" + +#include +#include + +#include "env/env.h" +#include "exec/broker_reader.h" +#include "exec/buffered_reader.h" +#include "exec/local_file_reader.h" +#include "exec/plain_text_line_reader.h" +#include "exec/s3_reader.h" +#include "exprs/expr.h" +#include "exprs/json_functions.h" +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/time.h" + +namespace doris::vectorized { + +VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_vjson_reader(nullptr), + _next_range(0), + _cur_reader_eof(false), + _read_json_by_line(false) { +} + +VJsonScanner::~VJsonScanner() { + close(); +} + +Status VJsonScanner::open() { + RETURN_IF_ERROR(BaseScanner::open()); + return Status::OK(); +} + +void VJsonScanner::close() { + BaseScanner::close(); + if (_cur_vjson_reader != nullptr) { + delete _cur_vjson_reader; + _cur_vjson_reader = nullptr; + } +} + +Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { + SCOPED_TIMER(_read_timer); + Status status = Status::OK(); + const int batch_size = _state->batch_size(); + size_t slot_num = _src_slot_descs.size(); + std::shared_ptr temp_block(new vectorized::Block()); + std::vector columns(slot_num); + auto string_type = make_nullable(std::make_shared()); + for (int i = 0; i < slot_num; i++) { + columns[i] = string_type->create_column(); + } + + // Get one line + while (columns[0]->size() < batch_size && !_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_reader_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + break; + } + } + + if (_read_json_by_line && _skip_next_line) { + size_t size = 0; + const uint8_t* line_ptr = nullptr; + RETURN_IF_ERROR(_cur_line_reader->read_line(&line_ptr, &size, &_cur_reader_eof)); + _skip_next_line = false; + continue; + } + + bool is_empty_row = false; + RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, + &is_empty_row, &_cur_reader_eof)); + if (is_empty_row) { + // Read empty row, just continue + continue; + } + COUNTER_UPDATE(_rows_read_counter, 1); + SCOPED_TIMER(_materialize_timer); + } + + if (columns[0]->size() > 0) { + if (!_dest_vexpr_ctx.empty()) { + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + // filter src tuple by preceding filter first + if (!_vpre_filter_ctxs.empty()) { + for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + RETURN_IF_ERROR(VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); + } + } + + // Do vectorized expr here to speed up load + output_block = VExprContext::get_output_block_after_execute_exprs(_dest_vexpr_ctx, + *(temp_block.get()), status); + if (UNLIKELY(output_block.rows() == 0)) { + return status; + } + } else { + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + output_block.insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + // filter src tuple by preceding filter first + if (!_vpre_filter_ctxs.empty()) { + for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + RETURN_IF_ERROR(VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); + } + } + } + } + + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status VJsonScanner::open_next_reader() { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + + // init the file reader + RETURN_IF_ERROR(JsonScanner::open_file_reader()); + _cur_file_reader = JsonScanner::get_cur_file_reader(); + _cur_reader_eof = JsonScanner::get_cur_reader_eof(); + _read_json_by_line = JsonScanner::get_read_json_by_line(); + + // init line reader + if (_read_json_by_line) { + RETURN_IF_ERROR(JsonScanner::open_line_reader()); + _cur_line_reader = JsonScanner::get_cur_line_reader(); + _cur_reader_eof = JsonScanner::get_cur_reader_eof(); + _skip_next_line = JsonScanner::get_skip_next_line(); + } + + RETURN_IF_ERROR(open_vjson_reader()); + _next_range++; + + return Status::OK(); +} + +Status VJsonScanner::open_vjson_reader() { + if (_cur_vjson_reader != nullptr) { + delete _cur_vjson_reader; + _cur_vjson_reader = nullptr; + } + std::string json_root = ""; + std::string jsonpath = ""; + bool strip_outer_array = false; + bool num_as_string = false; + bool fuzzy_parse = false; + + const TBrokerRangeDesc& range = _ranges[_next_range]; + + if (range.__isset.jsonpaths) { + jsonpath = range.jsonpaths; + } + if (range.__isset.json_root) { + json_root = range.json_root; + } + if (range.__isset.strip_outer_array) { + strip_outer_array = range.strip_outer_array; + } + if (range.__isset.num_as_string) { + num_as_string = range.num_as_string; + } + if (range.__isset.fuzzy_parse) { + fuzzy_parse = range.fuzzy_parse; + } + + if (_read_json_by_line) { + _cur_vjson_reader = new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); + } else { + _cur_vjson_reader = new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, _cur_file_reader); + } + + RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); + return Status::OK(); +} + +VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, + bool strip_outer_array, bool num_as_string,bool fuzzy_parse, + bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) + : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, + scanner_eof, file_reader, line_reader), + _vhandle_json_callback(nullptr), + _next_line(0), + _total_lines(0), + _state(state), + _counter(counter), + _profile(profile), + _strip_outer_array(strip_outer_array), + _fuzzy_parse(fuzzy_parse), + _json_doc(nullptr), + _scanner_eof(scanner_eof) { +} + +VJsonReader::~VJsonReader() { + +} + +Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) { + // parse jsonpath + if (!jsonpath.empty()) { + Status st = JsonReader::_generate_json_paths(jsonpath, &_parsed_jsonpaths); + RETURN_IF_ERROR(st); + } + if (!json_root.empty()) { + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + + //improve performance + if (_parsed_jsonpaths.empty()) { // input is a simple json-string + _vhandle_json_callback = &VJsonReader::_vhandle_simple_json; + } else { // input is a complex json-string and a json-path + if (_strip_outer_array) { + _vhandle_json_callback = &VJsonReader::_vhandle_flat_array_complex_json; + } else { + _vhandle_json_callback = &VJsonReader::_vhandle_nested_complex_json; + } + } + + return Status::OK(); +} + +Status VJsonReader::read_json_column(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof); +} + +Status VJsonReader::_vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + do { + bool valid = false; + if (_next_line >= _total_lines) { // parse json and generic document + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); // terminate if encounter other errors + if (size == 0 || *eof) { // read all data, then return + *is_empty_row = true; + return Status::OK(); + } + _name_map.clear(); + rapidjson::Value* objectValue = nullptr; + _json_doc = VJsonReader::get_json_doc(); + if (_json_doc->IsArray()) { + _total_lines = _json_doc->Size(); + if (_total_lines == 0) { + // may be passing an empty json, such as "[]" + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(*_json_doc); }, + [&]() -> std::string { return "Empty json line"; }, _scanner_eof)); + _counter->num_rows_filtered++; + if (*_scanner_eof) { + *is_empty_row = true; + return Status::OK(); + } + continue; + } + objectValue = &(*_json_doc)[0]; + } else { + _total_lines = 1; // only one row + objectValue = _json_doc; + } + _next_line = 0; + if (_fuzzy_parse) { + for (auto v : slot_descs) { + for (int i = 0; i < objectValue->MemberCount(); ++i) { + auto it = objectValue->MemberBegin() + i; + if (v->col_name() == it->name.GetString()) { + _name_map[v->col_name()] = i; + break; + } + } + } + } + } + + if (_json_doc->IsArray()) { // handle case 1 + rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object + RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid)); + } else { // handle case 2 + RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid)); + } + _next_line++; + if (!valid) { + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + continue; + } + *is_empty_row = false; + break; // get a valid row, then break + } while (_next_line <= _total_lines); + return Status::OK(); +} + +// for simple format json +// set valid to true and return OK if succeed. +// set valid to false and return OK if we met an invalid row. +// return other status if encounter other problmes. +Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector& columns, + const std::vector& slot_descs, bool* valid) { + if (!objectValue.IsObject()) { + // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, + // not other type of Json format. + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { return "Expect json object value"; }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + return Status::OK(); + } + + int nullcount = 0; + int ctx_idx = 0; + for (auto slot_desc : slot_descs) { + int dest_index = ctx_idx++; + auto* column_ptr = columns[dest_index].get(); + rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd(); + + if (_fuzzy_parse) { + auto idx_it = _name_map.find(slot_desc->col_name()); + if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) { + it = objectValue.MemberBegin() + idx_it->second; + } + } else { + it = objectValue.FindMember( + rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size())); + } + + if (it != objectValue.MemberEnd()) { + const rapidjson::Value& value = it->value; + RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid)); + if (!(*valid)) { + return Status::OK(); + } + } else { // not found + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_data(nullptr, 0); + nullcount++; + // LOG(INFO) << "not found in objectValue"; + } else { + RETURN_IF_ERROR( _state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_desc->col_name()); + return fmt::to_string(error_msg); + }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + break; + } + } + } + + if (nullcount == slot_descs.size()) { + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { return "All fields is null, this is a invalid row."; }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; + return Status::OK(); + } + *valid = true; + return Status::OK(); +} + +Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value, SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, bool* valid) { + const char* str_value = nullptr; + uint8_t tmp_buf[128] = {0}; + int32_t wbytes = 0; + + if (slot_desc->is_nullable()) { + auto* nullable_column = + reinterpret_cast(column_ptr); + nullable_column->get_null_map_data().push_back(0); + column_ptr = &nullable_column->get_nested_column(); + } + + switch (value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + wbytes = strlen(str_value); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf((char *)tmp_buf, "%u", value->GetUint()); + str_value = (char *)tmp_buf; + } else if (value->IsInt()) { + wbytes = sprintf((char *)tmp_buf, "%d", value->GetInt()); + str_value = (char *)tmp_buf; + } else if (value->IsUint64()) { + wbytes = sprintf((char *)tmp_buf, "%lu", value->GetUint64()); + str_value = (char *)tmp_buf; + } else if (value->IsInt64()) { + wbytes = sprintf((char *)tmp_buf, "%ld", value->GetInt64()); + str_value = (char *)tmp_buf; + } else { + wbytes = sprintf((char *)tmp_buf, "%f", value->GetDouble()); + str_value = (char *)tmp_buf; + } + break; + case rapidjson::Type::kFalseType: + wbytes = 1; + str_value = (char *)"0"; + break; + case rapidjson::Type::kTrueType: + wbytes = 1; + str_value = (char *)"1"; + break; + case rapidjson::Type::kNullType: + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_data(nullptr, 0); + } else { + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(*value); }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "Json value is null, but the column `{}` is not nullable.", slot_desc->col_name()); + return fmt::to_string(error_msg); + }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; + return Status::OK(); + } + break; + default: + // for other type like array or object. we convert it to string to save + std::string json_str = JsonReader::_print_json_value(*value); + wbytes = json_str.size(); + str_value = json_str.c_str(); + break; + } + + RETURN_IF_ERROR(_insert_to_column(column_ptr, slot_desc, str_value, wbytes)); + + *valid = true; + return Status::OK(); +} + +Status VJsonReader::_insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, + const char* value_ptr, int32_t& wbytes) { + switch (slot_desc->type().type) { + case TYPE_BOOLEAN: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_TINYINT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_SMALLINT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_INT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_BIGINT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_LARGEINT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_FLOAT: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_DOUBLE: { + assert_cast*>(column_ptr)->insert_data(value_ptr, 0); + break; + } + case TYPE_CHAR: { + assert_cast(column_ptr)->insert_data(value_ptr, wbytes); + break; + } + case TYPE_VARCHAR: + case TYPE_STRING: { + assert_cast(column_ptr)->insert_data(value_ptr, wbytes); + break; + } + case TYPE_OBJECT: { + Slice slice(value_ptr, wbytes); + // insert_default() + auto* target_column = assert_cast(column_ptr); + + target_column->insert_default(); + BitmapValue* pvalue = nullptr; + int pos = target_column->size() - 1; + pvalue = &target_column->get_element(pos); + + if (slice.size != 0) { + BitmapValue value; + value.deserialize(slice.data); + *pvalue = std::move(value); + } else { + *pvalue = std::move(*reinterpret_cast(slice.data)); + } + break; + } + case TYPE_HLL: { + Slice slice(value_ptr, wbytes); + auto* target_column = assert_cast(column_ptr); + + target_column->insert_default(); + HyperLogLog* pvalue = nullptr; + int pos = target_column->size() - 1; + pvalue = &target_column->get_element(pos); + if (slice.size != 0) { + HyperLogLog value; + value.deserialize(slice); + *pvalue = std::move(value); + } else { + *pvalue = std::move(*reinterpret_cast(slice.data)); + } + break; + } + case TYPE_DECIMALV2: { + assert_cast*>(column_ptr) + ->insert_data(value_ptr, 0); + break; + } + case TYPE_DATETIME: { + Slice slice(value_ptr, wbytes); + DateTimeValue value = *reinterpret_cast(slice.data); + VecDateTimeValue date; + date.convert_dt_to_vec_dt(&value); + assert_cast*>(column_ptr) + ->insert_data(reinterpret_cast(&date), 0); + break; + } + case TYPE_DATE: { + Slice slice(value_ptr, wbytes); + DateTimeValue value = *reinterpret_cast(slice.data); + VecDateTimeValue date; + date.convert_dt_to_vec_dt(&value); + assert_cast*>(column_ptr) + ->insert_data(reinterpret_cast(&date), 0); + break; + } + default: { + DCHECK(false) << "bad slot type: " << slot_desc->type(); + break; + } + } + return Status::OK(); +} + +Status VJsonReader::_vhandle_flat_array_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + do { + if (_next_line >= _total_lines) { + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); // terminate if encounter other errors + if (size == 0 || *eof) { // read all data, then return + *is_empty_row = true; + return Status::OK(); + } + _json_doc = JsonReader::get_json_doc(); + _total_lines = _json_doc->Size(); + _next_line = 0; + + if (_total_lines == 0) { + // meet an empty json array. + *is_empty_row = true; + continue; + } + } + rapidjson::Value& objectValue = (*_json_doc)[_next_line++]; + bool valid = true; + RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid)); + if (!valid) { + continue; // process next line + } + *is_empty_row = false; + break; // get a valid row, then break + } while (_next_line <= _total_lines); + return Status::OK(); +} + +Status VJsonReader::_vhandle_nested_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + while (true) { + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); // read over,then return + } + *is_empty_row = false; + break; // read a valid row + } + bool valid = true; + _json_doc = JsonReader::get_json_doc(); + RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid)); + if (!valid) { + // there is only one line in this case, so if it return false, just set is_empty_row true + // so that the caller will continue reading next line. + *is_empty_row = true; + } + return Status::OK(); +} + +Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, + const std::vector& slot_descs, + std::vector& columns, + bool* valid) { + int nullcount = 0; + int ctx_idx = 0; + size_t column_num = slot_descs.size(); + for (size_t i = 0; i < column_num; i++) { + int dest_index = ctx_idx++; + auto* column_ptr = columns[dest_index].get(); + _origin_json_doc_ptr = JsonReader::get_origin_json_doc(); + rapidjson::Value* json_values = nullptr; + bool wrap_explicitly = false; + if (LIKELY(i < _parsed_jsonpaths.size())) { + json_values = JsonFunctions::get_json_array_from_parsed_json( + _parsed_jsonpaths[i], &objectValue, _origin_json_doc_ptr->GetAllocator(), &wrap_explicitly); + } + + if (json_values == nullptr) { + // not match in jsondata. + if (slot_descs[i]->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_data(nullptr, 0); + nullcount++; + } else { + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(objectValue); }, + [&]() -> std::string { + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name()); + return fmt::to_string(error_msg); + }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; // current row is invalid + break; + } + } else { + CHECK(json_values->IsArray()); + if (json_values->Size() == 1 && wrap_explicitly) { + // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array. + // so here we unwrap the array to get the real element. + // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. + json_values = &((*json_values)[0]); + } + // RETURN_IF_ERROR(_write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, valid)); + RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid)); + if (!(*valid)) { + break; + } + } + } + if (nullcount == column_num) { + RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(objectValue); }, + [&]() -> std::string { return "All fields is null or not matched, this is a invalid row."; }, _scanner_eof)); + _counter->num_rows_filtered++; + *valid = false; + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h new file mode 100644 index 00000000000000..7fbf4497717066 --- /dev/null +++ b/be/src/vec/exec/vjson_scanner.h @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_VJSON_SCANNER_H_ +#define BE_SRC_VJSON_SCANNER_H_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/json_scanner.h" +#include "exec/exec_node.h" +#include "runtime/row_batch.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "runtime/tuple.h" +#include "runtime/mem_tracker.h" +#include "util/runtime_profile.h" +#include "exprs/expr_context.h" + +namespace doris { +class ExprContext; + +namespace vectorized { +class VJsonReader; + +class VJsonScanner : public JsonScanner { +public: + VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, + ScannerCounter* counter); + + ~VJsonScanner(); + + Status open() override; + + void close() override; + + // Status get_next(std::vector& columns, bool* eof) override; + Status get_next(vectorized::Block& output_block, bool* eof) override; + +private: + Status open_vjson_reader(); + Status open_next_reader(); + +private: + const std::vector& _ranges; + const std::vector& _broker_addresses; + + // Reader + FileReader* _cur_file_reader; + LineReader* _cur_line_reader; + VJsonReader* _cur_vjson_reader; + + int _next_range; + bool _cur_reader_eof; + bool _read_json_by_line; + + // When we fetch range doesn't start from 0, + // we will read to one ahead, and skip the first line + bool _skip_next_line; +}; + +class VJsonReader : public JsonReader { +public: + VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, + bool strip_outer_array, bool num_as_string,bool fuzzy_parse, + bool* scanner_eof, FileReader* file_reader = nullptr, LineReader* line_reader = nullptr); + + ~VJsonReader(); + + Status init(const std::string& jsonpath, const std::string& json_root); + + Status read_json_column(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + +private: + Status (VJsonReader::*_vhandle_json_callback)(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _vhandle_flat_array_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _vhandle_nested_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _write_columns_by_jsonpath(rapidjson::Value& objectValue, + const std::vector& slot_descs, + std::vector& columns, + bool* valid); + + Status _set_column_value(rapidjson::Value& objectValue, std::vector& columns, + const std::vector& slot_descs, bool* valid); + + Status _write_data_to_column(rapidjson::Value::ConstValueIterator value, SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, bool* valid); + + Status _insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, + const char* value_ptr, int32_t& wbytes); + +private: + int _next_line; + int _total_lines; + RuntimeState* _state; + ScannerCounter* _counter; + RuntimeProfile* _profile; + + bool _strip_outer_array; + bool _fuzzy_parse; + RuntimeProfile::Counter* _read_timer; + + std::vector> _parsed_jsonpaths; + std::vector _parsed_json_root; + + rapidjson::Document* _origin_json_doc_ptr; // origin json document object from parsed json string + rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root` + std::unordered_map _name_map; + // point to the _scanner_eof of JsonScanner + bool* _scanner_eof; +}; + +} // vectorized +} // namespace doris +#endif From 4fd2994de89eb1709f39bb2c0c220df6318f0b06 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Fri, 6 May 2022 11:36:44 +0800 Subject: [PATCH 2/9] add vectorized vjson_scanner and apply vexpr --- be/src/exec/base_scanner.cpp | 23 +++ be/src/exec/base_scanner.h | 3 + be/src/exec/json_scanner.h | 15 +- be/src/vec/exec/vjson_scanner.cpp | 316 +++++++++--------------------- be/src/vec/exec/vjson_scanner.h | 38 +--- 5 files changed, 127 insertions(+), 268 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index d63d8addd69767..bd8d5c9f992f54 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -278,6 +278,29 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { return Status::OK(); } +Status BaseScanner::filter_block_and_execute_exprs(vectorized::Block* output_block, + vectorized::Block* temp_block, size_t slot_num) { + Status status; + // filter src tuple by preceding filter first + if (!_vpre_filter_ctxs.empty()) { + for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + auto old_rows = output_block->rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, + output_block, slot_num)); + _counter->num_rows_unselected += old_rows - output_block->rows(); + } + } + + // Do vectorized expr here to speed up load + *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(_dest_vexpr_ctx, + *temp_block, status); + if (UNLIKELY(output_block->rows() == 0)) { + return status; + } + + return Status::OK(); +} + void BaseScanner::fill_slots_of_columns_from_path( int start, const std::vector& columns_from_path) { // values of columns from path can not be null diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 1c98548fc68a57..994230714ed719 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -85,6 +85,9 @@ class BaseScanner { void free_expr_local_allocations(); + Status filter_block_and_execute_exprs(vectorized::Block* output_block, + vectorized::Block* temp_block, size_t slot_num); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 7db81ec0fdb751..9ac628dbdd34fa 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -72,14 +72,8 @@ class JsonScanner : public BaseScanner { Status open_line_reader(); Status open_json_reader(); Status open_next_reader(); - - bool get_cur_reader_eof() { return _cur_reader_eof; } - bool get_read_json_by_line() { return _read_json_by_line; } - bool get_skip_next_line() { return _skip_next_line; } - FileReader* get_cur_file_reader() { return _cur_file_reader; } - LineReader* get_cur_line_reader() { return _cur_line_reader; } -private: +protected: const std::vector& _ranges; const std::vector& _broker_addresses; @@ -164,11 +158,8 @@ class JsonReader { void _close(); Status _generate_json_paths(const std::string& jsonpath, std::vector>* vect); - - rapidjson::Value* get_json_doc() { return _json_doc; } - rapidjson::Document* get_origin_json_doc() { return &_origin_json_doc; } - -private: + +protected: int _next_line; int _total_lines; RuntimeState* _state; diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 5eb16e76d399b0..64ad60701ddf4d 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -41,14 +41,7 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter) : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), - _cur_file_reader(nullptr), - _cur_line_reader(nullptr), - _cur_vjson_reader(nullptr), - _next_range(0), - _cur_reader_eof(false), - _read_json_by_line(false) { + _cur_vjson_reader(nullptr) { } VJsonScanner::~VJsonScanner() { @@ -62,15 +55,10 @@ Status VJsonScanner::open() { void VJsonScanner::close() { BaseScanner::close(); - if (_cur_vjson_reader != nullptr) { - delete _cur_vjson_reader; - _cur_vjson_reader = nullptr; - } } Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { SCOPED_TIMER(_read_timer); - Status status = Status::OK(); const int batch_size = _state->batch_size(); size_t slot_num = _src_slot_descs.size(); std::shared_ptr temp_block(new vectorized::Block()); @@ -118,19 +106,7 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { slot_desc->col_name())); } - // filter src tuple by preceding filter first - if (!_vpre_filter_ctxs.empty()) { - for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - RETURN_IF_ERROR(VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); - } - } - - // Do vectorized expr here to speed up load - output_block = VExprContext::get_output_block_after_execute_exprs(_dest_vexpr_ctx, - *(temp_block.get()), status); - if (UNLIKELY(output_block.rows() == 0)) { - return status; - } + RETURN_IF_ERROR(filter_block_and_execute_exprs(&output_block, temp_block.get(), slot_num)); } else { auto n_columns = 0; for (const auto slot_desc : _src_slot_descs) { @@ -140,10 +116,12 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { } // filter src tuple by preceding filter first - if (!_vpre_filter_ctxs.empty()) { + if (!_vpre_filter_ctxs.empty()) { + auto old_rows = output_block.rows(); for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { RETURN_IF_ERROR(VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); } + _counter->num_rows_unselected += old_rows - output_block.rows(); } } } @@ -162,18 +140,12 @@ Status VJsonScanner::open_next_reader() { return Status::OK(); } - // init the file reader + // init file reader RETURN_IF_ERROR(JsonScanner::open_file_reader()); - _cur_file_reader = JsonScanner::get_cur_file_reader(); - _cur_reader_eof = JsonScanner::get_cur_reader_eof(); - _read_json_by_line = JsonScanner::get_read_json_by_line(); // init line reader if (_read_json_by_line) { RETURN_IF_ERROR(JsonScanner::open_line_reader()); - _cur_line_reader = JsonScanner::get_cur_line_reader(); - _cur_reader_eof = JsonScanner::get_cur_reader_eof(); - _skip_next_line = JsonScanner::get_skip_next_line(); } RETURN_IF_ERROR(open_vjson_reader()); @@ -184,8 +156,7 @@ Status VJsonScanner::open_next_reader() { Status VJsonScanner::open_vjson_reader() { if (_cur_vjson_reader != nullptr) { - delete _cur_vjson_reader; - _cur_vjson_reader = nullptr; + _cur_vjson_reader.reset(); } std::string json_root = ""; std::string jsonpath = ""; @@ -212,11 +183,11 @@ Status VJsonScanner::open_vjson_reader() { } if (_read_json_by_line) { - _cur_vjson_reader = new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader)); } else { - _cur_vjson_reader = new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, _cur_file_reader); + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, _cur_file_reader)); } RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); @@ -228,16 +199,7 @@ VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimePr bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, scanner_eof, file_reader, line_reader), - _vhandle_json_callback(nullptr), - _next_line(0), - _total_lines(0), - _state(state), - _counter(counter), - _profile(profile), - _strip_outer_array(strip_outer_array), - _fuzzy_parse(fuzzy_parse), - _json_doc(nullptr), - _scanner_eof(scanner_eof) { + _vhandle_json_callback(nullptr) { } VJsonReader::~VJsonReader() { @@ -280,26 +242,22 @@ Status VJsonReader::_vhandle_simple_json(std::vector& columns, do { bool valid = false; if (_next_line >= _total_lines) { // parse json and generic document - size_t size = 0; - Status st = JsonReader::_parse_json_doc(&size, eof); + Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } - RETURN_IF_ERROR(st); // terminate if encounter other errors - if (size == 0 || *eof) { // read all data, then return - *is_empty_row = true; + RETURN_IF_ERROR(st); + if (*is_empty_row == true && st == Status::OK()) { return Status::OK(); - } + } _name_map.clear(); rapidjson::Value* objectValue = nullptr; - _json_doc = VJsonReader::get_json_doc(); if (_json_doc->IsArray()) { _total_lines = _json_doc->Size(); if (_total_lines == 0) { // may be passing an empty json, such as "[]" - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(*_json_doc); }, - [&]() -> std::string { return "Empty json line"; }, _scanner_eof)); - _counter->num_rows_filtered++; + std::string err_msg("Empty json line"); + RETURN_IF_ERROR(_append_error_msg(*_json_doc, err_msg, nullptr)); if (*_scanner_eof) { *is_empty_row = true; return Status::OK(); @@ -356,10 +314,8 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector if (!objectValue.IsObject()) { // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, // not other type of Json format. - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, - [&]() -> std::string { return "Expect json object value"; }, _scanner_eof)); - _counter->num_rows_filtered++; - *valid = false; // current row is invalid + std::string err_msg("Expect json object value"); + RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); return Status::OK(); } @@ -391,26 +347,19 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector auto* nullable_column = reinterpret_cast(column_ptr); nullable_column->insert_data(nullptr, 0); nullcount++; - // LOG(INFO) << "not found in objectValue"; } else { - RETURN_IF_ERROR( _state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_desc->col_name()); - return fmt::to_string(error_msg); - }, _scanner_eof)); - _counter->num_rows_filtered++; - *valid = false; // current row is invalid + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_desc->col_name()); + std::string err_msg = fmt::to_string(error_msg); + RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); break; } } } if (nullcount == slot_descs.size()) { - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, - [&]() -> std::string { return "All fields is null, this is a invalid row."; }, _scanner_eof)); - _counter->num_rows_filtered++; - *valid = false; + std::string err_msg("All fields is null, this is a invalid row."); + RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); return Status::OK(); } *valid = true; @@ -466,14 +415,10 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v auto* nullable_column = reinterpret_cast(column_ptr); nullable_column->insert_data(nullptr, 0); } else { - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return JsonReader::_print_json_value(*value); }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Json value is null, but the column `{}` is not nullable.", slot_desc->col_name()); - return fmt::to_string(error_msg); - }, _scanner_eof)); - _counter->num_rows_filtered++; - *valid = false; + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "Json value is null, but the column `{}` is not nullable.", slot_desc->col_name()); + std::string err_msg = fmt::to_string(error_msg); + RETURN_IF_ERROR(_append_error_msg(*value, err_msg, valid)); return Status::OK(); } break; @@ -494,110 +439,14 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v Status VJsonReader::_insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, const char* value_ptr, int32_t& wbytes) { switch (slot_desc->type().type) { - case TYPE_BOOLEAN: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_TINYINT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_SMALLINT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_INT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_BIGINT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_LARGEINT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_FLOAT: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_DOUBLE: { - assert_cast*>(column_ptr)->insert_data(value_ptr, 0); - break; - } - case TYPE_CHAR: { - assert_cast(column_ptr)->insert_data(value_ptr, wbytes); - break; - } - case TYPE_VARCHAR: - case TYPE_STRING: { - assert_cast(column_ptr)->insert_data(value_ptr, wbytes); - break; - } - case TYPE_OBJECT: { - Slice slice(value_ptr, wbytes); - // insert_default() - auto* target_column = assert_cast(column_ptr); - - target_column->insert_default(); - BitmapValue* pvalue = nullptr; - int pos = target_column->size() - 1; - pvalue = &target_column->get_element(pos); - - if (slice.size != 0) { - BitmapValue value; - value.deserialize(slice.data); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast(slice.data)); + case TYPE_VARCHAR:{ + assert_cast(column_ptr)->insert_data(value_ptr, wbytes); + break; } - break; - } - case TYPE_HLL: { - Slice slice(value_ptr, wbytes); - auto* target_column = assert_cast(column_ptr); - - target_column->insert_default(); - HyperLogLog* pvalue = nullptr; - int pos = target_column->size() - 1; - pvalue = &target_column->get_element(pos); - if (slice.size != 0) { - HyperLogLog value; - value.deserialize(slice); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast(slice.data)); + default: { + DCHECK(false) << "bad slot type: " << slot_desc->type(); + break; } - break; - } - case TYPE_DECIMALV2: { - assert_cast*>(column_ptr) - ->insert_data(value_ptr, 0); - break; - } - case TYPE_DATETIME: { - Slice slice(value_ptr, wbytes); - DateTimeValue value = *reinterpret_cast(slice.data); - VecDateTimeValue date; - date.convert_dt_to_vec_dt(&value); - assert_cast*>(column_ptr) - ->insert_data(reinterpret_cast(&date), 0); - break; - } - case TYPE_DATE: { - Slice slice(value_ptr, wbytes); - DateTimeValue value = *reinterpret_cast(slice.data); - VecDateTimeValue date; - date.convert_dt_to_vec_dt(&value); - assert_cast*>(column_ptr) - ->insert_data(reinterpret_cast(&date), 0); - break; - } - default: { - DCHECK(false) << "bad slot type: " << slot_desc->type(); - break; - } } return Status::OK(); } @@ -607,24 +456,18 @@ Status VJsonReader::_vhandle_flat_array_complex_json(std::vector= _total_lines) { - size_t size = 0; - Status st = JsonReader::_parse_json_doc(&size, eof); + Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } - RETURN_IF_ERROR(st); // terminate if encounter other errors - if (size == 0 || *eof) { // read all data, then return - *is_empty_row = true; - return Status::OK(); - } - _json_doc = JsonReader::get_json_doc(); - _total_lines = _json_doc->Size(); - _next_line = 0; - - if (_total_lines == 0) { - // meet an empty json array. - *is_empty_row = true; - continue; + RETURN_IF_ERROR(st); + if (*is_empty_row == true ) { + if (st == Status::OK()) { + return Status::OK(); + } + if (_total_lines == 0) { + continue; + } } } rapidjson::Value& objectValue = (*_json_doc)[_next_line++]; @@ -643,21 +486,18 @@ Status VJsonReader::_vhandle_nested_complex_json(std::vector& const std::vector& slot_descs, bool* is_empty_row, bool* eof) { while (true) { - size_t size = 0; - Status st = JsonReader::_parse_json_doc(&size, eof); + Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { continue; // continue to read next } RETURN_IF_ERROR(st); - if (size == 0 || *eof) { - *is_empty_row = true; - return Status::OK(); // read over,then return + if (*is_empty_row == true && st == Status::OK()) { + return Status::OK(); } *is_empty_row = false; break; // read a valid row } bool valid = true; - _json_doc = JsonReader::get_json_doc(); RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid)); if (!valid) { // there is only one line in this case, so if it return false, just set is_empty_row true @@ -667,6 +507,30 @@ Status VJsonReader::_vhandle_nested_complex_json(std::vector& return Status::OK(); } +Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + // terminate if encounter other errors + RETURN_IF_ERROR(st); + + // read all data, then return + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + if (!_parsed_jsonpaths.empty() && _strip_outer_array) { + _total_lines = _json_doc->Size(); + _next_line = 0; + + if (_total_lines == 0) { + // meet an empty json array. + *is_empty_row = true; + } + } + return st; +} + Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, const std::vector& slot_descs, std::vector& columns, @@ -677,12 +541,11 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, for (size_t i = 0; i < column_num; i++) { int dest_index = ctx_idx++; auto* column_ptr = columns[dest_index].get(); - _origin_json_doc_ptr = JsonReader::get_origin_json_doc(); rapidjson::Value* json_values = nullptr; bool wrap_explicitly = false; if (LIKELY(i < _parsed_jsonpaths.size())) { json_values = JsonFunctions::get_json_array_from_parsed_json( - _parsed_jsonpaths[i], &objectValue, _origin_json_doc_ptr->GetAllocator(), &wrap_explicitly); + _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(), &wrap_explicitly); } if (json_values == nullptr) { @@ -692,14 +555,10 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, nullable_column->insert_data(nullptr, 0); nullcount++; } else { - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(objectValue); }, - [&]() -> std::string { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name()); - return fmt::to_string(error_msg); - }, _scanner_eof)); - _counter->num_rows_filtered++; - *valid = false; // current row is invalid + fmt::memory_buffer error_msg; + fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name()); + std::string err_msg = fmt::to_string(error_msg); + RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); break; } } else { @@ -710,19 +569,32 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. json_values = &((*json_values)[0]); } - // RETURN_IF_ERROR(_write_data_to_tuple(json_values, slot_descs[i], tuple, tuple_pool, valid)); RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid)); if (!(*valid)) { break; } } } + if (nullcount == column_num) { - RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(objectValue); }, - [&]() -> std::string { return "All fields is null or not matched, this is a invalid row."; }, _scanner_eof)); - _counter->num_rows_filtered++; + std::string err_msg("All fields is null or not matched, this is a invalid row."); + RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + } + return Status::OK(); +} + +Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string& error_msg, bool* valid) { + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { return error_msg; }, + _scanner_eof)); + + _counter->num_rows_filtered++; + if (valid != nullptr) { + // current row is invalid *valid = false; } + return Status::OK(); } diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index 7fbf4497717066..85345a13e61b6d 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -70,21 +70,7 @@ class VJsonScanner : public JsonScanner { Status open_next_reader(); private: - const std::vector& _ranges; - const std::vector& _broker_addresses; - - // Reader - FileReader* _cur_file_reader; - LineReader* _cur_line_reader; - VJsonReader* _cur_vjson_reader; - - int _next_range; - bool _cur_reader_eof; - bool _read_json_by_line; - - // When we fetch range doesn't start from 0, - // we will read to one ahead, and skip the first line - bool _skip_next_line; + std::unique_ptr _cur_vjson_reader; }; class VJsonReader : public JsonReader { @@ -131,26 +117,10 @@ class VJsonReader : public JsonReader { Status _insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, const char* value_ptr, int32_t& wbytes); + + Status _append_error_msg(const rapidjson::Value& objectValue, std::string& error_msg, bool* valid); -private: - int _next_line; - int _total_lines; - RuntimeState* _state; - ScannerCounter* _counter; - RuntimeProfile* _profile; - - bool _strip_outer_array; - bool _fuzzy_parse; - RuntimeProfile::Counter* _read_timer; - - std::vector> _parsed_jsonpaths; - std::vector _parsed_json_root; - - rapidjson::Document* _origin_json_doc_ptr; // origin json document object from parsed json string - rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root` - std::unordered_map _name_map; - // point to the _scanner_eof of JsonScanner - bool* _scanner_eof; + Status _parse_json(bool* is_empty_row, bool* eof); }; } // vectorized From 5742a01269a95e59ac7f5e76ef6da91c1e0fa379 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Sat, 7 May 2022 11:11:19 +0800 Subject: [PATCH 3/9] add vectorized vjson_scanner and apply vexpr --- be/src/exec/base_scanner.cpp | 12 +- be/src/exec/base_scanner.h | 8 +- be/src/exec/broker_scan_node.cpp | 12 +- be/src/vec/exec/vbroker_scan_node.cpp | 4 +- be/src/vec/exec/vbroker_scan_node.h | 3 +- be/src/vec/exec/vjson_scanner.cpp | 268 ++++++----- be/src/vec/exec/vjson_scanner.h | 66 ++- be/test/CMakeLists.txt | 1 + be/test/vec/exec/vjson_scanner_test.cpp | 596 ++++++++++++++++++++++++ 9 files changed, 781 insertions(+), 189 deletions(-) create mode 100644 be/test/vec/exec/vjson_scanner_test.cpp diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index bd8d5c9f992f54..efc5cbd1439800 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -279,25 +279,25 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { } Status BaseScanner::filter_block_and_execute_exprs(vectorized::Block* output_block, - vectorized::Block* temp_block, size_t slot_num) { + vectorized::Block* temp_block, size_t slot_num) { Status status; // filter src tuple by preceding filter first if (!_vpre_filter_ctxs.empty()) { for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { auto old_rows = output_block->rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, - output_block, slot_num)); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, output_block, + slot_num)); _counter->num_rows_unselected += old_rows - output_block->rows(); } } // Do vectorized expr here to speed up load - *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(_dest_vexpr_ctx, - *temp_block, status); + *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _dest_vexpr_ctx, *temp_block, status); if (UNLIKELY(output_block->rows() == 0)) { return status; } - + return Status::OK(); } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 994230714ed719..8a088f68d69878 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -55,7 +55,7 @@ class BaseScanner { const std::vector& pre_filter_texprs, ScannerCounter* counter); virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); - if (_state->enable_vectorized_exec()) { + if (_state != nullptr && _state->enable_vectorized_exec()) { vectorized::VExpr::close(_dest_vexpr_ctx, _state); } }; @@ -71,7 +71,7 @@ class BaseScanner { virtual Status get_next(vectorized::Block* block, bool* eof) { return Status::NotSupported("Not Implemented get block"); } - + virtual Status get_next(vectorized::Block& output_block, bool* eof) { return Status::NotSupported("Not Implemented get block"); } @@ -86,8 +86,8 @@ class BaseScanner { void free_expr_local_allocations(); Status filter_block_and_execute_exprs(vectorized::Block* output_block, - vectorized::Block* temp_block, size_t slot_num); - + vectorized::Block* temp_block, size_t slot_num); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 504e75d916aa39..4697e3af7861c8 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -236,9 +236,15 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan counter); break; case TFileFormatType::FORMAT_JSON: - scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs, - counter); + if (_vectorized) { + scan = new vectorized::VJsonScanner( + _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, + scan_range.broker_addresses, _pre_filter_texprs, counter); + } else { + scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } break; default: if (_vectorized) { diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index 1299484bce2893..da82dccba23fa7 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -152,8 +152,8 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner if (_scan_finished.load() || !_process_status.ok()) { return Status::OK(); } - - // get block + + // get block RETURN_IF_ERROR(scanner->get_next(*(block.get()), &scanner_eof)); std::shared_ptr block(new vectorized::Block()); diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index 4ccebed5fbf836..1606e0d83b951d 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -31,7 +31,8 @@ namespace vectorized { class VBrokerScanNode final : public BrokerScanNode { public: VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~VBrokerScanNode() override = default; + + ~VBrokerScanNode() { close(_runtime_state); } // Fill the next row batch by calling next() on the scanner, virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 64ad60701ddf4d..f2d72a4d81073b 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -36,13 +36,12 @@ namespace doris::vectorized { VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter) : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), - _cur_vjson_reader(nullptr) { -} + _cur_vjson_reader(nullptr) {} VJsonScanner::~VJsonScanner() { close(); @@ -53,7 +52,7 @@ Status VJsonScanner::open() { return Status::OK(); } -void VJsonScanner::close() { +void VJsonScanner::close() { BaseScanner::close(); } @@ -77,7 +76,7 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { break; } } - + if (_read_json_by_line && _skip_next_line) { size_t size = 0; const uint8_t* line_ptr = nullptr; @@ -87,8 +86,8 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { } bool is_empty_row = false; - RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, - &is_empty_row, &_cur_reader_eof)); + RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, &is_empty_row, + &_cur_reader_eof)); if (is_empty_row) { // Read empty row, just continue continue; @@ -102,24 +101,26 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { auto n_columns = 0; for (const auto slot_desc : _src_slot_descs) { temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); } - RETURN_IF_ERROR(filter_block_and_execute_exprs(&output_block, temp_block.get(), slot_num)); + RETURN_IF_ERROR( + filter_block_and_execute_exprs(&output_block, temp_block.get(), slot_num)); } else { auto n_columns = 0; for (const auto slot_desc : _src_slot_descs) { output_block.insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); } - + // filter src tuple by preceding filter first if (!_vpre_filter_ctxs.empty()) { auto old_rows = output_block.rows(); for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - RETURN_IF_ERROR(VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); + RETURN_IF_ERROR( + VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); } _counter->num_rows_unselected += old_rows - output_block.rows(); } @@ -139,7 +140,7 @@ Status VJsonScanner::open_next_reader() { _scanner_eof = true; return Status::OK(); } - + // init file reader RETURN_IF_ERROR(JsonScanner::open_file_reader()); @@ -181,13 +182,15 @@ Status VJsonScanner::open_vjson_reader() { if (range.__isset.fuzzy_parse) { fuzzy_parse = range.fuzzy_parse; } - + if (_read_json_by_line) { - _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader)); + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, + num_as_string, fuzzy_parse, &_scanner_eof, nullptr, + _cur_line_reader)); } else { - _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, _cur_file_reader)); + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, + num_as_string, fuzzy_parse, &_scanner_eof, + _cur_file_reader)); } RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); @@ -195,16 +198,13 @@ Status VJsonScanner::open_vjson_reader() { } VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - bool strip_outer_array, bool num_as_string,bool fuzzy_parse, - bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) - : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, - scanner_eof, file_reader, line_reader), - _vhandle_json_callback(nullptr) { -} - -VJsonReader::~VJsonReader() { + bool strip_outer_array, bool num_as_string, bool fuzzy_parse, + bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) + : JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, + scanner_eof, file_reader, line_reader), + _vhandle_json_callback(nullptr) {} -} +VJsonReader::~VJsonReader() {} Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) { // parse jsonpath @@ -226,19 +226,19 @@ Status VJsonReader::init(const std::string& jsonpath, const std::string& json_ro _vhandle_json_callback = &VJsonReader::_vhandle_nested_complex_json; } } - + return Status::OK(); } -Status VJsonReader::read_json_column(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof) { +Status VJsonReader::read_json_column(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof); } -Status VJsonReader::_vhandle_simple_json(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof) { +Status VJsonReader::_vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { do { bool valid = false; if (_next_line >= _total_lines) { // parse json and generic document @@ -249,15 +249,14 @@ Status VJsonReader::_vhandle_simple_json(std::vector& columns, RETURN_IF_ERROR(st); if (*is_empty_row == true && st == Status::OK()) { return Status::OK(); - } + } _name_map.clear(); rapidjson::Value* objectValue = nullptr; if (_json_doc->IsArray()) { _total_lines = _json_doc->Size(); if (_total_lines == 0) { // may be passing an empty json, such as "[]" - std::string err_msg("Empty json line"); - RETURN_IF_ERROR(_append_error_msg(*_json_doc, err_msg, nullptr)); + RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr)); if (*_scanner_eof) { *is_empty_row = true; return Status::OK(); @@ -283,7 +282,7 @@ Status VJsonReader::_vhandle_simple_json(std::vector& columns, } } - if (_json_doc->IsArray()) { // handle case 1 + if (_json_doc->IsArray()) { // handle case 1 rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid)); } else { // handle case 2 @@ -309,13 +308,13 @@ Status VJsonReader::_vhandle_simple_json(std::vector& columns, // set valid to true and return OK if succeed. // set valid to false and return OK if we met an invalid row. // return other status if encounter other problmes. -Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector& columns, - const std::vector& slot_descs, bool* valid) { +Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, + std::vector& columns, + const std::vector& slot_descs, bool* valid) { if (!objectValue.IsObject()) { // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, // not other type of Json format. - std::string err_msg("Expect json object value"); - RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid)); return Status::OK(); } @@ -348,33 +347,33 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, std::vector nullable_column->insert_data(nullptr, 0); nullcount++; } else { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_desc->col_name()); - std::string err_msg = fmt::to_string(error_msg); - RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg( + objectValue, + "The column `{}` is not nullable, but it's not found in jsondata.", + slot_desc->col_name(), valid)); break; } } } if (nullcount == slot_descs.size()) { - std::string err_msg("All fields is null, this is a invalid row."); - RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.", + "", valid)); return Status::OK(); } *valid = true; return Status::OK(); } -Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value, SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr, bool* valid) { +Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value, + SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, bool* valid) { const char* str_value = nullptr; - uint8_t tmp_buf[128] = {0}; + char tmp_buf[128] = {0}; int32_t wbytes = 0; - + if (slot_desc->is_nullable()) { - auto* nullable_column = - reinterpret_cast(column_ptr); + auto* nullable_column = reinterpret_cast(column_ptr); nullable_column->get_null_map_data().push_back(0); column_ptr = &nullable_column->get_nested_column(); } @@ -385,40 +384,35 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v wbytes = strlen(str_value); break; case rapidjson::Type::kNumberType: - if (value->IsUint()) { - wbytes = sprintf((char *)tmp_buf, "%u", value->GetUint()); - str_value = (char *)tmp_buf; + if (value->IsUint()) { + wbytes = sprintf(tmp_buf, "%u", value->GetUint()); } else if (value->IsInt()) { - wbytes = sprintf((char *)tmp_buf, "%d", value->GetInt()); - str_value = (char *)tmp_buf; + wbytes = sprintf(tmp_buf, "%d", value->GetInt()); } else if (value->IsUint64()) { - wbytes = sprintf((char *)tmp_buf, "%lu", value->GetUint64()); - str_value = (char *)tmp_buf; + wbytes = sprintf(tmp_buf, "%lu", value->GetUint64()); } else if (value->IsInt64()) { - wbytes = sprintf((char *)tmp_buf, "%ld", value->GetInt64()); - str_value = (char *)tmp_buf; + wbytes = sprintf(tmp_buf, "%ld", value->GetInt64()); } else { - wbytes = sprintf((char *)tmp_buf, "%f", value->GetDouble()); - str_value = (char *)tmp_buf; + wbytes = sprintf(tmp_buf, "%f", value->GetDouble()); } + str_value = tmp_buf; break; case rapidjson::Type::kFalseType: wbytes = 1; - str_value = (char *)"0"; + str_value = (char*)"0"; break; case rapidjson::Type::kTrueType: wbytes = 1; - str_value = (char *)"1"; + str_value = (char*)"1"; break; case rapidjson::Type::kNullType: if (slot_desc->is_nullable()) { auto* nullable_column = reinterpret_cast(column_ptr); nullable_column->insert_data(nullptr, 0); } else { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "Json value is null, but the column `{}` is not nullable.", slot_desc->col_name()); - std::string err_msg = fmt::to_string(error_msg); - RETURN_IF_ERROR(_append_error_msg(*value, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg( + *value, "Json value is null, but the column `{}` is not nullable.", + slot_desc->col_name(), valid)); return Status::OK(); } break; @@ -429,31 +423,19 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v str_value = json_str.c_str(); break; } - - RETURN_IF_ERROR(_insert_to_column(column_ptr, slot_desc, str_value, wbytes)); - *valid = true; - return Status::OK(); -} + // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR', + // we need use a function to support these types to insert data in columns. + DCHECK(slot_desc->type().type == TYPE_VARCHAR); + assert_cast(column_ptr)->insert_data(str_value, wbytes); -Status VJsonReader::_insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, - const char* value_ptr, int32_t& wbytes) { - switch (slot_desc->type().type) { - case TYPE_VARCHAR:{ - assert_cast(column_ptr)->insert_data(value_ptr, wbytes); - break; - } - default: { - DCHECK(false) << "bad slot type: " << slot_desc->type(); - break; - } - } + *valid = true; return Status::OK(); } Status VJsonReader::_vhandle_flat_array_complex_json(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof) { + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { do { if (_next_line >= _total_lines) { Status st = _parse_json(is_empty_row, eof); @@ -461,7 +443,7 @@ Status VJsonReader::_vhandle_flat_array_complex_json(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof) { + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { while (true) { Status st = _parse_json(is_empty_row, eof); if (st.is_data_quality_error()) { @@ -507,34 +489,10 @@ Status VJsonReader::_vhandle_nested_complex_json(std::vector& return Status::OK(); } -Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { - size_t size = 0; - Status st = JsonReader::_parse_json_doc(&size, eof); - // terminate if encounter other errors - RETURN_IF_ERROR(st); - - // read all data, then return - if (size == 0 || *eof) { - *is_empty_row = true; - return Status::OK(); - } - - if (!_parsed_jsonpaths.empty() && _strip_outer_array) { - _total_lines = _json_doc->Size(); - _next_line = 0; - - if (_total_lines == 0) { - // meet an empty json array. - *is_empty_row = true; - } - } - return st; -} - Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, - const std::vector& slot_descs, - std::vector& columns, - bool* valid) { + const std::vector& slot_descs, + std::vector& columns, + bool* valid) { int nullcount = 0; int ctx_idx = 0; size_t column_num = slot_descs.size(); @@ -545,7 +503,8 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, bool wrap_explicitly = false; if (LIKELY(i < _parsed_jsonpaths.size())) { json_values = JsonFunctions::get_json_array_from_parsed_json( - _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(), &wrap_explicitly); + _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(), + &wrap_explicitly); } if (json_values == nullptr) { @@ -555,10 +514,10 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, nullable_column->insert_data(nullptr, 0); nullcount++; } else { - fmt::memory_buffer error_msg; - fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name()); - std::string err_msg = fmt::to_string(error_msg); - RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg( + objectValue, + "The column `{}` is not nullable, but it's not found in jsondata.", + slot_descs[i]->col_name(), valid)); break; } } else { @@ -577,24 +536,57 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, } if (nullcount == column_num) { - std::string err_msg("All fields is null or not matched, this is a invalid row."); - RETURN_IF_ERROR(_append_error_msg(objectValue, err_msg, valid)); + RETURN_IF_ERROR(_append_error_msg( + objectValue, "All fields is null or not matched, this is a invalid row.", "", + valid)); } return Status::OK(); } -Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string& error_msg, bool* valid) { +Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + // terminate if encounter other errors + RETURN_IF_ERROR(st); + + // read all data, then return + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + if (!_parsed_jsonpaths.empty() && _strip_outer_array) { + _total_lines = _json_doc->Size(); + _next_line = 0; + + if (_total_lines == 0) { + // meet an empty json array. + *is_empty_row = true; + } + } + return st; +} + +Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, + std::string col_name, bool* valid) { + std::string err_msg; + if (!col_name.empty()) { + fmt::memory_buffer error_buf; + fmt::format_to(error_buf, error_msg, col_name); + err_msg = fmt::to_string(error_buf); + } else { + err_msg = error_msg; + } + RETURN_IF_ERROR(_state->append_error_msg_to_file( - [&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, - [&]() -> std::string { return error_msg; }, - _scanner_eof)); - + [&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { return err_msg; }, _scanner_eof)); + _counter->num_rows_filtered++; if (valid != nullptr) { // current row is invalid *valid = false; } - return Status::OK(); } diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index 85345a13e61b6d..6d4f2191e953f9 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -51,18 +51,16 @@ class VJsonReader; class VJsonScanner : public JsonScanner { public: VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, - ScannerCounter* counter); - + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + ~VJsonScanner(); - + Status open() override; void close() override; - // Status get_next(std::vector& columns, bool* eof) override; Status get_next(vectorized::Block& output_block, bool* eof) override; private: @@ -76,53 +74,51 @@ class VJsonScanner : public JsonScanner { class VJsonReader : public JsonReader { public: VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - bool strip_outer_array, bool num_as_string,bool fuzzy_parse, - bool* scanner_eof, FileReader* file_reader = nullptr, LineReader* line_reader = nullptr); + bool strip_outer_array, bool num_as_string, bool fuzzy_parse, bool* scanner_eof, + FileReader* file_reader = nullptr, LineReader* line_reader = nullptr); ~VJsonReader(); Status init(const std::string& jsonpath, const std::string& json_root); - Status read_json_column(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof); + Status read_json_column(std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, + bool* eof); private: - Status (VJsonReader::*_vhandle_json_callback)(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof); + Status (VJsonReader::*_vhandle_json_callback)( + std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, bool* eof); - Status _vhandle_simple_json(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof); + Status _vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, + bool* eof); Status _vhandle_flat_array_complex_json(std::vector& columns, - const std::vector& slot_descs, - bool* is_empty_row, bool* eof); + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); Status _vhandle_nested_complex_json(std::vector& columns, const std::vector& slot_descs, bool* is_empty_row, bool* eof); - + Status _write_columns_by_jsonpath(rapidjson::Value& objectValue, - const std::vector& slot_descs, - std::vector& columns, - bool* valid); - + const std::vector& slot_descs, + std::vector& columns, bool* valid); + Status _set_column_value(rapidjson::Value& objectValue, std::vector& columns, - const std::vector& slot_descs, bool* valid); - - Status _write_data_to_column(rapidjson::Value::ConstValueIterator value, SlotDescriptor* slot_desc, - vectorized::IColumn* column_ptr, bool* valid); - - Status _insert_to_column(vectorized::IColumn* column_ptr, SlotDescriptor* slot_desc, - const char* value_ptr, int32_t& wbytes); - - Status _append_error_msg(const rapidjson::Value& objectValue, std::string& error_msg, bool* valid); + const std::vector& slot_descs, bool* valid); + + Status _write_data_to_column(rapidjson::Value::ConstValueIterator value, + SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr, + bool* valid); Status _parse_json(bool* is_empty_row, bool* eof); + + Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, + std::string col_name, bool* valid); }; -} // vectorized +} // namespace vectorized } // namespace doris #endif diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 5abf9cceb22386..6b7981306044ed 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -338,6 +338,7 @@ set(VEC_TEST_FILES vec/exec/vgeneric_iterators_test.cpp vec/exec/vbroker_scan_node_test.cpp vec/exec/vbroker_scanner_test.cpp + vec/exec/vjson_scanner_test.cpp vec/exec/vtablet_sink_test.cpp vec/exprs/vexpr_test.cpp vec/function/function_array_element_test.cpp diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp new file mode 100644 index 00000000000000..50451a341c62ee --- /dev/null +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -0,0 +1,596 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include +#include +#include + +#include "common/object_pool.h" +#include "exec/broker_scan_node.h" +#include "exec/local_file_reader.h" +#include "vec/exec/vbroker_scan_node.h" +#include "vec/exec/vjson_scanner.h" +#include "exprs/cast_functions.h" +#include "exprs/decimalv2_operators.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" +#include "runtime/user_function_cache.h" + +namespace doris { +namespace vectorized { + +class VJsonScannerTest : public testing::Test { +public: + VJsonScannerTest() : _runtime_state(TQueryGlobals()) { + init(); + _runtime_state._instance_mem_tracker.reset(new MemTracker()); + _runtime_state._exec_env = ExecEnv::GetInstance(); + } + void init(); + static void SetUpTestCase() { + UserFunctionCache::instance()->init( + "./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + DecimalV2Operators::init(); + } + +protected: + virtual void SetUp() {} + virtual void TearDown() {} + +private: + int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + void create_expr_info(); + void init_desc_table(); + RuntimeState _runtime_state; + ObjectPool _obj_pool; + std::map _slots_map; + TBrokerScanRangeParams _params; + DescriptorTbl* _desc_tbl; + TPlanNode _tnode; +}; + +#define TUPLE_ID_DST 0 +#define TUPLE_ID_SRC 1 +#define COLUMN_NUMBERS 6 +#define DST_TUPLE_SLOT_ID_START 1 +#define SRC_TUPLE_SLOT_ID_START 7 + +int VJsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + const char* columnNames[] = {"category", "author", "title", "price", "largeint", "decimal"}; + for (int i = 0; i < COLUMN_NUMBERS; i++) { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + slot_desc.byteOffset = i * 16 + 8; + slot_desc.nullIndicatorByte = i / 8; + slot_desc.nullIndicatorBit = i % 8; + slot_desc.colName = columnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + { + // TTupleDescriptor source + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_SRC; + t_tuple_desc.byteSize = COLUMN_NUMBERS * 16 + 8; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +int VJsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + int32_t byteOffset = 8; + { //category + TSlotDescriptor slot_desc; + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 0; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 0; + slot_desc.colName = "category"; + slot_desc.slotIdx = 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // author + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 1; + slot_desc.colName = "author"; + slot_desc.slotIdx = 2; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // title + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 2; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 2; + slot_desc.colName = "title"; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // price + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DOUBLE); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 3; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 3; + slot_desc.colName = "price"; + slot_desc.slotIdx = 4; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 8; + { // lagreint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 4; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 4; + slot_desc.colName = "lagreint"; + slot_desc.slotIdx = 5; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // decimal + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 5; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 5; + slot_desc.colName = "decimal"; + slot_desc.slotIdx = 6; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_DST; + t_tuple_desc.byteSize = byteOffset + 8; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +void VJsonScannerTest::init_desc_table() { + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + int next_slot_id = 1; + + next_slot_id = create_dst_tuple(t_desc_table, next_slot_id); + + next_slot_id = create_src_tuple(t_desc_table, next_slot_id); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + + _runtime_state.set_desc_tbl(_desc_tbl); +} + +void VJsonScannerTest::create_expr_info() { + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + // category VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; // category id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START); + } + // author VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; // author id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1); + } + // title VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; // log_time id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2); + } + + // price VARCHAR --> DOUBLE + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodouble"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttodouble(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_double_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 3; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3); + } + // largeint VARCHAR --> LargeInt + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttolargeint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4); + } + // decimal VARCHAR --> Decimal + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimalv2"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5); + } + // _params.__isset.expr_of_dest_slot = true; + _params.__set_dest_tuple_id(TUPLE_ID_DST); + _params.__set_src_tuple_id(TUPLE_ID_SRC); +} + +void VJsonScannerTest::init() { + create_expr_info(); + init_desc_table(); + + // Node Id + _tnode.node_id = 0; + _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; + _tnode.num_children = 0; + _tnode.limit = -1; + _tnode.row_tuples.push_back(0); + _tnode.nullable_tuples.push_back(false); + _tnode.broker_scan_node.tuple_id = 0; + _tnode.__isset.broker_scan_node = true; +} + +TEST_F(VJsonScannerTest, simple_array_json) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + ASSERT_EQ(columns[3].to_string(0), "8.950000"); + ASSERT_EQ(columns[3].to_string(1), "12.990000"); + ASSERT_EQ(columns[4].to_string(0), "1234"); + ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000"); + ASSERT_EQ(columns[5].to_string(0), "1234.123400"); + ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); +} + +} // namespace vectorized +} // namespace doris From 0a1badc3b80f55be370408514e28921756ba2c6b Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Sat, 7 May 2022 11:33:58 +0800 Subject: [PATCH 4/9] add vectorized vjson_scanner and apply vexpr --- be/src/vec/exec/vjson_scanner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index f2d72a4d81073b..d0d59a22991003 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -117,12 +117,12 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { // filter src tuple by preceding filter first if (!_vpre_filter_ctxs.empty()) { - auto old_rows = output_block.rows(); for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + auto old_rows = output_block.rows(); RETURN_IF_ERROR( VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); + _counter->num_rows_unselected += old_rows - output_block.rows(); } - _counter->num_rows_unselected += old_rows - output_block.rows(); } } } From 57ffe01aeb2a2f4f59807600f1e3f26bc5602e16 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Sat, 7 May 2022 16:04:17 +0800 Subject: [PATCH 5/9] add vectorized vjson_scanner and apply vexpr --- be/src/exec/base_scanner.cpp | 26 ++++++---- be/src/exec/base_scanner.h | 5 +- be/src/exec/json_scanner.cpp | 62 ++++++++++++++-------- be/src/exec/json_scanner.h | 5 ++ be/src/vec/exec/vjson_scanner.cpp | 86 +++++++------------------------ 5 files changed, 82 insertions(+), 102 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index efc5cbd1439800..11bfc6993a4fd5 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -278,24 +278,28 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { return Status::OK(); } -Status BaseScanner::filter_block_and_execute_exprs(vectorized::Block* output_block, - vectorized::Block* temp_block, size_t slot_num) { - Status status; +Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) { // filter src tuple by preceding filter first + auto old_rows = temp_block->rows(); if (!_vpre_filter_ctxs.empty()) { for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - auto old_rows = output_block->rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, output_block, - slot_num)); - _counter->num_rows_unselected += old_rows - output_block->rows(); + RETURN_IF_ERROR( + vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num)); + _counter->num_rows_unselected += old_rows - temp_block->rows(); } } + return Status::OK(); +} +Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) { // Do vectorized expr here to speed up load - *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( - _dest_vexpr_ctx, *temp_block, status); - if (UNLIKELY(output_block->rows() == 0)) { - return status; + Status status; + if (!_dest_vexpr_ctx.empty()) { + *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _dest_vexpr_ctx, *temp_block, status); + if (UNLIKELY(output_block->rows() == 0)) { + return status; + } } return Status::OK(); diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 8a088f68d69878..17888e2c30f4ba 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -85,8 +85,9 @@ class BaseScanner { void free_expr_local_allocations(); - Status filter_block_and_execute_exprs(vectorized::Block* output_block, - vectorized::Block* temp_block, size_t slot_num); + Status filter_block(vectorized::Block* temp_block, size_t slot_num); + + Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block); protected: RuntimeState* _state; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 5e002ac44e9d61..44745d32edd450 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -112,14 +112,17 @@ Status JsonScanner::open_next_reader() { _scanner_eof = true; return Status::OK(); } + RETURN_IF_ERROR(open_based_reader()); + RETURN_IF_ERROR(open_json_reader()); + _next_range++; + return Status::OK(); +} +Status JsonScanner::open_based_reader() { RETURN_IF_ERROR(open_file_reader()); if (_read_json_by_line) { RETURN_IF_ERROR(open_line_reader()); } - RETURN_IF_ERROR(open_json_reader()); - _next_range++; - return Status::OK(); } @@ -215,6 +218,25 @@ Status JsonScanner::open_json_reader() { bool num_as_string = false; bool fuzzy_parse = false; + RETURN_IF_ERROR( + get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse)); + if (_read_json_by_line) { + _cur_json_reader = + new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); + } else { + _cur_json_reader = + new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, _cur_file_reader); + } + + RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root)); + return Status::OK(); +} + +Status JsonScanner::get_range_params(std::string& jsonpath, std::string& json_root, + bool& strip_outer_array, bool& num_as_string, + bool& fuzzy_parse) { const TBrokerRangeDesc& range = _ranges[_next_range]; if (range.__isset.jsonpaths) { @@ -232,17 +254,6 @@ Status JsonScanner::open_json_reader() { if (range.__isset.fuzzy_parse) { fuzzy_parse = range.fuzzy_parse; } - if (_read_json_by_line) { - _cur_json_reader = - new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); - } else { - _cur_json_reader = - new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, _cur_file_reader); - } - - RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root)); return Status::OK(); } @@ -308,14 +319,8 @@ JsonReader::~JsonReader() { } Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) { - // parse jsonpath - if (!jsonpath.empty()) { - Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths); - RETURN_IF_ERROR(st); - } - if (!json_root.empty()) { - JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); - } + // generate _parsed_jsonpaths and _parsed_json_root + RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root)); //improve performance if (_parsed_jsonpaths.empty()) { // input is a simple json-string @@ -330,6 +335,19 @@ Status JsonReader::init(const std::string& jsonpath, const std::string& json_roo return Status::OK(); } +Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath, + const std::string& json_root) { + // parse jsonpath + if (!jsonpath.empty()) { + Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths); + RETURN_IF_ERROR(st); + } + if (!json_root.empty()) { + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + return Status::OK(); +} + Status JsonReader::_generate_json_paths(const std::string& jsonpath, std::vector>* vect) { rapidjson::Document jsonpaths_doc; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 9ac628dbdd34fa..276b2dd077d842 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -73,6 +73,10 @@ class JsonScanner : public BaseScanner { Status open_json_reader(); Status open_next_reader(); + Status open_based_reader(); + Status get_range_params(std::string& jsonpath, std::string& json_root, bool& strip_outer_array, + bool& num_as_string, bool& fuzzy_parse); + protected: const std::vector& _ranges; const std::vector& _broker_addresses; @@ -158,6 +162,7 @@ class JsonReader { void _close(); Status _generate_json_paths(const std::string& jsonpath, std::vector>* vect); + Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root); protected: int _next_line; diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index d0d59a22991003..5c8d4f3100f1e4 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -58,9 +58,10 @@ void VJsonScanner::close() { Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { SCOPED_TIMER(_read_timer); + Status status; const int batch_size = _state->batch_size(); size_t slot_num = _src_slot_descs.size(); - std::shared_ptr temp_block(new vectorized::Block()); + std::unique_ptr temp_block(new vectorized::Block()); std::vector columns(slot_num); auto string_type = make_nullable(std::make_shared()); for (int i = 0; i < slot_num; i++) { @@ -97,41 +98,23 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { } if (columns[0]->size() > 0) { - if (!_dest_vexpr_ctx.empty()) { - auto n_columns = 0; - for (const auto slot_desc : _src_slot_descs) { - temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } - RETURN_IF_ERROR( - filter_block_and_execute_exprs(&output_block, temp_block.get(), slot_num)); - } else { - auto n_columns = 0; - for (const auto slot_desc : _src_slot_descs) { - output_block.insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } + RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), slot_num)); - // filter src tuple by preceding filter first - if (!_vpre_filter_ctxs.empty()) { - for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - auto old_rows = output_block.rows(); - RETURN_IF_ERROR( - VExprContext::filter_block(_vpre_filter_ctx, &output_block, slot_num)); - _counter->num_rows_unselected += old_rows - output_block.rows(); - } - } + if (_dest_vexpr_ctx.empty()) { + output_block = *(temp_block.get()); + } else { + RETURN_IF_ERROR(BaseScanner::execute_exprs(&output_block, temp_block.get())); } } - if (_scanner_eof) { - *eof = true; - } else { - *eof = false; - } + *eof = _scanner_eof; return Status::OK(); } @@ -140,18 +123,9 @@ Status VJsonScanner::open_next_reader() { _scanner_eof = true; return Status::OK(); } - - // init file reader - RETURN_IF_ERROR(JsonScanner::open_file_reader()); - - // init line reader - if (_read_json_by_line) { - RETURN_IF_ERROR(JsonScanner::open_line_reader()); - } - + RETURN_IF_ERROR(JsonScanner::open_based_reader()); RETURN_IF_ERROR(open_vjson_reader()); _next_range++; - return Status::OK(); } @@ -165,24 +139,8 @@ Status VJsonScanner::open_vjson_reader() { bool num_as_string = false; bool fuzzy_parse = false; - const TBrokerRangeDesc& range = _ranges[_next_range]; - - if (range.__isset.jsonpaths) { - jsonpath = range.jsonpaths; - } - if (range.__isset.json_root) { - json_root = range.json_root; - } - if (range.__isset.strip_outer_array) { - strip_outer_array = range.strip_outer_array; - } - if (range.__isset.num_as_string) { - num_as_string = range.num_as_string; - } - if (range.__isset.fuzzy_parse) { - fuzzy_parse = range.fuzzy_parse; - } - + RETURN_IF_ERROR(JsonScanner::get_range_params(jsonpath, json_root, strip_outer_array, + num_as_string, fuzzy_parse)); if (_read_json_by_line) { _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, fuzzy_parse, &_scanner_eof, nullptr, @@ -207,14 +165,8 @@ VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimePr VJsonReader::~VJsonReader() {} Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) { - // parse jsonpath - if (!jsonpath.empty()) { - Status st = JsonReader::_generate_json_paths(jsonpath, &_parsed_jsonpaths); - RETURN_IF_ERROR(st); - } - if (!json_root.empty()) { - JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); - } + // generate _parsed_jsonpaths and _parsed_json_root + RETURN_IF_ERROR(JsonReader::_parse_jsonpath_and_json_root(jsonpath, json_root)); //improve performance if (_parsed_jsonpaths.empty()) { // input is a simple json-string From 81fa32d4e189907513a12615452512eff155f26b Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Mon, 9 May 2022 13:40:21 +0800 Subject: [PATCH 6/9] add vectorized vjson_scanner and apply vexpr --- be/src/exec/base_scanner.cpp | 31 ++++- be/src/exec/base_scanner.h | 9 +- be/src/vec/exec/vbroker_scan_node.cpp | 3 - be/src/vec/exec/vjson_scanner.cpp | 42 +----- be/src/vec/exec/vjson_scanner.h | 6 +- be/test/vec/exec/vjson_scanner_test.cpp | 166 ++++++++++++++++++++++++ 6 files changed, 205 insertions(+), 52 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 11bfc6993a4fd5..9da503bc8f824a 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -279,10 +279,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { } Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) { - // filter src tuple by preceding filter first - auto old_rows = temp_block->rows(); + // filter block if (!_vpre_filter_ctxs.empty()) { for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + auto old_rows = temp_block->rows(); RETURN_IF_ERROR( vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num)); _counter->num_rows_unselected += old_rows - temp_block->rows(); @@ -292,7 +292,7 @@ Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) } Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) { - // Do vectorized expr here to speed up load + // Do vectorized expr here Status status; if (!_dest_vexpr_ctx.empty()) { *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( @@ -305,6 +305,31 @@ Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::B return Status::OK(); } +Status BaseScanner::fill_dest_block(vectorized::Block* dest_block, + std::vector& columns) { + if (columns.empty() || columns[0]->size() == 0) { + return Status::OK(); + } + + std::unique_ptr temp_block(new vectorized::Block()); + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _src_slot_descs.size())); + + if (_dest_vexpr_ctx.empty()) { + *dest_block = *temp_block; + } else { + RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get())); + } + + return Status::OK(); +} + void BaseScanner::fill_slots_of_columns_from_path( int start, const std::vector& columns_from_path) { // values of columns from path can not be null diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 17888e2c30f4ba..0347975e33880e 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -72,14 +72,13 @@ class BaseScanner { return Status::NotSupported("Not Implemented get block"); } - virtual Status get_next(vectorized::Block& output_block, bool* eof) { - return Status::NotSupported("Not Implemented get block"); - } - // Close this scanner virtual void close() = 0; Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); + Status fill_dest_block(vectorized::Block* dest_block, + std::vector& columns); + void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); @@ -121,8 +120,6 @@ class BaseScanner { // and will be converted to `_pre_filter_ctxs` when scanner is open. const std::vector _pre_filter_texprs; std::vector _pre_filter_ctxs; - std::vector _dest_vexpr_ctx; - std::vector _vpre_filter_ctxs; bool _strict_mode; diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index da82dccba23fa7..0386eb6a100076 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -153,9 +153,6 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner return Status::OK(); } - // get block - RETURN_IF_ERROR(scanner->get_next(*(block.get()), &scanner_eof)); - std::shared_ptr block(new vectorized::Block()); RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof)); if (block->rows() == 0) { diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 5c8d4f3100f1e4..f07ef1b4104584 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -43,25 +43,12 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _cur_vjson_reader(nullptr) {} -VJsonScanner::~VJsonScanner() { - close(); -} - -Status VJsonScanner::open() { - RETURN_IF_ERROR(BaseScanner::open()); - return Status::OK(); -} - -void VJsonScanner::close() { - BaseScanner::close(); -} +VJsonScanner::~VJsonScanner() {} -Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { +Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { SCOPED_TIMER(_read_timer); - Status status; const int batch_size = _state->batch_size(); size_t slot_num = _src_slot_descs.size(); - std::unique_ptr temp_block(new vectorized::Block()); std::vector columns(slot_num); auto string_type = make_nullable(std::make_shared()); for (int i = 0; i < slot_num; i++) { @@ -94,25 +81,10 @@ Status VJsonScanner::get_next(vectorized::Block& output_block, bool* eof) { continue; } COUNTER_UPDATE(_rows_read_counter, 1); - SCOPED_TIMER(_materialize_timer); } - if (columns[0]->size() > 0) { - auto n_columns = 0; - for (const auto slot_desc : _src_slot_descs) { - temp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); - } - - RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), slot_num)); - - if (_dest_vexpr_ctx.empty()) { - output_block = *(temp_block.get()); - } else { - RETURN_IF_ERROR(BaseScanner::execute_exprs(&output_block, temp_block.get())); - } - } + SCOPED_TIMER(_materialize_timer); + RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns)); *eof = _scanner_eof; return Status::OK(); @@ -296,7 +268,7 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, } else { // not found if (slot_desc->is_nullable()) { auto* nullable_column = reinterpret_cast(column_ptr); - nullable_column->insert_data(nullptr, 0); + nullable_column->insert_default(); nullcount++; } else { RETURN_IF_ERROR(_append_error_msg( @@ -360,7 +332,7 @@ Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator v case rapidjson::Type::kNullType: if (slot_desc->is_nullable()) { auto* nullable_column = reinterpret_cast(column_ptr); - nullable_column->insert_data(nullptr, 0); + nullable_column->insert_default(); } else { RETURN_IF_ERROR(_append_error_msg( *value, "Json value is null, but the column `{}` is not nullable.", @@ -463,7 +435,7 @@ Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, // not match in jsondata. if (slot_descs[i]->is_nullable()) { auto* nullable_column = reinterpret_cast(column_ptr); - nullable_column->insert_data(nullptr, 0); + nullable_column->insert_default(); nullcount++; } else { RETURN_IF_ERROR(_append_error_msg( diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index 6d4f2191e953f9..685861baf38777 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -57,11 +57,7 @@ class VJsonScanner : public JsonScanner { ~VJsonScanner(); - Status open() override; - - void close() override; - - Status get_next(vectorized::Block& output_block, bool* eof) override; + Status get_next(vectorized::Block* output_block, bool* eof) override; private: Status open_vjson_reader(); diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index 50451a341c62ee..f9d55e073b43b1 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -592,5 +592,171 @@ TEST_F(VJsonScannerTest, simple_array_json) { ASSERT_TRUE(eof); } +TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = + "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", " + "\"$.decimal\"]"; + range.__isset.jsonpaths = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); +} + +TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.splittable = true; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = + "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", " + "\"$.decimal\"]"; + range.__isset.jsonpaths = true; + range.read_json_by_line = true; + range.__isset.read_json_by_line = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); +} + +TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\", \"$.k5\", \"$.k6\"]"; + range.__isset.jsonpaths = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "\\N"); + ASSERT_EQ(columns[0].to_string(1), "\\N"); + ASSERT_EQ(columns[1].to_string(0), "\\N"); + ASSERT_EQ(columns[1].to_string(1), "\\N"); + ASSERT_EQ(columns[2].to_string(0), "\\N"); + ASSERT_EQ(columns[2].to_string(1), "\\N"); + block.clear(); +} + } // namespace vectorized } // namespace doris From e0c086a633ef9c2c5fee1e4a361a17463eda4bdf Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Mon, 9 May 2022 16:52:12 +0800 Subject: [PATCH 7/9] add vectorized vjson_scanner and apply vexpr --- be/src/exec/base_scanner.cpp | 2 +- be/src/exec/base_scanner.h | 2 +- be/src/exec/json_scanner.cpp | 3 +-- be/src/vec/exec/vbroker_scan_node.h | 3 +-- be/src/vec/exec/vjson_scanner.cpp | 10 +++++++--- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 9da503bc8f824a..c4e1b5c056794a 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -319,7 +319,7 @@ Status BaseScanner::fill_dest_block(vectorized::Block* dest_block, slot_desc->col_name())); } - RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _src_slot_descs.size())); + RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size())); if (_dest_vexpr_ctx.empty()) { *dest_block = *temp_block; diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 0347975e33880e..75433c6178e784 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -55,7 +55,7 @@ class BaseScanner { const std::vector& pre_filter_texprs, ScannerCounter* counter); virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); - if (_state != nullptr && _state->enable_vectorized_exec()) { + if (_state->enable_vectorized_exec()) { vectorized::VExpr::close(_dest_vexpr_ctx, _state); } }; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 44745d32edd450..a23ce44b034e6f 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -339,8 +339,7 @@ Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root) { // parse jsonpath if (!jsonpath.empty()) { - Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths); - RETURN_IF_ERROR(st); + RETURN_IF_ERROR(_generate_json_paths(jsonpath, &_parsed_jsonpaths)); } if (!json_root.empty()) { JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h index 1606e0d83b951d..4ccebed5fbf836 100644 --- a/be/src/vec/exec/vbroker_scan_node.h +++ b/be/src/vec/exec/vbroker_scan_node.h @@ -31,8 +31,7 @@ namespace vectorized { class VBrokerScanNode final : public BrokerScanNode { public: VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - - ~VBrokerScanNode() { close(_runtime_state); } + ~VBrokerScanNode() override = default; // Fill the next row batch by calling next() on the scanner, virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override { diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index f07ef1b4104584..3d10de1f464872 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -171,7 +171,7 @@ Status VJsonReader::_vhandle_simple_json(std::vector& columns, continue; // continue to read next } RETURN_IF_ERROR(st); - if (*is_empty_row == true && st == Status::OK()) { + if (*is_empty_row == true) { return Status::OK(); } _name_map.clear(); @@ -245,6 +245,10 @@ Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, int nullcount = 0; int ctx_idx = 0; for (auto slot_desc : slot_descs) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; auto* column_ptr = columns[dest_index].get(); rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd(); @@ -397,7 +401,7 @@ Status VJsonReader::_vhandle_nested_complex_json(std::vector& continue; // continue to read next } RETURN_IF_ERROR(st); - if (*is_empty_row == true && st == Status::OK()) { + if (*is_empty_row == true) { return Status::OK(); } *is_empty_row = false; @@ -488,7 +492,7 @@ Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { *is_empty_row = true; } } - return st; + return Status::OK(); } Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, From a47c536c583b361f8a06c1446a5cee556577bd89 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Mon, 9 May 2022 17:23:52 +0800 Subject: [PATCH 8/9] add vectorized vjson_scanner and apply vexpr --- be/test/vec/exec/vjson_scanner_test.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index f9d55e073b43b1..782ff6d47a755e 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -590,6 +590,7 @@ TEST_F(VJsonScannerTest, simple_array_json) { status = scan_node.get_next(&_runtime_state, &block, &eof); ASSERT_EQ(0, block.rows()); ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); } TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) { @@ -647,6 +648,7 @@ TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) { status = scan_node.get_next(&_runtime_state, &block, &eof); ASSERT_EQ(0, block.rows()); ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); } TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) { @@ -705,6 +707,7 @@ TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) { status = scan_node.get_next(&_runtime_state, &block, &eof); ASSERT_EQ(0, block.rows()); ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); } TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { @@ -756,6 +759,7 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { ASSERT_EQ(columns[2].to_string(0), "\\N"); ASSERT_EQ(columns[2].to_string(1), "\\N"); block.clear(); + scan_node.close(&_runtime_state); } } // namespace vectorized From 8b60b775a96da4f99ef5fb8a1fc2773921b86d38 Mon Sep 17 00:00:00 2001 From: hucheng01 Date: Tue, 10 May 2022 11:52:05 +0800 Subject: [PATCH 9/9] add vectorized vjson_scanner and apply vexpr[format code] --- be/src/exec/base_scanner.h | 2 +- be/src/exec/broker_scan_node.cpp | 3 +-- be/src/vec/exec/vjson_scanner.cpp | 5 +++-- be/src/vec/exec/vjson_scanner.h | 8 ++++---- be/test/vec/exec/vjson_scanner_test.cpp | 5 +++-- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 75433c6178e784..02c2f568806e1a 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -21,8 +21,8 @@ #include "exprs/expr.h" #include "runtime/tuple.h" #include "util/runtime_profile.h" -#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" namespace doris { diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 4697e3af7861c8..c1144495c2863e 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -21,8 +21,6 @@ #include #include "common/object_pool.h" -#include "vec/exec/vbroker_scanner.h" -#include "vec/exec/vjson_scanner.h" #include "exec/json_scanner.h" #include "exec/orc_scanner.h" #include "exec/parquet_scanner.h" @@ -34,6 +32,7 @@ #include "util/runtime_profile.h" #include "util/thread.h" #include "vec/exec/vbroker_scanner.h" +#include "vec/exec/vjson_scanner.h" namespace doris { diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 3d10de1f464872..b46d16e80e6fad 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -17,9 +17,10 @@ #include "vec/exec/vjson_scanner.h" -#include #include +#include + #include "env/env.h" #include "exec/broker_reader.h" #include "exec/buffered_reader.h" @@ -80,9 +81,9 @@ Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { // Read empty row, just continue continue; } - COUNTER_UPDATE(_rows_read_counter, 1); } + COUNTER_UPDATE(_rows_read_counter, columns[0]->size()); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns)); diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index 685861baf38777..0da3b96710cafb 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -32,15 +32,15 @@ #include "common/status.h" #include "exec/base_scanner.h" -#include "exec/json_scanner.h" #include "exec/exec_node.h" -#include "runtime/row_batch.h" +#include "exec/json_scanner.h" +#include "exprs/expr_context.h" #include "runtime/descriptors.h" #include "runtime/mem_pool.h" -#include "runtime/tuple.h" #include "runtime/mem_tracker.h" +#include "runtime/row_batch.h" +#include "runtime/tuple.h" #include "util/runtime_profile.h" -#include "exprs/expr_context.h" namespace doris { class ExprContext; diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index 782ff6d47a755e..c96772a0119097 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "vec/exec/vjson_scanner.h" + #include #include @@ -25,8 +27,6 @@ #include "common/object_pool.h" #include "exec/broker_scan_node.h" #include "exec/local_file_reader.h" -#include "vec/exec/vbroker_scan_node.h" -#include "vec/exec/vjson_scanner.h" #include "exprs/cast_functions.h" #include "exprs/decimalv2_operators.h" #include "gen_cpp/Descriptors_types.h" @@ -37,6 +37,7 @@ #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/user_function_cache.h" +#include "vec/exec/vbroker_scan_node.h" namespace doris { namespace vectorized {