diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 8621cf75f83de7..c4e1b5c056794a 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -156,7 +156,6 @@ Status BaseScanner::init_expr_ctxes() { RETURN_IF_ERROR(ctx->open(_state)); _dest_expr_ctx.emplace_back(ctx); } - if (has_slot_id_map) { auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { @@ -279,6 +278,58 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { return Status::OK(); } +Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) { + // filter block + if (!_vpre_filter_ctxs.empty()) { + for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { + auto old_rows = temp_block->rows(); + RETURN_IF_ERROR( + vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num)); + _counter->num_rows_unselected += old_rows - temp_block->rows(); + } + } + return Status::OK(); +} + +Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) { + // Do vectorized expr here + Status status; + if (!_dest_vexpr_ctx.empty()) { + *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs( + _dest_vexpr_ctx, *temp_block, status); + if (UNLIKELY(output_block->rows() == 0)) { + return status; + } + } + + return Status::OK(); +} + +Status BaseScanner::fill_dest_block(vectorized::Block* dest_block, + std::vector& columns) { + if (columns.empty() || columns[0]->size() == 0) { + return Status::OK(); + } + + std::unique_ptr temp_block(new vectorized::Block()); + auto n_columns = 0; + for (const auto slot_desc : _src_slot_descs) { + temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size())); + + 
if (_dest_vexpr_ctx.empty()) { + *dest_block = *temp_block; + } else { + RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get())); + } + + return Status::OK(); +} + void BaseScanner::fill_slots_of_columns_from_path( int start, const std::vector& columns_from_path) { // values of columns from path can not be null diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 9c4179874e0923..02c2f568806e1a 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -22,6 +22,7 @@ #include "runtime/tuple.h" #include "util/runtime_profile.h" #include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" namespace doris { @@ -75,11 +76,18 @@ class BaseScanner { virtual void close() = 0; Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); + Status fill_dest_block(vectorized::Block* dest_block, + std::vector& columns); + void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); void free_expr_local_allocations(); + Status filter_block(vectorized::Block* temp_block, size_t slot_num); + + Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 513e653fb760a1..c1144495c2863e 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -32,6 +32,7 @@ #include "util/runtime_profile.h" #include "util/thread.h" #include "vec/exec/vbroker_scanner.h" +#include "vec/exec/vjson_scanner.h" namespace doris { @@ -234,9 +235,15 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan counter); break; case TFileFormatType::FORMAT_JSON: - scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs, - counter); + if (_vectorized) { + scan = new vectorized::VJsonScanner( + 
_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, + scan_range.broker_addresses, _pre_filter_texprs, counter); + } else { + scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } break; default: if (_vectorized) { diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 5e002ac44e9d61..a23ce44b034e6f 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -112,14 +112,17 @@ Status JsonScanner::open_next_reader() { _scanner_eof = true; return Status::OK(); } + RETURN_IF_ERROR(open_based_reader()); + RETURN_IF_ERROR(open_json_reader()); + _next_range++; + return Status::OK(); +} +Status JsonScanner::open_based_reader() { RETURN_IF_ERROR(open_file_reader()); if (_read_json_by_line) { RETURN_IF_ERROR(open_line_reader()); } - RETURN_IF_ERROR(open_json_reader()); - _next_range++; - return Status::OK(); } @@ -215,6 +218,25 @@ Status JsonScanner::open_json_reader() { bool num_as_string = false; bool fuzzy_parse = false; + RETURN_IF_ERROR( + get_range_params(jsonpath, json_root, strip_outer_array, num_as_string, fuzzy_parse)); + if (_read_json_by_line) { + _cur_json_reader = + new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); + } else { + _cur_json_reader = + new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, + fuzzy_parse, &_scanner_eof, _cur_file_reader); + } + + RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root)); + return Status::OK(); +} + +Status JsonScanner::get_range_params(std::string& jsonpath, std::string& json_root, + bool& strip_outer_array, bool& num_as_string, + bool& fuzzy_parse) { const TBrokerRangeDesc& range = _ranges[_next_range]; if (range.__isset.jsonpaths) { @@ -232,17 +254,6 @@ Status JsonScanner::open_json_reader() { if (range.__isset.fuzzy_parse) { 
fuzzy_parse = range.fuzzy_parse; } - if (_read_json_by_line) { - _cur_json_reader = - new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, nullptr, _cur_line_reader); - } else { - _cur_json_reader = - new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string, - fuzzy_parse, &_scanner_eof, _cur_file_reader); - } - - RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root)); return Status::OK(); } @@ -308,14 +319,8 @@ JsonReader::~JsonReader() { } Status JsonReader::init(const std::string& jsonpath, const std::string& json_root) { - // parse jsonpath - if (!jsonpath.empty()) { - Status st = _generate_json_paths(jsonpath, &_parsed_jsonpaths); - RETURN_IF_ERROR(st); - } - if (!json_root.empty()) { - JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); - } + // generate _parsed_jsonpaths and _parsed_json_root + RETURN_IF_ERROR(_parse_jsonpath_and_json_root(jsonpath, json_root)); //improve performance if (_parsed_jsonpaths.empty()) { // input is a simple json-string @@ -330,6 +335,18 @@ Status JsonReader::init(const std::string& jsonpath, const std::string& json_roo return Status::OK(); } +Status JsonReader::_parse_jsonpath_and_json_root(const std::string& jsonpath, + const std::string& json_root) { + // parse jsonpath + if (!jsonpath.empty()) { + RETURN_IF_ERROR(_generate_json_paths(jsonpath, &_parsed_jsonpaths)); + } + if (!json_root.empty()) { + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + return Status::OK(); +} + Status JsonReader::_generate_json_paths(const std::string& jsonpath, std::vector>* vect) { rapidjson::Document jsonpaths_doc; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index b12c96f3961976..276b2dd077d842 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -67,13 +67,17 @@ class JsonScanner : public BaseScanner { // Close this scanner void close() override; -private: +protected: Status 
open_file_reader(); Status open_line_reader(); Status open_json_reader(); Status open_next_reader(); -private: + Status open_based_reader(); + Status get_range_params(std::string& jsonpath, std::string& json_root, bool& strip_outer_array, + bool& num_as_string, bool& fuzzy_parse); + +protected: const std::vector& _ranges; const std::vector& _broker_addresses; @@ -129,7 +133,7 @@ class JsonReader { Status read_json_row(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* is_empty_row, bool* eof); -private: +protected: Status (JsonReader::*_handle_json_callback)(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* is_empty_row, bool* eof); @@ -158,8 +162,9 @@ class JsonReader { void _close(); Status _generate_json_paths(const std::string& jsonpath, std::vector>* vect); + Status _parse_jsonpath_and_json_root(const std::string& jsonpath, const std::string& json_root); -private: +protected: int _next_line; int _total_lines; RuntimeState* _state; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 7555e9d0ca9222..22fa489f463dae 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -101,6 +101,7 @@ set(VEC_FILES exec/vtable_function_node.cpp exec/vbroker_scan_node.cpp exec/vbroker_scanner.cpp + exec/vjson_scanner.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp new file mode 100644 index 00000000000000..b46d16e80e6fad --- /dev/null +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -0,0 +1,522 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vjson_scanner.h" + +#include + +#include + +#include "env/env.h" +#include "exec/broker_reader.h" +#include "exec/buffered_reader.h" +#include "exec/local_file_reader.h" +#include "exec/plain_text_line_reader.h" +#include "exec/s3_reader.h" +#include "exprs/expr.h" +#include "exprs/json_functions.h" +#include "gutil/strings/split.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/time.h" + +namespace doris::vectorized { + +VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), + _cur_vjson_reader(nullptr) {} + +VJsonScanner::~VJsonScanner() {} + +Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { + SCOPED_TIMER(_read_timer); + const int batch_size = _state->batch_size(); + size_t slot_num = _src_slot_descs.size(); + std::vector columns(slot_num); + auto string_type = make_nullable(std::make_shared()); + for (int i = 0; i < slot_num; i++) { + columns[i] = string_type->create_column(); + } + + // Get one line + while (columns[0]->size() < batch_size && !_scanner_eof) { + if (_cur_file_reader == nullptr || _cur_reader_eof) { + RETURN_IF_ERROR(open_next_reader()); + // If there isn't any more reader, break this + if (_scanner_eof) { + break; + } + } + + if (_read_json_by_line && 
_skip_next_line) { + size_t size = 0; + const uint8_t* line_ptr = nullptr; + RETURN_IF_ERROR(_cur_line_reader->read_line(&line_ptr, &size, &_cur_reader_eof)); + _skip_next_line = false; + continue; + } + + bool is_empty_row = false; + RETURN_IF_ERROR(_cur_vjson_reader->read_json_column(columns, _src_slot_descs, &is_empty_row, + &_cur_reader_eof)); + if (is_empty_row) { + // Read empty row, just continue + continue; + } + } + + COUNTER_UPDATE(_rows_read_counter, columns[0]->size()); + SCOPED_TIMER(_materialize_timer); + RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns)); + + *eof = _scanner_eof; + return Status::OK(); +} + +Status VJsonScanner::open_next_reader() { + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + RETURN_IF_ERROR(JsonScanner::open_based_reader()); + RETURN_IF_ERROR(open_vjson_reader()); + _next_range++; + return Status::OK(); +} + +Status VJsonScanner::open_vjson_reader() { + if (_cur_vjson_reader != nullptr) { + _cur_vjson_reader.reset(); + } + std::string json_root = ""; + std::string jsonpath = ""; + bool strip_outer_array = false; + bool num_as_string = false; + bool fuzzy_parse = false; + + RETURN_IF_ERROR(JsonScanner::get_range_params(jsonpath, json_root, strip_outer_array, + num_as_string, fuzzy_parse)); + if (_read_json_by_line) { + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, + num_as_string, fuzzy_parse, &_scanner_eof, nullptr, + _cur_line_reader)); + } else { + _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array, + num_as_string, fuzzy_parse, &_scanner_eof, + _cur_file_reader)); + } + + RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root)); + return Status::OK(); +} + +VJsonReader::VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, + bool strip_outer_array, bool num_as_string, bool fuzzy_parse, + bool* scanner_eof, FileReader* file_reader, LineReader* line_reader) + 
: JsonReader(state, counter, profile, strip_outer_array, num_as_string, fuzzy_parse, + scanner_eof, file_reader, line_reader), + _vhandle_json_callback(nullptr) {} + +VJsonReader::~VJsonReader() {} + +Status VJsonReader::init(const std::string& jsonpath, const std::string& json_root) { + // generate _parsed_jsonpaths and _parsed_json_root + RETURN_IF_ERROR(JsonReader::_parse_jsonpath_and_json_root(jsonpath, json_root)); + + //improve performance + if (_parsed_jsonpaths.empty()) { // input is a simple json-string + _vhandle_json_callback = &VJsonReader::_vhandle_simple_json; + } else { // input is a complex json-string and a json-path + if (_strip_outer_array) { + _vhandle_json_callback = &VJsonReader::_vhandle_flat_array_complex_json; + } else { + _vhandle_json_callback = &VJsonReader::_vhandle_nested_complex_json; + } + } + + return Status::OK(); +} + +Status VJsonReader::read_json_column(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof); +} + +Status VJsonReader::_vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + do { + bool valid = false; + if (_next_line >= _total_lines) { // parse json and generic document + Status st = _parse_json(is_empty_row, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); + if (*is_empty_row == true) { + return Status::OK(); + } + _name_map.clear(); + rapidjson::Value* objectValue = nullptr; + if (_json_doc->IsArray()) { + _total_lines = _json_doc->Size(); + if (_total_lines == 0) { + // may be passing an empty json, such as "[]" + RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr)); + if (*_scanner_eof) { + *is_empty_row = true; + return Status::OK(); + } + continue; + } + objectValue = &(*_json_doc)[0]; + } else { + _total_lines = 1; // only one row + objectValue = 
_json_doc; + } + _next_line = 0; + if (_fuzzy_parse) { + for (auto v : slot_descs) { + for (int i = 0; i < objectValue->MemberCount(); ++i) { + auto it = objectValue->MemberBegin() + i; + if (v->col_name() == it->name.GetString()) { + _name_map[v->col_name()] = i; + break; + } + } + } + } + } + + if (_json_doc->IsArray()) { // handle case 1 + rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object + RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid)); + } else { // handle case 2 + RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid)); + } + _next_line++; + if (!valid) { + if (*_scanner_eof) { + // When _scanner_eof is true and valid is false, it means that we have encountered + // unqualified data and decided to stop the scan. + *is_empty_row = true; + return Status::OK(); + } + continue; + } + *is_empty_row = false; + break; // get a valid row, then break + } while (_next_line <= _total_lines); + return Status::OK(); +} + +// for simple format json +// set valid to true and return OK if succeed. +// set valid to false and return OK if we met an invalid row. +// return other status if encounter other problems. +Status VJsonReader::_set_column_value(rapidjson::Value& objectValue, + std::vector& columns, + const std::vector& slot_descs, bool* valid) { + if (!objectValue.IsObject()) { + // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, + // not other type of Json format. 
+ RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid)); + return Status::OK(); + } + + int nullcount = 0; + int ctx_idx = 0; + for (auto slot_desc : slot_descs) { + if (!slot_desc->is_materialized()) { + continue; + } + + int dest_index = ctx_idx++; + auto* column_ptr = columns[dest_index].get(); + rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd(); + + if (_fuzzy_parse) { + auto idx_it = _name_map.find(slot_desc->col_name()); + if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) { + it = objectValue.MemberBegin() + idx_it->second; + } + } else { + it = objectValue.FindMember( + rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size())); + } + + if (it != objectValue.MemberEnd()) { + const rapidjson::Value& value = it->value; + RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid)); + if (!(*valid)) { + return Status::OK(); + } + } else { // not found + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_default(); + nullcount++; + } else { + RETURN_IF_ERROR(_append_error_msg( + objectValue, + "The column `{}` is not nullable, but it's not found in jsondata.", + slot_desc->col_name(), valid)); + break; + } + } + } + + if (nullcount == slot_descs.size()) { + RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.", + "", valid)); + return Status::OK(); + } + *valid = true; + return Status::OK(); +} + +Status VJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value, + SlotDescriptor* slot_desc, + vectorized::IColumn* column_ptr, bool* valid) { + const char* str_value = nullptr; + char tmp_buf[128] = {0}; + int32_t wbytes = 0; + + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->get_null_map_data().push_back(0); + column_ptr = &nullable_column->get_nested_column(); + } + + switch 
(value->GetType()) { + case rapidjson::Type::kStringType: + str_value = value->GetString(); + wbytes = strlen(str_value); + break; + case rapidjson::Type::kNumberType: + if (value->IsUint()) { + wbytes = sprintf(tmp_buf, "%u", value->GetUint()); + } else if (value->IsInt()) { + wbytes = sprintf(tmp_buf, "%d", value->GetInt()); + } else if (value->IsUint64()) { + wbytes = sprintf(tmp_buf, "%lu", value->GetUint64()); + } else if (value->IsInt64()) { + wbytes = sprintf(tmp_buf, "%ld", value->GetInt64()); + } else { + wbytes = sprintf(tmp_buf, "%f", value->GetDouble()); + } + str_value = tmp_buf; + break; + case rapidjson::Type::kFalseType: + wbytes = 1; + str_value = (char*)"0"; + break; + case rapidjson::Type::kTrueType: + wbytes = 1; + str_value = (char*)"1"; + break; + case rapidjson::Type::kNullType: + if (slot_desc->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_default(); + } else { + RETURN_IF_ERROR(_append_error_msg( + *value, "Json value is null, but the column `{}` is not nullable.", + slot_desc->col_name(), valid)); + return Status::OK(); + } + break; + default: + // for other type like array or object. we convert it to string to save + std::string json_str = JsonReader::_print_json_value(*value); + wbytes = json_str.size(); + str_value = json_str.c_str(); + break; + } + + // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR', + // we need use a function to support these types to insert data in columns. 
+ DCHECK(slot_desc->type().type == TYPE_VARCHAR); + assert_cast(column_ptr)->insert_data(str_value, wbytes); + + *valid = true; + return Status::OK(); +} + +Status VJsonReader::_vhandle_flat_array_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + do { + if (_next_line >= _total_lines) { + Status st = _parse_json(is_empty_row, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); + if (*is_empty_row == true) { + if (st == Status::OK()) { + return Status::OK(); + } + if (_total_lines == 0) { + continue; + } + } + } + rapidjson::Value& objectValue = (*_json_doc)[_next_line++]; + bool valid = true; + RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid)); + if (!valid) { + continue; // process next line + } + *is_empty_row = false; + break; // get a valid row, then break + } while (_next_line <= _total_lines); + return Status::OK(); +} + +Status VJsonReader::_vhandle_nested_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof) { + while (true) { + Status st = _parse_json(is_empty_row, eof); + if (st.is_data_quality_error()) { + continue; // continue to read next + } + RETURN_IF_ERROR(st); + if (*is_empty_row == true) { + return Status::OK(); + } + *is_empty_row = false; + break; // read a valid row + } + bool valid = true; + RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid)); + if (!valid) { + // there is only one line in this case, so if it return false, just set is_empty_row true + // so that the caller will continue reading next line. 
+ *is_empty_row = true; + } + return Status::OK(); +} + +Status VJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue, + const std::vector& slot_descs, + std::vector& columns, + bool* valid) { + int nullcount = 0; + int ctx_idx = 0; + size_t column_num = slot_descs.size(); + for (size_t i = 0; i < column_num; i++) { + int dest_index = ctx_idx++; + auto* column_ptr = columns[dest_index].get(); + rapidjson::Value* json_values = nullptr; + bool wrap_explicitly = false; + if (LIKELY(i < _parsed_jsonpaths.size())) { + json_values = JsonFunctions::get_json_array_from_parsed_json( + _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(), + &wrap_explicitly); + } + + if (json_values == nullptr) { + // not match in jsondata. + if (slot_descs[i]->is_nullable()) { + auto* nullable_column = reinterpret_cast(column_ptr); + nullable_column->insert_default(); + nullcount++; + } else { + RETURN_IF_ERROR(_append_error_msg( + objectValue, + "The column `{}` is not nullable, but it's not found in jsondata.", + slot_descs[i]->col_name(), valid)); + break; + } + } else { + CHECK(json_values->IsArray()); + if (json_values->Size() == 1 && wrap_explicitly) { + // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array. + // so here we unwrap the array to get the real element. + // if json_values' size > 1, it means we just match an array, not a wrapped one, so no need to unwrap. 
+ json_values = &((*json_values)[0]); + } + RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid)); + if (!(*valid)) { + break; + } + } + } + + if (nullcount == column_num) { + RETURN_IF_ERROR(_append_error_msg( + objectValue, "All fields is null or not matched, this is a invalid row.", "", + valid)); + } + return Status::OK(); +} + +Status VJsonReader::_parse_json(bool* is_empty_row, bool* eof) { + size_t size = 0; + Status st = JsonReader::_parse_json_doc(&size, eof); + // terminate if encounter other errors + RETURN_IF_ERROR(st); + + // read all data, then return + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + + if (!_parsed_jsonpaths.empty() && _strip_outer_array) { + _total_lines = _json_doc->Size(); + _next_line = 0; + + if (_total_lines == 0) { + // meet an empty json array. + *is_empty_row = true; + } + } + return Status::OK(); +} + +Status VJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, + std::string col_name, bool* valid) { + std::string err_msg; + if (!col_name.empty()) { + fmt::memory_buffer error_buf; + fmt::format_to(error_buf, error_msg, col_name); + err_msg = fmt::to_string(error_buf); + } else { + err_msg = error_msg; + } + + RETURN_IF_ERROR(_state->append_error_msg_to_file( + [&]() -> std::string { return JsonReader::_print_json_value(objectValue); }, + [&]() -> std::string { return err_msg; }, _scanner_eof)); + + _counter->num_rows_filtered++; + if (valid != nullptr) { + // current row is invalid + *valid = false; + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h new file mode 100644 index 00000000000000..0da3b96710cafb --- /dev/null +++ b/be/src/vec/exec/vjson_scanner.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BE_SRC_VJSON_SCANNER_H_ +#define BE_SRC_VJSON_SCANNER_H_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/base_scanner.h" +#include "exec/exec_node.h" +#include "exec/json_scanner.h" +#include "exprs/expr_context.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/row_batch.h" +#include "runtime/tuple.h" +#include "util/runtime_profile.h" + +namespace doris { +class ExprContext; + +namespace vectorized { +class VJsonReader; + +class VJsonScanner : public JsonScanner { +public: + VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + ~VJsonScanner(); + + Status get_next(vectorized::Block* output_block, bool* eof) override; + +private: + Status open_vjson_reader(); + Status open_next_reader(); + +private: + std::unique_ptr _cur_vjson_reader; +}; + +class VJsonReader : public JsonReader { +public: + VJsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, + bool strip_outer_array, bool 
num_as_string, bool fuzzy_parse, bool* scanner_eof, + FileReader* file_reader = nullptr, LineReader* line_reader = nullptr); + + ~VJsonReader(); + + Status init(const std::string& jsonpath, const std::string& json_root); + + Status read_json_column(std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, + bool* eof); + +private: + Status (VJsonReader::*_vhandle_json_callback)( + std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, bool* eof); + + Status _vhandle_simple_json(std::vector& columns, + const std::vector& slot_descs, bool* is_empty_row, + bool* eof); + + Status _vhandle_flat_array_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _vhandle_nested_complex_json(std::vector& columns, + const std::vector& slot_descs, + bool* is_empty_row, bool* eof); + + Status _write_columns_by_jsonpath(rapidjson::Value& objectValue, + const std::vector& slot_descs, + std::vector& columns, bool* valid); + + Status _set_column_value(rapidjson::Value& objectValue, std::vector& columns, + const std::vector& slot_descs, bool* valid); + + Status _write_data_to_column(rapidjson::Value::ConstValueIterator value, + SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr, + bool* valid); + + Status _parse_json(bool* is_empty_row, bool* eof); + + Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg, + std::string col_name, bool* valid); +}; + +} // namespace vectorized +} // namespace doris +#endif diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 5abf9cceb22386..6b7981306044ed 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -338,6 +338,7 @@ set(VEC_TEST_FILES vec/exec/vgeneric_iterators_test.cpp vec/exec/vbroker_scan_node_test.cpp vec/exec/vbroker_scanner_test.cpp + vec/exec/vjson_scanner_test.cpp vec/exec/vtablet_sink_test.cpp vec/exprs/vexpr_test.cpp vec/function/function_array_element_test.cpp diff --git 
a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp new file mode 100644 index 00000000000000..c96772a0119097 --- /dev/null +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -0,0 +1,767 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/vjson_scanner.h" + +#include +#include + +#include +#include +#include + +#include "common/object_pool.h" +#include "exec/broker_scan_node.h" +#include "exec/local_file_reader.h" +#include "exprs/cast_functions.h" +#include "exprs/decimalv2_operators.h" +#include "gen_cpp/Descriptors_types.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/tuple.h" +#include "runtime/user_function_cache.h" +#include "vec/exec/vbroker_scan_node.h" + +namespace doris { +namespace vectorized { + +class VJsonScannerTest : public testing::Test { +public: + VJsonScannerTest() : _runtime_state(TQueryGlobals()) { + init(); + _runtime_state._instance_mem_tracker.reset(new MemTracker()); + _runtime_state._exec_env = ExecEnv::GetInstance(); + } + void init(); + static void SetUpTestCase() { + UserFunctionCache::instance()->init( + "./be/test/runtime/test_data/user_function_cache/normal"); + CastFunctions::init(); + DecimalV2Operators::init(); + } + +protected: + virtual void SetUp() {} + virtual void TearDown() {} + +private: + int create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); + void create_expr_info(); + void init_desc_table(); + RuntimeState _runtime_state; + ObjectPool _obj_pool; + std::map _slots_map; + TBrokerScanRangeParams _params; + DescriptorTbl* _desc_tbl; + TPlanNode _tnode; +}; + +#define TUPLE_ID_DST 0 +#define TUPLE_ID_SRC 1 +#define COLUMN_NUMBERS 6 +#define DST_TUPLE_SLOT_ID_START 1 +#define SRC_TUPLE_SLOT_ID_START 7 + +int VJsonScannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + const char* columnNames[] = {"category", "author", "title", "price", "largeint", "decimal"}; + for (int i = 0; i < COLUMN_NUMBERS; i++) { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + 
slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = i; + slot_desc.byteOffset = i * 16 + 8; + slot_desc.nullIndicatorByte = i / 8; + slot_desc.nullIndicatorBit = i % 8; + slot_desc.colName = columnNames[i]; + slot_desc.slotIdx = i + 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + { + // TTupleDescriptor source + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_SRC; + t_tuple_desc.byteSize = COLUMN_NUMBERS * 16 + 8; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +int VJsonScannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { + int32_t byteOffset = 8; + { //category + TSlotDescriptor slot_desc; + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 0; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 0; + slot_desc.colName = "category"; + slot_desc.slotIdx = 1; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // author + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + 
scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 1; + slot_desc.colName = "author"; + slot_desc.slotIdx = 2; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // title + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 2; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 2; + slot_desc.colName = "title"; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // price + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::DOUBLE); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 3; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 3; + slot_desc.colName = "price"; + slot_desc.slotIdx = 4; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 8; + { // lagreint + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + 
node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 4; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 4; + slot_desc.colName = "lagreint"; + slot_desc.slotIdx = 5; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + byteOffset += 16; + { // decimal + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 5; + slot_desc.byteOffset = byteOffset; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = 5; + slot_desc.colName = "decimal"; + slot_desc.slotIdx = 6; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + + t_desc_table.__isset.slotDescriptors = true; + { + // TTupleDescriptor dest + TTupleDescriptor t_tuple_desc; + t_tuple_desc.id = TUPLE_ID_DST; + t_tuple_desc.byteSize = byteOffset + 8; + t_tuple_desc.numNullBytes = 0; + t_tuple_desc.tableId = 0; + t_tuple_desc.__isset.tableId = true; + t_desc_table.tupleDescriptors.push_back(t_tuple_desc); + } + return next_slot_id; +} + +void VJsonScannerTest::init_desc_table() { + TDescriptorTable t_desc_table; + + // table descriptors + TTableDescriptor t_table_desc; + + t_table_desc.id = 0; + t_table_desc.tableType = TTableType::BROKER_TABLE; + t_table_desc.numCols = 0; + t_table_desc.numClusteringCols = 0; + 
t_desc_table.tableDescriptors.push_back(t_table_desc); + t_desc_table.__isset.tableDescriptors = true; + + int next_slot_id = 1; + + next_slot_id = create_dst_tuple(t_desc_table, next_slot_id); + + next_slot_id = create_src_tuple(t_desc_table, next_slot_id); + + DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); + + _runtime_state.set_desc_tbl(_desc_tbl); +} + +void VJsonScannerTest::create_expr_info() { + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + // category VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START; // category id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START); + } + // author VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 1; // author id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 1, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 1); + } + // title VARCHAR --> VARCHAR + { + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 2; // log_time id in src 
tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 2, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 2); + } + + // price VARCHAR --> DOUBLE + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodouble"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttodouble(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_double_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 3; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 3, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 3); + } + // largeint VARCHAR --> LargeInt + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::LARGEINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + 
cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttolargeint"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttolargeint(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_large_int_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 4; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 4, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 4); + } + // decimal VARCHAR --> Decimal + { + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__isset.precision = true; + scalar_type.__isset.scale = true; + scalar_type.__set_precision(-1); + scalar_type.__set_scale(-1); + scalar_type.__set_type(TPrimitiveType::DECIMALV2); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TExprNode cast_expr; + cast_expr.node_type = TExprNodeType::CAST_EXPR; + cast_expr.type = int_type; + cast_expr.__set_opcode(TExprOpcode::CAST); + cast_expr.__set_num_children(1); + cast_expr.__set_output_scale(-1); + cast_expr.__isset.fn = true; + cast_expr.fn.name.function_name = "casttodecimalv2"; + cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN; + cast_expr.fn.arg_types.push_back(varchar_type); + 
cast_expr.fn.ret_type = int_type; + cast_expr.fn.has_var_args = false; + cast_expr.fn.__set_signature("casttodecimalv2(VARCHAR(*))"); + cast_expr.fn.__isset.scalar_fn = true; + cast_expr.fn.scalar_fn.symbol = "doris::DecimalV2Operators::cast_to_decimalv2_val"; + + TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = varchar_type; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + slot_ref.slot_ref.slot_id = SRC_TUPLE_SLOT_ID_START + 5; // price id in src tuple + slot_ref.slot_ref.tuple_id = 1; + + TExpr expr; + expr.nodes.push_back(cast_expr); + expr.nodes.push_back(slot_ref); + + _params.expr_of_dest_slot.emplace(DST_TUPLE_SLOT_ID_START + 5, expr); + _params.src_slot_ids.push_back(SRC_TUPLE_SLOT_ID_START + 5); + } + // _params.__isset.expr_of_dest_slot = true; + _params.__set_dest_tuple_id(TUPLE_ID_DST); + _params.__set_src_tuple_id(TUPLE_ID_SRC); +} + +void VJsonScannerTest::init() { + create_expr_info(); + init_desc_table(); + + // Node Id + _tnode.node_id = 0; + _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; + _tnode.num_children = 0; + _tnode.limit = -1; + _tnode.row_tuples.push_back(0); + _tnode.nullable_tuples.push_back(false); + _tnode.broker_scan_node.tuple_id = 0; + _tnode.__isset.broker_scan_node = true; +} + +TEST_F(VJsonScannerTest, simple_array_json) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = 
TFileType::FILE_LOCAL; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + ASSERT_EQ(columns[3].to_string(0), "8.950000"); + ASSERT_EQ(columns[3].to_string(1), "12.990000"); + ASSERT_EQ(columns[4].to_string(0), "1234"); + ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000"); + ASSERT_EQ(columns[5].to_string(0), "1234.123400"); + ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); +} + +TEST_F(VJsonScannerTest, use_jsonpaths_with_file_reader) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + 
range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = + "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", " + "\"$.decimal\"]"; + range.__isset.jsonpaths = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); +} + +TEST_F(VJsonScannerTest, use_jsonpaths_with_line_reader) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.splittable = true; + range.strip_outer_array = true; + range.__isset.strip_outer_array 
= true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = + "[\"$.category\", \"$.author\", \"$.title\", \"$.price\", \"$.largeint\", " + "\"$.decimal\"]"; + range.__isset.jsonpaths = true; + range.read_json_by_line = true; + range.__isset.read_json_by_line = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "reference"); + ASSERT_EQ(columns[0].to_string(1), "fiction"); + ASSERT_EQ(columns[1].to_string(0), "NigelRees"); + ASSERT_EQ(columns[1].to_string(1), "EvelynWaugh"); + ASSERT_EQ(columns[2].to_string(0), "SayingsoftheCentury"); + ASSERT_EQ(columns[2].to_string(1), "SwordofHonour"); + + block.clear(); + status = scan_node.get_next(&_runtime_state, &block, &eof); + ASSERT_EQ(0, block.rows()); + ASSERT_TRUE(eof); + scan_node.close(&_runtime_state); +} + +TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) { + VBrokerScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); + scan_node.init(_tnode); + auto status = scan_node.prepare(&_runtime_state); + EXPECT_TRUE(status.ok()); + + // set scan range + std::vector scan_ranges; + { + TScanRangeParams scan_range_params; + + TBrokerScanRange broker_scan_range; + broker_scan_range.params = _params; + TBrokerRangeDesc range; + range.start_offset = 0; + range.size = -1; + range.format_type = TFileFormatType::FORMAT_JSON; + range.strip_outer_array = true; + 
range.__isset.strip_outer_array = true; + range.splittable = true; + range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json"; + range.file_type = TFileType::FILE_LOCAL; + range.jsonpaths = "[\"$.k1\", \"$.k2\", \"$.k3\", \"$.k4\", \"$.k5\", \"$.k6\"]"; + range.__isset.jsonpaths = true; + broker_scan_range.ranges.push_back(range); + scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); + scan_ranges.push_back(scan_range_params); + } + + scan_node.set_scan_ranges(scan_ranges); + status = scan_node.open(&_runtime_state); + EXPECT_TRUE(status.ok()); + + bool eof = false; + vectorized::Block block; + status = scan_node.get_next(&_runtime_state, &block, &eof); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(2, block.rows()); + EXPECT_EQ(6, block.columns()); + + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 6); + ASSERT_EQ(columns[0].to_string(0), "\\N"); + ASSERT_EQ(columns[0].to_string(1), "\\N"); + ASSERT_EQ(columns[1].to_string(0), "\\N"); + ASSERT_EQ(columns[1].to_string(1), "\\N"); + ASSERT_EQ(columns[2].to_string(0), "\\N"); + ASSERT_EQ(columns[2].to_string(1), "\\N"); + block.clear(); + scan_node.close(&_runtime_state); +} + +} // namespace vectorized +} // namespace doris