diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index 648155187e1725..770d4de9894206 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -56,7 +56,7 @@ Status AvroJNIReader::get_columns(std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; std::ostringstream required_fields; std::ostringstream columns_types; diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h b/be/src/vec/exec/format/avro/avro_jni_reader.h index 8e956ac714322b..26511b0ab57223 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.h +++ b/be/src/vec/exec/format/avro/avro_jni_reader.h @@ -68,7 +68,7 @@ class AvroJNIReader : public JniReader { std::unordered_set* missing_cols) override; Status init_fetch_table_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); TFileType::type get_file_type() const; @@ -82,7 +82,7 @@ class AvroJNIReader : public JniReader { private: const TFileScanRangeParams _params; const TFileRangeDesc _range; - std::unordered_map* _colname_to_value_range = nullptr; + const std::unordered_map* _colname_to_value_range = nullptr; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/jni_reader.cpp b/be/src/vec/exec/format/jni_reader.cpp index 927b3cc2edd227..49d676c2d88ecb 100644 --- a/be/src/vec/exec/format/jni_reader.cpp +++ b/be/src/vec/exec/format/jni_reader.cpp @@ -80,7 +80,7 @@ Status MockJniReader::get_columns(std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h index 17f18a12f130df..323f6746c5688f 100644 --- a/be/src/vec/exec/format/jni_reader.h +++ b/be/src/vec/exec/format/jni_reader.h @@ -89,7 +89,7 @@ class MockJniReader : public JniReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); Status close() override { if (_jni_connector) { @@ -106,7 +106,7 @@ class MockJniReader : public JniReader { } private: - std::unordered_map* _colname_to_value_range; + const std::unordered_map* _colname_to_value_range; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index ee34bd8c3fd3a3..8d566d53bf2e6f 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -315,7 +315,7 @@ Status OrcReader::_create_file_reader() { Status OrcReader::init_reader( const std::vector* column_names, const std::vector& missing_column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 0542d1398b241c..9ebc416e619ffc 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -138,7 +138,7 @@ class OrcReader : public GenericReader { Status init_reader( const std::vector* column_names, const std::vector& missing_column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -631,7 +631,7 @@ class OrcReader : public GenericReader { std::vector _decimal_scale_params; size_t _decimal_scale_params_index; - std::unordered_map* _colname_to_value_range; + const std::unordered_map* _colname_to_value_range = nullptr; bool _is_acid = false; std::unique_ptr _filter; LazyReadContext _lazy_read_ctx; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp index 640b036a40fbf1..0b9369017140c1 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp @@ -57,7 +57,7 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index, } Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index, - ColumnValueRangeType& col_val_range, + const ColumnValueRangeType& col_val_range, const FieldSchema* col_schema, std::vector& skipped_ranges, const cctz::time_zone& ctz) { diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index 92725f5bb76772..dee2d26456e6d7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -47,7 +47,7 @@ class PageIndex { Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int64_t total_rows_of_group, int page_idx, RowRange* row_range); Status collect_skipped_page_range(tparquet::ColumnIndex* column_index, - ColumnValueRangeType& col_val_range, + const ColumnValueRangeType& col_val_range, const FieldSchema* col_schema, std::vector& skipped_ranges, const cctz::time_zone& ctz); bool check_and_get_page_index_ranges(const std::vector& columns); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 3083fd61ab03b3..3c302553431219 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -297,7 +297,7 @@ void ParquetReader::iceberg_sanitize(const std::vector& read_column Status ParquetReader::init_reader( const std::vector& all_column_names, const std::vector& missing_column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -349,7 +349,6 @@ Status ParquetReader::init_reader( } } else { - std::unordered_map new_colname_to_value_range; const auto& table_column_idxs = _scan_params.column_idxs; std::map table_col_id_to_idx; for (int i = 0; i < table_column_idxs.size(); i++) { @@ -363,21 +362,15 @@ Status ParquetReader::init_reader( auto& table_col = all_column_names[idx]; auto file_col = schema_desc.get_column(id)->name; _read_columns.emplace_back(file_col); + _table_col_to_file_col[table_col] = file_col; - if (table_col != file_col) { - _table_col_to_file_col[table_col] = file_col; - auto iter = _colname_to_value_range->find(table_col); - if (iter != _colname_to_value_range->end()) { - continue; - } - new_colname_to_value_range[file_col] = iter->second; - _colname_to_value_range->erase(iter->first); + auto iter = _colname_to_value_range->find(table_col); + if (iter != _colname_to_value_range->end()) { + _colname_to_value_range_index_read.emplace(file_col, iter->second); } } } - for (auto it : new_colname_to_value_range) { - _colname_to_value_range->emplace(it.first, std::move(it.second)); - } + _colname_to_value_range = &_colname_to_value_range_index_read; } // build column predicates for column lazy read _lazy_read_ctx.conjuncts = conjuncts; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index ef0d5d27bd659b..99e16fa7741ed2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -111,7 +111,7 @@ class ParquetReader : public GenericReader { Status init_reader( const std::vector& all_column_names, const std::vector& missing_column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -250,7 +250,12 @@ class ParquetReader : public GenericReader { size_t _total_groups; // num of groups(stripes) of a parquet(orc) file // table column name to file column name map. For iceberg schema evolution. std::unordered_map _table_col_to_file_col; - std::unordered_map* _colname_to_value_range = nullptr; + const std::unordered_map* _colname_to_value_range = nullptr; + + // During initialization, multiple vfile_scanner's _colname_to_value_range will point to the same object, + // so the content in the object cannot be modified (there is a multi-threading problem). + // _colname_to_value_range_index_read used when _hive_use_column_names = false. + std::unordered_map _colname_to_value_range_index_read; std::vector _read_columns; RowRange _whole_range = RowRange(0, 0); const std::vector* _delete_rows = nullptr; diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 261dcd82a9bf04..3bdb8eb0d7ddb4 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -95,7 +95,7 @@ Status HudiJniReader::get_columns(std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index 363e024dc9f4ae..5242de44ea3ddd 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -58,12 +58,12 @@ class HudiJniReader : public JniReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); private: const TFileScanRangeParams& _scan_params; const THudiFileDesc& _hudi_params; - std::unordered_map* _colname_to_value_range; + const std::unordered_map* _colname_to_value_range; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/hudi_reader.cpp b/be/src/vec/exec/format/table/hudi_reader.cpp index aad4f810ffe50e..6caeb87badbe9e 100644 --- a/be/src/vec/exec/format/table/hudi_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_reader.cpp @@ -52,7 +52,7 @@ Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* e Status HudiOrcReader::init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -71,7 +71,7 @@ Status HudiOrcReader::init_reader( Status HudiParquetReader::init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/hudi_reader.h b/be/src/vec/exec/format/table/hudi_reader.h index cd063d9246271d..2779f296f66266 100644 --- a/be/src/vec/exec/format/table/hudi_reader.h +++ b/be/src/vec/exec/format/table/hudi_reader.h @@ -53,7 +53,8 @@ class HudiOrcReader final : public HudiReader { Status init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* + table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -72,7 +73,8 @@ class HudiParquetReader final : public HudiReader { Status init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* + table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 000b1b3ef8e94d..6c948315ac07c8 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -420,7 +420,7 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil Status IcebergParquetReader::init_reader( const std::vector& file_col_names, const std::unordered_map& col_id_name_map, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -490,7 +490,7 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d Status IcebergOrcReader::init_reader( const std::vector& file_col_names, const std::unordered_map& col_id_name_map, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index b609076840a24a..4292af6a1d58b8 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -131,7 +131,6 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; std::vector _iceberg_delete_rows; - std::vector _expand_col_names; std::vector _expand_columns; @@ -170,7 +169,7 @@ class IcebergParquetReader final : public IcebergTableReader { Status init_reader( const std::vector& file_col_names, const std::unordered_map& col_id_name_map, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, @@ -217,7 +216,7 @@ class IcebergOrcReader final : public IcebergTableReader { Status init_reader( const std::vector& file_col_names, const std::unordered_map& col_id_name_map, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp index 887a793b098232..25c25f637b6f8d 100644 --- a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp +++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp @@ -73,7 +73,7 @@ Status LakeSoulJniReader::get_columns(std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h b/be/src/vec/exec/format/table/lakesoul_jni_reader.h index 38f3aaeb27e986..3e5e796e5ed0d1 100644 --- a/be/src/vec/exec/format/table/lakesoul_jni_reader.h +++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h @@ -58,14 +58,14 @@ class LakeSoulJniReader : public ::doris::vectorized::GenericReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); private: const TLakeSoulFileDesc& _lakesoul_params; const std::vector& _file_slot_descs; RuntimeState* _state; RuntimeProfile* _profile; - std::unordered_map* _colname_to_value_range; + const std::unordered_map* _colname_to_value_range; std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index 229468cec54714..9eb270d8e8e8a5 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -99,7 +99,7 @@ Status MaxComputeJniReader::get_columns( } Status MaxComputeJniReader::init_reader( - std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h index f59fc1a5f77ab7..3dfeb7ba66787e 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.h +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h @@ -63,13 +63,13 @@ class MaxComputeJniReader : public JniReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); private: const MaxComputeTableDescriptor* _table_desc = nullptr; const TMaxComputeFileDesc& _max_compute_params; const TFileRangeDesc& _range; - std::unordered_map* _colname_to_value_range = nullptr; + const std::unordered_map* _colname_to_value_range = nullptr; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index a05ea4511f47d8..76d2564aedfd83 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -112,7 +112,7 @@ Status PaimonJniReader::get_columns(std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index b5744428392ece..b29423e58d2222 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -63,10 +63,10 @@ class PaimonJniReader : public JniReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); private: - std::unordered_map* _colname_to_value_range; + const std::unordered_map* _colname_to_value_range; int64_t _remaining_table_level_row_count; }; diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index c11e994538c61e..4133deed734ff5 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -69,7 +69,8 @@ class PaimonOrcReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* + table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -103,7 +104,8 @@ class PaimonParquetReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, const std::unordered_map& table_col_id_table_name_map, - std::unordered_map* table_col_name_to_value_range, + const std::unordered_map* + table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 177ad4b0977bae..d134ba89640de8 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -53,7 +53,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr Status TransactionalHiveReader::init_reader( const std::vector& column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index f28d954f4d16ba..29cdde4dd6e0b0 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -92,7 +92,7 @@ class TransactionalHiveReader : public TableFormatReader { Status init_reader( const std::vector& column_names, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp index c2beca0471c416..f2aa0471c0fca0 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp @@ -77,7 +77,7 @@ TrinoConnectorJniReader::TrinoConnectorJniReader( } Status TrinoConnectorJniReader::init_reader( - std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); RETURN_IF_ERROR(_set_spi_plugins_dir()); return _jni_connector->open(_state, _profile); diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h b/be/src/vec/exec/format/table/trino_connector_jni_reader.h index 3d0e7be4e7b017..66e651e74caacb 100644 --- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h +++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h @@ -56,7 +56,7 @@ class TrinoConnectorJniReader : public JniReader { std::unordered_set* missing_cols) override; Status init_reader( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); private: Status _set_spi_plugins_dir(); diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 4b5bb72e57bfbd..de579731858707 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -93,7 +93,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { } Status JniConnector::init( - std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { // TODO: This logic need to be changed. // See the comment of "predicates" field in JniScanner.java @@ -421,7 +421,7 @@ Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColum } void JniConnector::_generate_predicates( - std::unordered_map* colname_to_value_range) { + const std::unordered_map* colname_to_value_range) { if (colname_to_value_range == nullptr) { return; } diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 0c8d27efc02df1..5f1f69fe7b8335 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -222,7 +222,8 @@ class JniConnector : public ProfileCollector { * number_filters(4) | length(4) | column_name | op(4) | scale(4) | num_values(4) | value_length(4) | value | ... * Then, pass the byte array address in configuration map, like "push_down_predicates=${address}" */ - Status init(std::unordered_map* colname_to_value_range); + Status init( + const std::unordered_map* colname_to_value_range); /** * Call java side function JniScanner.getNextBatchMeta. The columns information are stored as long array: @@ -353,7 +354,7 @@ class JniConnector : public ProfileCollector { } void _generate_predicates( - std::unordered_map* colname_to_value_range); + const std::unordered_map* colname_to_value_range); template void _parse_value_range(const ColumnValueRange& col_val_range, diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index 721cddc9440f75..a9b8a9cb8f851f 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -97,7 +97,7 @@ FileScanner::FileScanner( RuntimeState* state, pipeline::FileScanLocalState* local_state, int64_t limit, std::shared_ptr split_source, RuntimeProfile* profile, ShardedKVCache* kv_cache, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const std::unordered_map* colname_to_slot_id) : Scanner(state, local_state, limit, profile), _split_source(split_source), diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h index ed3a269cdcfe42..15a9b89a3acdf7 100644 --- a/be/src/vec/exec/scan/file_scanner.h +++ b/be/src/vec/exec/scan/file_scanner.h @@ -65,7 +65,7 @@ class FileScanner : public Scanner { FileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, int64_t limit, std::shared_ptr split_source, RuntimeProfile* profile, ShardedKVCache* kv_cache, - std::unordered_map* colname_to_value_range, + const std::unordered_map* colname_to_value_range, const std::unordered_map* colname_to_slot_id); Status open(RuntimeState* state) override; @@ -104,7 +104,7 @@ class FileScanner : public Scanner { std::unique_ptr _cur_reader; bool _cur_reader_eof; - std::unordered_map* _colname_to_value_range = nullptr; + const std::unordered_map* _colname_to_value_range = nullptr; // File source slot descriptors std::vector _file_slot_descs; // col names from _file_slot_descs diff --git a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp index cbc6c0ee3d7a11..972d3e5c27c4b3 100644 --- a/be/test/vec/exec/format/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_reader_test.cpp @@ -56,10 +56,9 @@ class ParquetReaderTest : public testing::Test { ParquetReaderTest() {} }; -TEST_F(ParquetReaderTest, normal) { - TDescriptorTable t_desc_table; - TTableDescriptor t_table_desc; - +static void create_table_desc(TDescriptorTable& t_desc_table, TTableDescriptor& t_table_desc, + std::vector table_column_names, + std::vector types) { t_table_desc.id = 0; t_table_desc.tableType = TTableType::OLAP_TABLE; t_table_desc.numCols = 0; @@ -68,10 +67,7 @@ TEST_F(ParquetReaderTest, normal) { t_desc_table.__isset.tableDescriptors = true; // init boolean and numeric slot - std::vector numeric_types = {"boolean_col", "tinyint_col", "smallint_col", - "int_col", "bigint_col", "float_col", - "double_col"}; - for (int i = 0; i < numeric_types.size(); i++) { + for (int i = 0; i < table_column_names.size(); i++) { TSlotDescriptor tslot_desc; { tslot_desc.id = i; @@ -81,7 +77,7 @@ TEST_F(ParquetReaderTest, normal) { TTypeNode node; node.__set_type(TTypeNodeType::SCALAR); TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::type(i + 2)); + scalar_type.__set_type(types[i]); node.__set_scalar_type(scalar_type); type.types.push_back(node); } @@ -90,7 +86,7 @@ TEST_F(ParquetReaderTest, normal) { tslot_desc.byteOffset = 0; tslot_desc.nullIndicatorByte = 0; tslot_desc.nullIndicatorBit = -1; - tslot_desc.colName = numeric_types[i]; + tslot_desc.colName = table_column_names[i]; tslot_desc.slotIdx = 0; tslot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(tslot_desc); @@ -108,6 +104,19 @@ TEST_F(ParquetReaderTest, normal) { t_tuple_desc.__isset.tableId = true; t_desc_table.tupleDescriptors.push_back(t_tuple_desc); } +}; + +TEST_F(ParquetReaderTest, normal) { + TDescriptorTable t_desc_table; + TTableDescriptor t_table_desc; + std::vector table_column_names = {"boolean_col", "tinyint_col", "smallint_col", + "int_col", "bigint_col", "float_col", + "double_col"}; + std::vector table_column_types = { + TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT, + TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT, + TPrimitiveType::DOUBLE}; + create_table_desc(t_desc_table, t_table_desc, table_column_names, table_column_types); DescriptorTbl* desc_tbl; ObjectPool obj_pool; static_cast(DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl)); @@ -164,5 +173,163 @@ TEST_F(ParquetReaderTest, normal) { delete p_reader; } +static ParquetReader* create_parquet_reader(TFileScanRangeParams& scan_params, + std::vector table_column_names, + std::vector types) { + TDescriptorTable t_desc_table; + TTableDescriptor t_table_desc; + + create_table_desc(t_desc_table, t_table_desc, table_column_names, types); + DescriptorTbl* desc_tbl; + ObjectPool obj_pool; + static_cast(DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl)); + + auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); + auto local_fs = io::global_local_filesystem(); + io::FileReaderSPtr reader; + static_cast(local_fs->open_file( + "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", &reader)); + + cctz::time_zone ctz; + TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); + std::vector column_names; + std::vector missing_column_names; + for (int i = 0; i < slot_descs.size(); i++) { + column_names.push_back(slot_descs[i]->col_name()); + } + TFileRangeDesc scan_range; + { + scan_range.start_offset = 0; + scan_range.size = 1000; + } + auto p_reader = + new ParquetReader(nullptr, scan_params, scan_range, 992, &ctz, nullptr, nullptr); + p_reader->set_file_reader(reader); + return p_reader; +} + +TEST_F(ParquetReaderTest, use_column_name) { + bool use_column_name = true; + + std::vector table_column_names = {"boolean_col", "tinyint_col", "smallint_col", + "int_col", "bigint_col", "float_col", + "double_col"}; + std::vector table_column_types = { + TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT, + TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT, + TPrimitiveType::DOUBLE}; + TFileScanRangeParams scan_params; + + auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types); + std::unordered_map colname_to_value_range; + colname_to_value_range.emplace("boolean_col", ColumnValueRange("boolean_col")); + colname_to_value_range.emplace("tinyint_col", ColumnValueRange("tinyint_col")); + colname_to_value_range.emplace("smallint_col", ColumnValueRange("smallint_col")); + colname_to_value_range.emplace("int_col", ColumnValueRange("int_col")); + + static_cast(p_reader->open()); + static_cast(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, + nullptr, nullptr, nullptr, nullptr, nullptr, false, + use_column_name)); + + std::vector read_columns_ans = {"tinyint_col", "smallint_col", "int_col", + "bigint_col", "boolean_col", "float_col", + "double_col"}; + EXPECT_EQ(p_reader->_read_columns, read_columns_ans); + + std::vector miss_columns_ans = {}; + EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans); + std::vector colname_to_value_range_names_ans = {"tinyint_col", "smallint_col", + "int_col", "boolean_col"}; + for (auto col : colname_to_value_range_names_ans) { + EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col)); + } + EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size()); + delete p_reader; +} + +TEST_F(ParquetReaderTest, use_column_name2) { + bool use_column_name = true; + + std::vector table_column_names = {"boolean_col", "tinyint_col", "smallint_col", + "int_col", "bigint_col", "float_col", + "test1", "double_col", "test2"}; + std::vector table_column_types = { + TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT, + TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT, + TPrimitiveType::FLOAT, TPrimitiveType::DOUBLE, TPrimitiveType::DOUBLE}; + TFileScanRangeParams scan_params; + + auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types); + std::unordered_map colname_to_value_range; + colname_to_value_range.emplace("boolean_col", ColumnValueRange("boolean_col")); + colname_to_value_range.emplace("tinyint_col", ColumnValueRange("tinyint_col")); + colname_to_value_range.emplace("smallint_col", ColumnValueRange("smallint_col")); + colname_to_value_range.emplace("int_col", ColumnValueRange("int_col")); + + static_cast(p_reader->open()); + static_cast(p_reader->init_reader(table_column_names, {"boolean_col"}, + &colname_to_value_range, {}, nullptr, nullptr, nullptr, + nullptr, nullptr, false, use_column_name)); + + std::vector read_columns_ans = {"tinyint_col", "smallint_col", "int_col", + "bigint_col", "float_col", "double_col"}; + EXPECT_EQ(p_reader->_read_columns, read_columns_ans); + + std::vector miss_columns_ans = {"boolean_col", "test1", "test2"}; + EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans); + std::vector colname_to_value_range_names_ans = {"tinyint_col", "smallint_col", + "int_col", "boolean_col"}; + for (auto col : colname_to_value_range_names_ans) { + EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col)); + } + EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size()); + delete p_reader; +} + +TEST_F(ParquetReaderTest, use_column_idx) { + bool use_column_name = false; + + std::vector table_column_names = {"col0", "col1", "col3", + "col7", "col100", "col102"}; + std::vector table_column_types = { + TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT, + TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::BIGINT}; + TFileScanRangeParams scan_params; + scan_params.column_idxs.emplace_back(0); + scan_params.column_idxs.emplace_back(1); + scan_params.column_idxs.emplace_back(3); + scan_params.column_idxs.emplace_back(7); + scan_params.column_idxs.emplace_back(100); + scan_params.column_idxs.emplace_back(102); + + auto p_reader = create_parquet_reader(scan_params, table_column_names, table_column_types); + std::unordered_map colname_to_value_range; + colname_to_value_range.emplace("col0", ColumnValueRange("col0")); + colname_to_value_range.emplace("col1", ColumnValueRange("col1")); + colname_to_value_range.emplace("col3", ColumnValueRange("col3")); + colname_to_value_range.emplace("col102", ColumnValueRange("col102")); + + static_cast(p_reader->open()); + static_cast(p_reader->init_reader(table_column_names, {}, &colname_to_value_range, {}, + nullptr, nullptr, nullptr, nullptr, nullptr, false, + use_column_name)); + + std::vector read_columns_ans = {"tinyint_col", "smallint_col", "bigint_col", + "string_col"}; + EXPECT_EQ(p_reader->_read_columns, read_columns_ans); + + std::vector miss_columns_ans = {"col100", "col102"}; + EXPECT_EQ(p_reader->_missing_cols, miss_columns_ans); + + std::vector colname_to_value_range_names_ans = {"tinyint_col", "smallint_col", + "bigint_col"}; + for (auto col : colname_to_value_range_names_ans) { + EXPECT_TRUE(p_reader->_colname_to_value_range->contains(col)); + } + EXPECT_EQ(p_reader->_colname_to_value_range->size(), colname_to_value_range_names_ans.size()); + delete p_reader; +} + } // namespace vectorized } // namespace doris