From c38e54422b103e865ae9298126a9ce4d2fa38898 Mon Sep 17 00:00:00 2001 From: Socrates Date: Tue, 24 Jun 2025 10:40:44 +0800 Subject: [PATCH 1/2] [fix](tvf) support compressed json file for tvf and refactor code (#51983) 1. support reading compressed json file for tvf 2. refactor code, intro init_schema_reader in GenericReader --- be/src/olap/push_handler.cpp | 1 - be/src/service/internal_service.cpp | 7 +- .../vec/exec/format/avro/avro_jni_reader.cpp | 5 +- be/src/vec/exec/format/avro/avro_jni_reader.h | 4 +- be/src/vec/exec/format/csv/csv_reader.cpp | 79 +++++++++--------- be/src/vec/exec/format/csv/csv_reader.h | 7 +- be/src/vec/exec/format/generic_reader.h | 6 ++ .../vec/exec/format/json/new_json_reader.cpp | 13 ++- be/src/vec/exec/format/json/new_json_reader.h | 1 + be/src/vec/exec/format/orc/vorc_reader.cpp | 11 ++- be/src/vec/exec/format/orc/vorc_reader.h | 2 + .../exec/format/parquet/vparquet_reader.cpp | 22 ++--- .../vec/exec/format/parquet/vparquet_reader.h | 4 +- .../vec/exec/format/table/iceberg_reader.cpp | 6 +- be/src/vec/exec/format/wal/wal_reader.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 12 +-- be/src/vec/exec/scan/vfile_scanner.h | 1 - .../format/parquet/parquet_reader_test.cpp | 4 - .../simple_object_json.json.gz | Bin 0 -> 211 bytes .../external_table_p0/tvf/test_hdfs_tvf.out | 14 ++++ .../tvf/test_hdfs_tvf.groovy | 10 +++ 21 files changed, 121 insertions(+), 90 deletions(-) create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 8cfe6a1a7f9223..4233b79e865480 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -643,7 +643,6 @@ Status PushBrokerReader::_get_next_reader() { const_cast(&_runtime_state->timezone_obj()), _io_ctx.get(), _runtime_state.get()); - RETURN_IF_ERROR(parquet_reader->open()); std::vector place_holder; init_status = parquet_reader->init_reader( _all_col_names, place_holder, _colname_to_value_range, _push_down_exprs, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ec5e0b7adf09fa..3cb999fea1b731 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -856,7 +856,6 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr case TFileFormatType::FORMAT_AVRO: { reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range, file_slots); - st = ((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader(); break; } default: @@ -865,6 +864,12 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr st.to_protobuf(result->mutable_status()); return; } + if (!st.ok()) { + LOG(WARNING) << "failed to create reader, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + st = reader->init_schema_reader(); if (!st.ok()) { LOG(WARNING) << "failed to init reader, errmsg=" << st; st.to_protobuf(result->mutable_status()); diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index 6591abab58d3d2..d6c38730f9fc58 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -54,7 +54,7 @@ Status AvroJNIReader::get_columns(std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; std::ostringstream required_fields; @@ -107,7 +107,8 @@ TFileType::type 
AvroJNIReader::get_file_type() { return type; } -Status AvroJNIReader::init_fetch_table_schema_reader() { +// open the jni connector for parsing schema +Status AvroJNIReader::init_schema_reader() { std::map required_param = {{"uri", _range.path}, {"file_type", std::to_string(get_file_type())}, {"is_get_table_schema", "true"}}; diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h b/be/src/vec/exec/format/avro/avro_jni_reader.h index c8d55cf58cf194..7daaa232f64e4c 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.h +++ b/be/src/vec/exec/format/avro/avro_jni_reader.h @@ -70,12 +70,12 @@ class AvroJNIReader : public JniReader { Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; - Status init_fetch_table_reader( + Status init_reader( const std::unordered_map* colname_to_value_range); TFileType::type get_file_type(); - Status init_fetch_table_schema_reader(); + Status init_schema_reader() override; Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 34a789cf854698..83625457882c6c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -396,14 +396,45 @@ Status CsvReader::get_columns(std::unordered_map* n return Status::OK(); } -Status CsvReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { - size_t read_line = 0; - bool is_parse_name = false; - RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name)); +// init decompressor, file reader and line reader for parsing schema +Status CsvReader::init_schema_reader() { + _start_offset = _range.start_offset; + if (_start_offset != 0) { + return Status::InvalidArgument( + "start offset of TFileRangeDesc must be zero in get parsered schema"); + } + if (_params.file_type == TFileType::FILE_BROKER) { + return Status::InternalError( + "Getting parsered schema from csv file do not support stream load and broker " + "load."); + } - if (read_line == 1) { - if (!is_parse_name) { //parse csv file without names and types + // csv file without names line and types line. 
+ _read_line = 1; + _is_parse_name = false; + + if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && + !_params.file_attributes.header_type.empty()) { + std::string header_type = to_lower(_params.file_attributes.header_type); + if (header_type == BeConsts::CSV_WITH_NAMES) { + _is_parse_name = true; + } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { + _read_line = 2; + _is_parse_name = true; + } + } + + RETURN_IF_ERROR(_init_options()); + RETURN_IF_ERROR(_create_file_reader(true)); + RETURN_IF_ERROR(_create_decompressor()); + RETURN_IF_ERROR(_create_line_reader()); + return Status::OK(); +} + +Status CsvReader::get_parsed_schema(std::vector* col_names, + std::vector* col_types) { + if (_read_line == 1) { + if (!_is_parse_name) { //parse csv file without names and types size_t col_nums = 0; RETURN_IF_ERROR(_parse_col_nums(&col_nums)); for (size_t i = 0; i < col_nums; ++i) { @@ -708,40 +739,6 @@ void CsvReader::_split_line(const Slice& line) { _fields_splitter->split_line(line, &_split_values); } -Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { - _start_offset = _range.start_offset; - if (_start_offset != 0) { - return Status::InvalidArgument( - "start offset of TFileRangeDesc must be zero in get parsered schema"); - } - if (_params.file_type == TFileType::FILE_BROKER) { - return Status::InternalError( - "Getting parsered schema from csv file do not support stream load and broker " - "load."); - } - - // csv file without names line and types line. - *read_line = 1; - *is_parse_name = false; - - if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && - !_params.file_attributes.header_type.empty()) { - std::string header_type = to_lower(_params.file_attributes.header_type); - if (header_type == BeConsts::CSV_WITH_NAMES) { - *is_parse_name = true; - } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { - *read_line = 2; - *is_parse_name = true; - } - } - - RETURN_IF_ERROR(_init_options()); - RETURN_IF_ERROR(_create_file_reader(true)); - RETURN_IF_ERROR(_create_decompressor()); - RETURN_IF_ERROR(_create_line_reader()); - return Status::OK(); -} - Status CsvReader::_parse_col_nums(size_t* col_nums) { const uint8_t* ptr = nullptr; size_t size = 0; diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 1f060d18ac3e15..117b5058c522be 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -182,6 +182,7 @@ class CsvReader : public GenericReader { Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status init_schema_reader() override; // get schema of csv file from first one line or first two lines. // if file format is FORMAT_CSV_DEFLATE and if // 1. header_type is empty, get schema from first line. @@ -231,9 +232,6 @@ class CsvReader : public GenericReader { void _init_system_properties(); void _init_file_description(); - // used for parse table schema of csv file. - // Currently, this feature is for table valued function. 
- Status _prepare_parse(size_t* read_line, bool* is_parse_name); Status _parse_col_nums(size_t* col_nums); Status _parse_col_names(std::vector* col_names); // TODO(ftw): parse type @@ -263,6 +261,9 @@ class CsvReader : public GenericReader { // True if this is a load task bool _is_load = false; bool _line_reader_eof; + // For schema reader + size_t _read_line = 0; + bool _is_parse_name = false; TFileFormatType::type _file_format_type; bool _is_proto_format; TFileCompressType::type _file_compress_type; diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index e32928e4b95de4..c853cae15a6cff 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -45,6 +45,12 @@ class GenericReader : public ProfileCollector { return Status::NotSupported("get_columns is not implemented"); } + // This method is responsible for initializing the resource for parsing schema. + // It will be called before `get_parsed_schema`. + virtual Status init_schema_reader() { + return Status::NotSupported("init_schema_reader is not implemented for this reader."); + } + // `col_types` is always nullable to process illegal values. virtual Status get_parsed_schema(std::vector* col_names, std::vector* col_types) { return Status::NotSupported("get_parsed_schema is not implemented for this reader."); diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index c8969c6d4c314e..489093fe21404b 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -255,18 +255,23 @@ Status NewJsonReader::get_columns(std::unordered_map* col_names, - std::vector* col_types) { +// init decompressor, file reader and line reader for parsing schema +Status NewJsonReader::init_schema_reader() { RETURN_IF_ERROR(_get_range_params()); - + // create decompressor. 
+ // _decompressor may be nullptr if this is not a compressed file + RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); RETURN_IF_ERROR(_open_file_reader(true)); if (_read_json_by_line) { RETURN_IF_ERROR(_open_line_reader()); } - // generate _parsed_jsonpaths and _parsed_json_root RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); + return Status::OK(); +} +Status NewJsonReader::get_parsed_schema(std::vector* col_names, + std::vector* col_types) { bool eof = false; const uint8_t* json_str = nullptr; std::unique_ptr json_str_ptr; diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 31ddc0fa9c9e6f..967a5300529b47 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -93,6 +93,7 @@ class NewJsonReader : public GenericReader { Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status init_schema_reader() override; Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 35ff0c7561cb28..0f2e4b348572e0 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -359,10 +359,14 @@ Status OrcReader::init_reader( return Status::OK(); } +// init file reader for parsing schema +Status OrcReader::init_schema_reader() { + return _create_file_reader(); +} + Status OrcReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { - RETURN_IF_ERROR(_create_file_reader()); - auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); + std::vector* col_types) { + const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(get_field_name_lower_case(&root_type, i)); col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i))); @@ -374,7 +378,6 @@ Status OrcReader::get_schema_col_name_attribute(std::vector* col_na std::vector* col_attributes, const std::string& attribute, bool* exist_attribute) { - RETURN_IF_ERROR(_create_file_reader()); *exist_attribute = true; auto& root_type = _is_acid ? 
_remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 3e2b785cf03812..fc4fba4789c4f7 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -177,6 +177,8 @@ class OrcReader : public GenericReader { Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status init_schema_reader() override; + Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index be149991759399..43245ffecdce02 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -259,12 +259,6 @@ const FieldDescriptor ParquetReader::get_file_metadata_schema() { return _file_metadata->schema(); } -Status ParquetReader::open() { - RETURN_IF_ERROR(_open_file()); - _t_metadata = &(_file_metadata->to_thrift()); - return Status::OK(); -} - void ParquetReader::_init_system_properties() { if (_scan_range.__isset.file_type) { // for compatibility @@ -311,10 +305,8 @@ Status ParquetReader::init_reader( _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _colname_to_value_range = colname_to_value_range; _hive_use_column_names = hive_use_column_names; - if (_file_metadata == nullptr) { - return Status::InternalError("failed to init parquet reader, please open reader first"); - } - + RETURN_IF_ERROR(_open_file()); + _t_metadata = &(_file_metadata->to_thrift()); SCOPED_RAW_TIMER(&_statistics.parse_meta_time); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { @@ -491,11 +483,15 @@ Status ParquetReader::set_fill_columns( return Status::OK(); } -Status ParquetReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { +// init file reader and file metadata for parsing schema +Status ParquetReader::init_schema_reader() { RETURN_IF_ERROR(_open_file()); - _t_metadata = &_file_metadata->to_thrift(); + _t_metadata = &(_file_metadata->to_thrift()); + return Status::OK(); +} +Status ParquetReader::get_parsed_schema(std::vector* col_names, + std::vector* col_types) { _total_groups = _t_metadata->row_groups.size(); auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index e24071093b635c..d189343e82e147 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -107,8 +107,6 @@ class ParquetReader : public GenericReader { // for test void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; } - Status open(); - Status init_reader( const std::vector& all_column_names, const std::vector& missing_column_names, @@ -134,6 +132,8 @@ class ParquetReader : public GenericReader { Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status init_schema_reader() override; + Status get_parsed_schema(std::vector* col_names, std::vector* col_types) override; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index c297904ca417b7..cd1bded9eac696 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ 
b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -160,6 +160,7 @@ Status IcebergTableReader::_equality_delete_base( delete_desc.file_size = -1; std::unique_ptr delete_reader = _create_equality_reader(delete_desc); if (!init_schema) { + RETURN_IF_ERROR(delete_reader->init_schema_reader()); RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, &equality_delete_col_types)); _generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names, @@ -167,7 +168,6 @@ Status IcebergTableReader::_equality_delete_base( init_schema = true; } if (auto* parquet_reader = typeid_cast(delete_reader.get())) { - RETURN_IF_ERROR(parquet_reader->open()); RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names, not_in_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); @@ -446,8 +446,6 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d ParquetReader parquet_delete_reader( _profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE, const_cast(&_state->timezone_obj()), _io_ctx, _state); - - RETURN_IF_ERROR(parquet_delete_reader.open()); RETURN_IF_ERROR(parquet_delete_reader.init_reader(delete_file_col_names, {}, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); @@ -542,6 +540,7 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete Status IcebergParquetReader::get_file_col_id_to_name( bool& exist_schema, std::map& file_col_id_to_name) { auto* parquet_reader = static_cast(_file_format_reader.get()); + RETURN_IF_ERROR(parquet_reader->init_schema_reader()); FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); if (field_desc.has_parquet_field_id()) { @@ -561,6 +560,7 @@ Status IcebergOrcReader::get_file_col_id_to_name( std::vector col_names; std::vector col_ids; + RETURN_IF_ERROR(orc_reader->init_schema_reader()); RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute( &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema)); if (!exist_schema) { diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index 5834d74efeaced..8da5e74aa1d5a7 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -24,6 +24,8 @@ namespace doris { namespace vectorized { struct ScannerCounter; class WalReader : public GenericReader { + ENABLE_FACTORY_CREATOR(WalReader); + public: WalReader(RuntimeState* state); ~WalReader() override = default; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 52850d6e8e2b02..fb1910bded6d47 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -129,8 +129,6 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju RETURN_IF_ERROR(VScanner::prepare(state, conjuncts)); _get_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1); - _open_reader_timer = - ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerOpenReaderTime", 1); _cast_to_input_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerCastInputBlockTime", 1); _fill_missing_columns_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), @@ -942,10 +940,6 @@ Status VFileScanner::_get_next_reader() { // ATTN: the push down agg type may be set back to NONE, // see IcebergTableReader::init_row_filters for example. 
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type()); - { - SCOPED_TIMER(_open_reader_timer); - RETURN_IF_ERROR(parquet_reader->open()); - } if (push_down_predicates) { RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } @@ -1110,12 +1104,12 @@ Status VFileScanner::_get_next_reader() { case TFileFormatType::FORMAT_AVRO: { _cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs, range); - init_status = ((AvroJNIReader*)(_cur_reader.get())) - ->init_fetch_table_reader(_colname_to_value_range); + init_status = + ((AvroJNIReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range); break; } case TFileFormatType::FORMAT_WAL: { - _cur_reader.reset(new WalReader(_state)); + _cur_reader = WalReader::create_unique(_state); init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc); break; } diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 5b1604209d49d0..6a605da3dcac14 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -173,7 +173,6 @@ class VFileScanner : public VScanner { private: RuntimeProfile::Counter* _get_block_timer = nullptr; - RuntimeProfile::Counter* _open_reader_timer = nullptr; RuntimeProfile::Counter* _cast_to_input_block_timer = nullptr; RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr; RuntimeProfile::Counter* _pre_filter_timer = nullptr; diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp index 423adfd41cecab..afa4d7f9d5fabe 100644 --- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp @@ -150,7 +150,6 @@ TEST_F(ParquetReaderTest, normal) { runtime_state.set_desc_tbl(desc_tbl); std::unordered_map colname_to_value_range; - static_cast(p_reader->open()); static_cast(p_reader->init_reader(column_names, missing_column_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr)); std::unordered_map> @@ -231,7 +230,6 @@ TEST_F(ParquetReaderTest, use_column_name) { colname_to_value_range.emplace("smallint_col", ColumnValueRange("smallint_col")); colname_to_value_range.emplace("int_col", ColumnValueRange("int_col")); - static_cast(p_reader->open()); static_cast(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); @@ -271,7 +269,6 @@ TEST_F(ParquetReaderTest, use_column_name2) { colname_to_value_range.emplace("smallint_col", ColumnValueRange("smallint_col")); colname_to_value_range.emplace("int_col", ColumnValueRange("int_col")); - static_cast(p_reader->open()); static_cast(p_reader->init_reader(table_column_names, {"boolean_col"}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); @@ -314,7 +311,6 @@ TEST_F(ParquetReaderTest, use_column_idx) { colname_to_value_range.emplace("col3", ColumnValueRange("col3")); colname_to_value_range.emplace("col102", ColumnValueRange("col102")); - static_cast(p_reader->open()); static_cast(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false, use_column_name)); diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/json_format_test/simple_object_json.json.gz new file mode 100644 index 
0000000000000000000000000000000000000000..8a6db90241ffc25e12d351e459c1f4ce374289af GIT binary patch literal 211 zcmV;^04)C>iwFo!J2H?9- zVL0y~h^TuTrVLmN9ya2p@!f}6QoGdcKth_2KP_FD@7X!$r<`^E@k&>AG5+R*Wm{?N zO)7~}R;uP}wtTce`tEeFPJ6-y@Dgzw6QA7}i7&y+uXq#YbO@KgE5zNe^a{8_Jm_#Z zssddjJ;fAHYz?+Swo5(Rp7zmzZjpZagWwh@FYt)wYUpqCV%KPyAr1qf`y#pc6TH1$ Nvp1P-u Date: Fri, 4 Jul 2025 10:42:50 +0800 Subject: [PATCH 2/2] fix --- be/src/vec/exec/format/csv/csv_reader.cpp | 2 +- be/src/vec/exec/format/json/new_json_reader.cpp | 2 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 2 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 83625457882c6c..6ce31b5956109a 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -432,7 +432,7 @@ Status CsvReader::init_schema_reader() { } Status CsvReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { + std::vector* col_types) { if (_read_line == 1) { if (!_is_parse_name) { //parse csv file without names and types size_t col_nums = 0; diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 489093fe21404b..173bb2b4dc48cf 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -271,7 +271,7 @@ Status NewJsonReader::init_schema_reader() { } Status NewJsonReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { + std::vector* col_types) { bool eof = false; const uint8_t* json_str = nullptr; std::unique_ptr json_str_ptr; diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 0f2e4b348572e0..2c10c9ff29c967 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -365,7 +365,7 @@ Status OrcReader::init_schema_reader() { } Status OrcReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { + std::vector* col_types) { const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(get_field_name_lower_case(&root_type, i)); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 43245ffecdce02..77ebd8d32cd79e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -491,7 +491,7 @@ Status ParquetReader::init_schema_reader() { } Status ParquetReader::get_parsed_schema(std::vector* col_names, - std::vector* col_types) { + std::vector* col_types) { _total_groups = _t_metadata->row_groups.size(); auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) {
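
For readers skimming the series, the core idea of the refactor is that `GenericReader` now exposes a virtual `init_schema_reader()` that callers (e.g. `fetch_table_schema` in `internal_service.cpp` or the Iceberg delete-file path) invoke before `get_parsed_schema()`, replacing per-format entry points such as `ParquetReader::open()` and `AvroJNIReader::init_fetch_table_schema_reader()`. Below is a minimal, self-contained sketch of that contract, not the actual Doris code: `Status`, `GenericReader`, `MockJsonReader`, and `fetch_schema` are simplified stand-ins, and the real readers take file params, ranges, and column-type output vectors.

```cpp
// Sketch of the init_schema_reader()/get_parsed_schema() contract introduced
// by this patch. All types here are simplified stand-ins for illustration.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Status {
    bool ok_flag = true;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status NotSupported(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_flag; }
};

class GenericReader {
public:
    virtual ~GenericReader() = default;
    // Called before get_parsed_schema() to open the file and set up the
    // decompressor / line reader / metadata, mirroring the new virtual in
    // be/src/vec/exec/format/generic_reader.h.
    virtual Status init_schema_reader() {
        return Status::NotSupported("init_schema_reader is not implemented for this reader.");
    }
    virtual Status get_parsed_schema(std::vector<std::string>* col_names) {
        return Status::NotSupported("get_parsed_schema is not implemented for this reader.");
    }
};

// Toy stand-in for a format reader such as NewJsonReader: init_schema_reader()
// would create the decompressor and line reader; here it only records that the
// setup happened so get_parsed_schema() can depend on it.
class MockJsonReader : public GenericReader {
public:
    Status init_schema_reader() override {
        initialized_ = true;  // e.g. open file, create decompressor
        return Status::OK();
    }
    Status get_parsed_schema(std::vector<std::string>* col_names) override {
        if (!initialized_) {
            return Status::NotSupported("schema reader not initialized");
        }
        col_names->assign({"id", "city", "code"});
        return Status::OK();
    }

private:
    bool initialized_ = false;
};

// Mirrors the call order the patch establishes in fetch_table_schema():
// create the reader, then init_schema_reader(), then get_parsed_schema().
Status fetch_schema(GenericReader& reader, std::vector<std::string>* cols) {
    Status st = reader.init_schema_reader();
    if (!st.ok()) {
        return st;
    }
    return reader.get_parsed_schema(cols);
}

int main() {
    MockJsonReader reader;
    std::vector<std::string> cols;
    Status st = fetch_schema(reader, &cols);
    if (!st.ok()) {
        std::cerr << "failed: " << st.msg << "\n";
        return 1;
    }
    for (const auto& c : cols) {
        std::cout << c << "\n";
    }
    return 0;
}
```

Splitting schema-reader initialization out of `get_parsed_schema()` lets the schema-fetch path treat every format uniformly and gives CSV/JSON readers a single place to create their decompressor, which is what enables reading compressed JSON files through the TVF in this change.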