From 77f38b19c3bc91a0deebf3c07f627c5d4a253d3a Mon Sep 17 00:00:00 2001 From: daidai Date: Fri, 21 Mar 2025 11:46:39 +0800 Subject: [PATCH 1/3] [enhancement](hudi)support native read hudi top level schema change table. (#49051) Similar to PR #48723. Problem Summary: 1. Support the native reader on Hudi tables whose top-level schema has changed. Changes to the internal schema of a struct are not supported yet and will be supported in the next PR. 2. Unify the logic by which the Iceberg/Paimon/Hudi native readers handle schema-changed tables. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 21 +- be/src/vec/exec/format/orc/vorc_reader.h | 9 +- be/src/vec/exec/format/parquet/schema_desc.h | 4 +- .../exec/format/parquet/vparquet_reader.cpp | 1 + be/src/vec/exec/format/table/hudi_reader.cpp | 92 ++++ be/src/vec/exec/format/table/hudi_reader.h | 83 +++ .../vec/exec/format/table/iceberg_reader.cpp | 202 ++------ be/src/vec/exec/format/table/iceberg_reader.h | 37 +- .../vec/exec/format/table/paimon_reader.cpp | 82 +-- be/src/vec/exec/format/table/paimon_reader.h | 46 +- .../exec/format/table/table_format_reader.cpp | 133 +++++ .../exec/format/table/table_format_reader.h | 45 +- .../table/transactional_hive_reader.cpp | 16 +- .../format/table/transactional_hive_reader.h | 3 - be/src/vec/exec/scan/vfile_scanner.cpp | 24 +- be/src/vec/exec/scan/vfile_scanner.h | 2 +- .../paimon/paimon_schema_change_test.cpp | 31 +- .../table/table_schema_change_helper_test.cpp | 471 ++++++++++++++++++ be/test/vec/exec/orc_reader_test.cpp | 4 +- .../iceberg/run10.sql | 48 ++ .../hive/HiveMetaStoreClientHelper.java | 14 + .../hudi/source/COWIncrementalRelation.java | 26 +- .../datasource/hudi/source/HudiScanNode.java | 51 +- .../paimon/source/PaimonScanNode.java | 11 +- gensrc/thrift/PlanNodes.thrift | 3 +- .../iceberg/iceberg_schema_change2.out | 65 +++ .../hudi/test_hudi_schema_change.out | 259 ++++++++++ .../iceberg/iceberg_schema_change2.groovy | 66 +++ .../hudi/test_hudi_schema_change.groovy | 156 ++++++ 29 files changed, 1648 insertions(+), 357 deletions(-) create mode 100644 be/src/vec/exec/format/table/hudi_reader.cpp create mode 100644 be/src/vec/exec/format/table/hudi_reader.h create mode 100644 be/src/vec/exec/format/table/table_format_reader.cpp create mode 100644 be/test/vec/exec/format/table/table_schema_change_helper_test.cpp create mode 100644 docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql create mode 100644 regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out create mode 100644 regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out create mode 100644 regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy create mode 100644 regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy
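The core of the unification is a field-ID join between the table schema sent by the FE and the column-id-to-name schema recorded in each data file. A minimal self-contained sketch of that mapping step (simplified names and types; not the patch's exact code, which lives in TableSchemaChangeHelper::init_schema_info below):

// Build table<->file column-name maps by joining the two schemas on field id.
#include <map>
#include <string>
#include <unordered_map>

struct NameMaps {
    std::unordered_map<std::string, std::string> table_to_file;
    std::unordered_map<std::string, std::string> file_to_table;
    bool has_schema_change = false;
};

NameMaps build_name_maps(const std::map<int, std::string>& file_id_to_name,
                         const std::unordered_map<int, std::string>& table_id_to_name) {
    NameMaps maps;
    for (const auto& [field_id, file_name] : file_id_to_name) {
        auto it = table_id_to_name.find(field_id);
        if (it == table_id_to_name.end()) {
            continue; // column no longer exists in the table: never read it
        }
        maps.table_to_file.emplace(it->second, file_name);
        maps.file_to_table.emplace(file_name, it->second);
        if (it->second != file_name) {
            maps.has_schema_change = true; // renamed after the file was written
        }
    }
    return maps;
}

Query columns that end up without a file-side mapping are treated as missing and are filled with default values instead of being read from the file.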
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index b53ce3a0a50619..25d552ae9315ff 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -313,6 +313,7 @@ Status OrcReader::_create_file_reader() { Status OrcReader::init_reader( const std::vector* column_names, + const std::vector& missing_column_names, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -320,6 +321,7 @@ Status OrcReader::init_reader( const std::unordered_map* slot_id_to_filter_conjuncts, const bool hive_use_column_names) { _column_names = column_names; + _missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end()); _colname_to_value_range = colname_to_value_range; _lazy_read_ctx.conjuncts = conjuncts; _is_acid = is_acid; @@ -363,14 +365,20 @@ Status OrcReader::get_parsed_schema(std::vector* col_names, } Status OrcReader::get_schema_col_name_attribute(std::vector* col_names, - std::vector* col_attributes, - std::string attribute) { + std::vector* col_attributes, + const std::string& attribute, + bool* exist_attribute) { RETURN_IF_ERROR(_create_file_reader()); auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); + *exist_attribute = true; for (int i = 0; i < root_type.getSubtypeCount(); ++i) { col_names->emplace_back(get_field_name_lower_case(&root_type, i)); + + if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) { + *exist_attribute = false; + return Status::OK(); + } col_attributes->emplace_back( - std::stol(root_type.getSubtype(i)->getAttributeValue(attribute))); + std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute))); } return Status::OK(); } @@ -388,6 +396,11 @@ Status OrcReader::_init_read_columns() { _scan_params.__isset.slot_name_to_schema_pos; for (size_t i = 0; i < _column_names->size(); ++i) { auto& col_name = (*_column_names)[i]; + if (_missing_column_names_set.contains(col_name)) { + _missing_cols.emplace_back(col_name); + continue; + } + if (_is_hive1_orc_or_use_idx) { auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); if (iter != _scan_params.slot_name_to_schema_pos.end()) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 98c31645b41183..7968c6fe9b2751 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -139,6 +139,8 @@ class OrcReader : public GenericReader { Status init_reader( const std::vector* column_names, + const std::vector& missing_column_names, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, @@ -178,8 +180,8 @@ class OrcReader : public GenericReader { Status get_schema_col_name_attribute(std::vector* col_names, - std::vector* col_attributes, - std::string attribute); + std::vector* col_attributes, + const std::string& attribute, bool* exist_attribute); void set_table_col_to_file_col( std::unordered_map table_col_to_file_col) { _table_col_to_file_col = table_col_to_file_col; } @@ -577,6 +579,9 @@ class OrcReader : public GenericReader { int64_t _range_size; const std::string& _ctz; const std::vector* _column_names; + // _missing_column_names_set: used by Iceberg/Hudi/Paimon for columns that were dropped and + // then added back (drop column a, then add column a again); their data in old files must not be read.
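+ // Example: after "ALTER TABLE t DROP COLUMN a" followed by "ALTER TABLE t ADD COLUMN a", old data + // files still contain a column named "a" whose values belong to the dropped column; the reader + // reports "a" as missing and fills default values instead of reading the stale data.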
+ std::set _missing_column_names_set; int32_t _offset_days = 0; cctz::time_zone _time_zone; diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h index 2593da837c3da6..d1aa8d4c6eda29 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.h +++ b/be/src/vec/exec/format/parquet/schema_desc.h @@ -71,7 +71,7 @@ class FieldDescriptor { std::unordered_map _name_to_field; // Used in from_thrift, marking the next schema position that should be parsed size_t _next_schema_pos; - std::unordered_map _field_id_name_mapping; + std::map _field_id_name_mapping; void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable, FieldSchema* physical_field); @@ -135,6 +135,8 @@ class FieldDescriptor { bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; } + std::map get_field_id_name_map() { return _field_id_name_mapping; } + const doris::Slice get_column_name_from_field_id(int32_t id) const; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index d669a57c609cd3..519dd68579caad 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -345,6 +345,7 @@ Status ParquetReader::init_reader( for (const std::string& name : required_columns) { _missing_cols.emplace_back(name); } + } else { const auto& table_column_idxs = _scan_params.column_idxs; std::map table_col_id_to_idx; diff --git a/be/src/vec/exec/format/table/hudi_reader.cpp b/be/src/vec/exec/format/table/hudi_reader.cpp new file mode 100644 index 00000000000000..aad4f810ffe50e --- /dev/null +++ b/be/src/vec/exec/format/table/hudi_reader.cpp @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "hudi_reader.h" + +#include + +#include "common/status.h" +#include "runtime/runtime_state.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +Status HudiReader::get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) { + if (!_params.__isset.history_schema_info) [[unlikely]] { + exist_schema = false; + return Status::OK(); + } + + if (!_params.history_schema_info.contains(_range.table_format_params.hudi_params.schema_id)) + [[unlikely]] { + return Status::InternalError("hudi file schema info is missing in history schema info."); + } + + file_col_id_to_name = + _params.history_schema_info.at(_range.table_format_params.hudi_params.schema_id); + return Status::OK(); +} + +Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block)); + RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block)); + return Status::OK(); +}; + +Status HudiOrcReader::init_reader( + const std::vector& read_table_col_names, + const std::unordered_map& table_col_id_table_name_map, + std::unordered_map* table_col_name_to_value_range, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts) { + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range)); + + auto* orc_reader = static_cast(_file_format_reader.get()); + orc_reader->set_table_col_to_file_col(_table_col_to_file_col); + return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names, + &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, + row_descriptor, not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts); +} + +Status HudiParquetReader::init_reader( + const std::vector& read_table_col_names, + const std::unordered_map& table_col_id_table_name_map, + std::unordered_map* table_col_name_to_value_range, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map* colname_to_slot_id, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts) { + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range)); + auto* parquet_reader = static_cast(_file_format_reader.get()); + parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); + + return parquet_reader->init_reader( + _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, + conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); +} + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/hudi_reader.h b/be/src/vec/exec/format/table/hudi_reader.h new file mode 100644 index 00000000000000..cd063d9246271d --- /dev/null +++ b/be/src/vec/exec/format/table/hudi_reader.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include +#include + +#include "vec/exec/format/orc/vorc_reader.h" +#include "vec/exec/format/parquet/vparquet_reader.h" +#include "vec/exec/format/table/table_format_reader.h" +namespace doris::vectorized { +#include "common/compile_check_begin.h" +class HudiReader : public TableFormatReader, public TableSchemaChangeHelper { +public: + HudiReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, + RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx) + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, + io_ctx) {}; + + ~HudiReader() override = default; + + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; + + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; + + Status init_row_filters() final { return Status::OK(); }; +}; + +class HudiOrcReader final : public HudiReader { +public: + ENABLE_FACTORY_CREATOR(HudiOrcReader); + HudiOrcReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; + ~HudiOrcReader() final = default; + + Status init_reader( + const std::vector& read_table_col_names, + const std::unordered_map& table_col_id_table_name_map, + std::unordered_map* table_col_name_to_value_range, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts); +}; + +class HudiParquetReader final : public HudiReader { +public: + ENABLE_FACTORY_CREATOR(HudiParquetReader); + HudiParquetReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; + ~HudiParquetReader() final = default; + + Status init_reader( + const std::vector& read_table_col_names, + const std::unordered_map& table_col_id_table_name_map, + std::unordered_map* table_col_name_to_value_range, + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const std::unordered_map* colname_to_slot_id, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map* slot_id_to_filter_conjuncts); +}; +#include "common/compile_check_end.h" +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp index d209eb7d271916..c297904ca417b7 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -93,32 +93,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_expand_block_if_need(block)); - // To support iceberg schema evolution. We change the column name in block to - // make it match with the column name in parquet file before reading data. and - // Set the name back to table column name before return this block. - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _table_col_to_file_col.find(col.name); - if (iter != _table_col_to_file_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } - + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block)); RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); - // Set the name back to table column name before return this block. - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _file_col_to_table_col.find(col.name); - if (iter != _file_col_to_table_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block)); if (_equality_delete_impl != nullptr) { RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block)); @@ -127,12 +104,6 @@ Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, return _shrink_block_if_need(block); } -Status IcebergTableReader::get_columns( - std::unordered_map* name_to_type, - std::unordered_set* missing_cols) { - return _file_format_reader->get_columns(name_to_type, missing_cols); -} - Status IcebergTableReader::init_row_filters() { // We get the count value by doris's be, so we don't need to read the delete file if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) { @@ -201,8 +172,9 @@ Status IcebergTableReader::_equality_delete_base( not_in_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); } else if (auto* orc_reader = typeid_cast(delete_reader.get())) { - RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false, - {}, {}, nullptr, nullptr)); + RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, + not_in_file_col_names, nullptr, {}, false, {}, + {}, nullptr, nullptr)); } else { return Status::InternalError("Unsupported format of delete file"); } @@ -412,60 +384,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector*>& d } } -/* - * Generate _all_required_col_names and _not_in_file_col_names. - * - * _all_required_col_names is all the columns required by user sql. - * If the column name has been modified after the data file was written, - * put the old name in data file to _all_required_col_names. - * - * _not_in_file_col_names is all the columns required by user sql but not in the data file. - * e.g. New columns added after this data file was written. - * The columns added with names used by old dropped columns should consider as a missing column, - * which should be in _not_in_file_col_names. 
- */ -void IcebergTableReader::_gen_file_col_names() { - _all_required_col_names.clear(); - _not_in_file_col_names.clear(); - for (int i = 0; i < _file_col_names.size(); ++i) { - auto name = _file_col_names[i]; - auto iter = _table_col_to_file_col.find(name); - if (iter == _table_col_to_file_col.end()) { - // If the user creates the iceberg table, directly append the parquet file that already exists, - // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty. - // Because we are ignoring case, so, it is converted to lowercase here - auto name_low = to_lower(name); - _all_required_col_names.emplace_back(name_low); - if (_has_iceberg_schema) { - _not_in_file_col_names.emplace_back(name); - } else { - _table_col_to_file_col.emplace(name, name_low); - _file_col_to_table_col.emplace(name_low, name); - if (name != name_low) { - _has_schema_change = true; - } - } - } else { - _all_required_col_names.emplace_back(iter->second); - } - } -} - -/* - * Generate _new_colname_to_value_range, by replacing the column name in - * _colname_to_value_range with column name in data file. - */ -void IcebergTableReader::_gen_new_colname_to_value_range() { - for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) { - auto iter = _table_col_to_file_col.find(it->first); - if (iter == _table_col_to_file_col.end()) { - _new_colname_to_value_range.emplace(it->first, it->second); - } else { - _new_colname_to_value_range.emplace(iter->second, it->second); - } - } -} - void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete, size_t read_rows, bool file_path_column_dictionary_coded) { @@ -502,7 +420,7 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil Status IcebergParquetReader::init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -510,17 +428,13 @@ Status IcebergParquetReader::init_reader( const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { _file_format = Fileformat::PARQUET; - ParquetReader* parquet_reader = static_cast(_file_format_reader.get()); - _col_id_name_map = col_id_name_map; - _file_col_names = file_col_names; - _colname_to_value_range = colname_to_value_range; - FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); - RETURN_IF_ERROR(_gen_col_name_maps(field_desc)); - _gen_file_col_names(); - _gen_new_colname_to_value_range(); + auto* parquet_reader = static_cast(_file_format_reader.get()); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map, + colname_to_value_range)); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); parquet_reader->iceberg_sanitize(_all_required_col_names); RETURN_IF_ERROR(init_row_filters()); + return parquet_reader->init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, @@ -575,7 +489,7 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d Status IcebergOrcReader::init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const 
std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -584,18 +498,16 @@ Status IcebergOrcReader::init_reader( const std::unordered_map* slot_id_to_filter_conjuncts) { _file_format = Fileformat::ORC; auto* orc_reader = static_cast(_file_format_reader.get()); - _col_id_name_map = col_id_name_map; - _file_col_names = file_col_names; - _colname_to_value_range = colname_to_value_range; - RETURN_IF_ERROR(_gen_col_name_maps(orc_reader)); - _gen_file_col_names(); - _gen_new_colname_to_value_range(); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map, + colname_to_value_range)); + orc_reader->set_table_col_to_file_col(_table_col_to_file_col); RETURN_IF_ERROR(init_row_filters()); - return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, - conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); + return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names, + &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, + row_descriptor, not_single_slot_filter_conjuncts, + slot_id_to_filter_conjuncts); } Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, @@ -603,8 +515,9 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete OrcReader orc_delete_reader(_profile, _state, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx); std::unordered_map colname_to_value_range; - RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range, - {}, false, {}, {}, nullptr, nullptr)); + RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {}, + &colname_to_value_range, {}, false, {}, {}, + nullptr, nullptr)); std::unordered_map> partition_columns; @@ -625,61 +538,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete return Status::OK(); } -/* - * To support schema evolution, Iceberg write the column id to column name map to - * parquet file key_value_metadata. - * This function is to compare the table schema from FE (_col_id_name_map) with - * the schema in key_value_metadata for the current parquet file and generate two maps - * for future use: - * 1. table column name to parquet column name. - * 2. parquet column name to table column name. - * For example, parquet file has a column 'col1', - * after this file was written, iceberg changed the column name to 'col1_new'. - * The two maps would contain: - * 1. col1_new -> col1 - * 2. col1 -> col1_new - */ -Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) { +// To support schema evolution, Iceberg write the column id to column name map to parquet file key_value_metadata. 
+Status IcebergParquetReader::get_file_col_id_to_name( + bool& exist_schema, std::map& file_col_id_to_name) { + auto* parquet_reader = static_cast(_file_format_reader.get()); + FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); + if (field_desc.has_parquet_field_id()) { - for (const auto& pair : _col_id_name_map) { - auto name_slice = field_desc.get_column_name_from_field_id(pair.first); - if (name_slice.get_size() == 0) { - _has_schema_change = true; - } else { - auto name_string = name_slice.to_string(); - _table_col_to_file_col.emplace(pair.second, name_string); - _file_col_to_table_col.emplace(name_string, pair.second); - if (name_string != pair.second) { - _has_schema_change = true; - } - } - } + file_col_id_to_name = field_desc.get_field_id_name_map(); + } else { + // Early Iceberg versions didn't write any schema information to the Parquet file. + exist_schema = false; } + return Status::OK(); } -Status IcebergOrcReader::_gen_col_name_maps(OrcReader* orc_reader) { +// To support schema evolution, Iceberg writes the column id into the ORC file's column attributes. +Status IcebergOrcReader::get_file_col_id_to_name( + bool& exist_schema, std::map& file_col_id_to_name) { + auto* orc_reader = static_cast(_file_format_reader.get()); + std::vector col_names; - std::vector col_ids; - RETURN_IF_ERROR( - orc_reader->get_schema_col_name_attribute(&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE)); - _has_iceberg_schema = true; - _table_col_to_file_col.clear(); - _file_col_to_table_col.clear(); - for (size_t i = 0; i < col_ids.size(); i++) { - auto col_id = col_ids[i]; - auto& file_col_name = col_names[i]; - - if (_col_id_name_map.find(col_id) == _col_id_name_map.end()) { - _has_schema_change = true; - continue; - } - auto& table_col_name = _col_id_name_map[col_id]; - _table_col_to_file_col.emplace(table_col_name, file_col_name); - _file_col_to_table_col.emplace(file_col_name, table_col_name); - if (table_col_name != file_col_name) { - _has_schema_change = true; - } + std::vector col_ids; + RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute( + &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema)); + if (!exist_schema) { + return Status::OK(); + } + for (size_t i = 0; i < col_names.size(); i++) { + file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i])); } return Status::OK(); } diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 8a2b253d16f9c5..5407415060b098 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -65,7 +65,7 @@ class GenericReader; class ShardedKVCache; class VExprContext; -class IcebergTableReader : public TableFormatReader { +class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper { public: struct PositionDeleteRange { std::vector data_file_path; @@ -82,9 +82,6 @@ class IcebergTableReader : public TableFormatReader { Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; - Status get_columns(std::unordered_map* name_to_type, - std::unordered_set* missing_cols) final; - enum { DATA, POSITION_DELETE, EQUALITY_DELETE }; enum Fileformat { NONE, PARQUET, ORC, AVRO }; @@ -116,9 +113,6 @@ class IcebergTableReader : public TableFormatReader { PositionDeleteRange _get_range(const ColumnString& file_path_column); - void _gen_file_col_names(); - - void _gen_new_colname_to_value_range(); static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } Status 
_position_delete_base(const std::string data_file_path, @@ -138,28 +132,9 @@ class IcebergTableReader : public TableFormatReader { ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; std::vector _iceberg_delete_rows; - // col names from _file_slot_descs - std::vector _file_col_names; - // file column name to table column name map. For iceberg schema evolution. - std::unordered_map _file_col_to_table_col; - // table column name to file column name map. For iceberg schema evolution. - std::unordered_map _table_col_to_file_col; - const std::unordered_map* _colname_to_value_range; - // copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution. - std::unordered_map _new_colname_to_value_range; - // column id to name map. Collect from FE slot descriptor. - std::unordered_map _col_id_name_map; - // col names in the parquet,orc file - std::vector _all_required_col_names; - // col names in table but not in parquet,orc file - std::vector _not_in_file_col_names; - // equality delete should read the primary columns std::vector _expand_col_names; std::vector _expand_columns; - bool _has_schema_change = false; - bool _has_iceberg_schema = false; - Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; @@ -194,7 +169,7 @@ class IcebergParquetReader final : public IcebergTableReader { kv_cache, io_ctx) {} Status init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -210,7 +185,8 @@ class IcebergParquetReader final : public IcebergTableReader { parquet_reader->set_delete_rows(&_iceberg_delete_rows); } - Status _gen_col_name_maps(const FieldDescriptor& field_desc); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: std::unique_ptr _create_equality_reader( @@ -240,7 +216,7 @@ class IcebergOrcReader final : public IcebergTableReader { Status init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -248,7 +224,8 @@ class IcebergOrcReader final : public IcebergTableReader { const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); - Status _gen_col_name_maps(OrcReader* orc_reader); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: std::unique_ptr _create_equality_reader( diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index e3fba810bbaf72..4dfe71337676da 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -37,65 +37,19 @@ PaimonReader::PaimonReader(std::unique_ptr file_format_reader, ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); } -Status PaimonReader::gen_file_col_name( - const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range) { - // It is a bit similar to iceberg. 
I will consider integrating it when I write hudi schema change later. - _table_col_to_file_col.clear(); - _file_col_to_table_col.clear(); - - if (!_params.__isset.paimon_schema_info) [[unlikely]] { +Status PaimonReader::get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) { + if (!_params.__isset.history_schema_info) [[unlikely]] { return Status::RuntimeError("miss paimon schema info."); } - if (!_params.paimon_schema_info.contains(_range.table_format_params.paimon_params.schema_id)) + if (!_params.history_schema_info.contains(_range.table_format_params.paimon_params.schema_id)) [[unlikely]] { return Status::InternalError("miss paimon schema info."); } - const auto& table_id_to_file_name = - _params.paimon_schema_info.at(_range.table_format_params.paimon_params.schema_id); - for (auto [table_col_id, file_col_name] : table_id_to_file_name) { - if (table_col_id_table_name_map.find(table_col_id) == table_col_id_table_name_map.end()) { - continue; - } - auto& table_col_name = table_col_id_table_name_map.at(table_col_id); - - _table_col_to_file_col.emplace(table_col_name, file_col_name); - _file_col_to_table_col.emplace(file_col_name, table_col_name); - if (table_col_name != file_col_name) { - _has_schema_change = true; - } - } - - _all_required_col_names.clear(); - _not_in_file_col_names.clear(); - for (auto name : read_table_col_names) { - auto iter = _table_col_to_file_col.find(name); - if (iter == _table_col_to_file_col.end()) { - auto name_low = to_lower(name); - _all_required_col_names.emplace_back(name_low); - - _table_col_to_file_col.emplace(name, name_low); - _file_col_to_table_col.emplace(name_low, name); - if (name != name_low) { - _has_schema_change = true; - } - } else { - _all_required_col_names.emplace_back(iter->second); - } - } - - for (auto& it : *table_col_name_to_value_range) { - auto iter = _table_col_to_file_col.find(it.first); - if (iter == _table_col_to_file_col.end()) { - _new_colname_to_value_range.emplace(it.first, it.second); - } else { - _new_colname_to_value_range.emplace(iter->second, it.second); - } - } + file_col_id_to_name = + _params.history_schema_info.at(_range.table_format_params.paimon_params.schema_id); return Status::OK(); } @@ -168,29 +122,9 @@ Status PaimonReader::init_row_filters() { } Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _table_col_to_file_col.find(col.name); - if (iter != _table_col_to_file_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } - + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block)); RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); - - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _file_col_to_table_col.find(col.name); - if (iter != _file_col_to_table_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block)); return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 11fffca943788f..cc10fce12792af 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -25,7 +25,7 @@ #include 
"vec/exec/format/table/table_format_reader.h" namespace doris::vectorized { -class PaimonReader : public TableFormatReader { +class PaimonReader : public TableFormatReader, public TableSchemaChangeHelper { public: PaimonReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, @@ -37,11 +37,8 @@ class PaimonReader : public TableFormatReader { Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; - Status gen_file_col_name( - const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: struct PaimonProfile { @@ -51,16 +48,6 @@ class PaimonReader : public TableFormatReader { std::vector _delete_rows; PaimonProfile _paimon_profile; - std::unordered_map _new_colname_to_value_range; - - std::unordered_map _file_col_to_table_col; - std::unordered_map _table_col_to_file_col; - - std::vector _all_required_col_names; - std::vector _not_in_file_col_names; - - bool _has_schema_change = false; - virtual void set_delete_rows() = 0; }; @@ -80,21 +67,21 @@ class PaimonOrcReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range, + const std::unordered_map& table_col_id_table_name_map, + const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { - RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map, - table_col_name_to_value_range)); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range)); + auto* orc_reader = static_cast(_file_format_reader.get()); orc_reader->set_table_col_to_file_col(_table_col_to_file_col); - return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, - conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts); + return orc_reader->init_reader( + &_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, + conjuncts, false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); } }; @@ -114,16 +101,15 @@ class PaimonParquetReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range, + const std::unordered_map& table_col_id_table_name_map, + const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { - RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map, - table_col_name_to_value_range)); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, 
table_col_id_table_name_map, table_col_name_to_value_range)); auto* parquet_reader = static_cast(_file_format_reader.get()); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp new file mode 100644 index 00000000000000..86f0dea38e4ff7 --- /dev/null +++ b/be/src/vec/exec/format/table/table_format_reader.cpp @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "table_format_reader.h" + +#include +#include + +#include "common/status.h" +#include "vec/core/block.h" +#include "vec/exec/format/generic_reader.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +Status TableSchemaChangeHelper::init_schema_info( + const std::vector& read_table_col_names, + const std::unordered_map& table_id_to_name, + const std::unordered_map* + table_col_name_to_value_range) { + bool exist_schema = true; + std::map file_id_to_name; + RETURN_IF_ERROR(get_file_col_id_to_name(exist_schema, file_id_to_name)); + if (!exist_schema) { + file_id_to_name.clear(); + for (const auto& [table_col_id, table_col_name] : table_id_to_name) { + file_id_to_name.emplace(table_col_id, table_col_name); + } + } + + /** Compare the table schema from the FE (table_id_to_name) with + * the current file schema (file_id_to_name) and generate two maps for future use: + * 1. table column name to file column name. + * 2. file column name to table column name. + * For example, a file has a column 'col1'; after the file was written, + * the column was renamed to 'col1_new'. The two maps would contain: + * 1. col1_new -> col1 + * 2. col1 -> col1_new + */ + for (const auto& [file_col_id, file_col_name] : file_id_to_name) { + if (table_id_to_name.find(file_col_id) == table_id_to_name.end()) { + continue; + } + + auto& table_col_name = table_id_to_name.at(file_col_id); + _table_col_to_file_col.emplace(table_col_name, file_col_name); + _file_col_to_table_col.emplace(file_col_name, table_col_name); + if (table_col_name != file_col_name) { + _has_schema_change = true; + } + } + + /** Generate _all_required_col_names and _not_in_file_col_names. + * + * _all_required_col_names is all the columns required by the user sql. + * If a column name was modified after the data file was written, + * put the old name used in the data file into _all_required_col_names. + * + * _not_in_file_col_names is all the columns required by the user sql but not in the data file, + * e.g. new columns added after this data file was written. + * Columns added with names reused from old dropped columns should be considered + * missing columns, and belong in _not_in_file_col_names.
+ */ + _all_required_col_names.clear(); + _not_in_file_col_names.clear(); + for (const auto& table_col_name : read_table_col_names) { + auto iter = _table_col_to_file_col.find(table_col_name); + if (iter == _table_col_to_file_col.end()) { + _all_required_col_names.emplace_back(table_col_name); + _not_in_file_col_names.emplace_back(table_col_name); + } else { + _all_required_col_names.emplace_back(iter->second); + } + } + + /** Generate _new_colname_to_value_range by replacing the column names in + * _colname_to_value_range with the corresponding column names in the data file. + */ + for (auto& it : *table_col_name_to_value_range) { + auto iter = _table_col_to_file_col.find(it.first); + if (iter == _table_col_to_file_col.end()) { + _new_colname_to_value_range.emplace(it.first, it.second); + } else { + _new_colname_to_value_range.emplace(iter->second, it.second); + } + } + return Status::OK(); +} + +Status TableSchemaChangeHelper::get_next_block_before(Block* block) const { + if (_has_schema_change) { + for (int i = 0; i < block->columns(); i++) { + ColumnWithTypeAndName& col = block->get_by_position(i); + auto iter = _table_col_to_file_col.find(col.name); + if (iter != _table_col_to_file_col.end()) { + col.name = iter->second; + } + } + block->initialize_index_by_name(); + } + return Status::OK(); +} + +Status TableSchemaChangeHelper::get_next_block_after(Block* block) const { + if (_has_schema_change) { + for (int i = 0; i < block->columns(); i++) { + ColumnWithTypeAndName& col = block->get_by_position(i); + auto iter = _file_col_to_table_col.find(col.name); + if (iter != _file_col_to_table_col.end()) { + col.name = iter->second; + } + } + block->initialize_index_by_name(); + } + return Status::OK(); +} +#include "common/compile_check_end.h" +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 0257a94a09b79a..72569dc9106fee 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -22,6 +22,7 @@ #include #include "common/status.h" +#include "exec/olap_common.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -79,7 +80,7 @@ class TableFormatReader : public GenericReader { virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0; Status get_columns(std::unordered_map* name_to_type, - std::unordered_set* missing_cols) override { + std::unordered_set* missing_cols) final { return _file_format_reader->get_columns(name_to_type, missing_cols); } @@ -91,7 +92,7 @@ Status set_fill_columns( const std::unordered_map>& partition_columns, - const std::unordered_map& missing_columns) override { + const std::unordered_map& missing_columns) final { return _file_format_reader->set_fill_columns(partition_columns, missing_columns); } @@ -115,4 +116,44 @@ } }; +class TableSchemaChangeHelper { +public: + /** Get the mapping from the unique ID of a column in the current file to the file column name. + * Iceberg/Hudi/Paimon usually maintain field IDs to support schema changes. If this information + * cannot be obtained (older versions may not have written it), set `exist_schema` to `false`.
*/ + virtual Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) = 0; + + virtual ~TableSchemaChangeHelper() = default; + +protected: + /** table_id_to_name: table column unique id to table column name map */ + Status init_schema_info(const std::vector& read_table_col_names, + const std::unordered_map& table_id_to_name, + const std::unordered_map* + table_col_name_to_value_range); + + /** To support schema evolution: rename the columns in the block to match the + * column names in the data file before reading data. + */ + Status get_next_block_before(Block* block) const; + + /** Set the column names back to the table column names before returning the block. */ + Status get_next_block_after(Block* block) const; + + // copy of _colname_to_value_range, re-keyed by the column names used in the parquet/orc file + std::unordered_map _new_colname_to_value_range; + // all the columns required by the user sql + std::vector _all_required_col_names; + // column names in the table but not in the parquet/orc file + std::vector _not_in_file_col_names; + bool _has_schema_change = false; + // file column name to table column name map + std::unordered_map _file_col_to_table_col; + // table column name to file column name map + std::unordered_map _table_col_to_file_col; +}; + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index f1d02c3639926f..620ca8673631a9 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -60,8 +60,8 @@ Status TransactionalHiveReader::init_reader( _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()); Status status = orc_reader->init_reader( - &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); + &_col_names, {}, colname_to_value_range, conjuncts, true, tuple_descriptor, + row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); return status; } @@ -78,12 +78,6 @@ Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_ return res; } -Status TransactionalHiveReader::get_columns( - std::unordered_map* name_to_type, - std::unordered_set* missing_cols) { - return _file_format_reader->get_columns(name_to_type, missing_cols); -} - Status TransactionalHiveReader::init_row_filters() { std::string data_file_path = _range.path; // the path in _range is remove the namenode prefix, @@ -123,9 +117,9 @@ Status TransactionalHiveReader::init_row_filters() { OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE, _state->timezone(), _io_ctx, false); - RETURN_IF_ERROR( - delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, - nullptr, {}, false, nullptr, nullptr, nullptr, nullptr)); + RETURN_IF_ERROR(delete_reader.init_reader( + &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, nullptr, {}, false, + nullptr, nullptr, nullptr, nullptr)); std::unordered_map> partition_columns; diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index f27f33f45635fc..5fbca8635243d9 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ 
b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -91,9 +91,6 @@ class TransactionalHiveReader : public TableFormatReader { Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; - Status get_columns(std::unordered_map* name_to_type, - std::unordered_set* missing_cols) final; - Status init_reader( const std::vector& column_names, const std::unordered_map* colname_to_value_range, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 1c17e788b005be..724297a5f90094 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -62,6 +62,7 @@ #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/hudi_jni_reader.h" +#include "vec/exec/format/table/hudi_reader.h" #include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/format/table/lakesoul_jni_reader.h" #include "vec/exec/format/table/max_compute_jni_reader.h" @@ -971,6 +972,17 @@ Status VFileScanner::_get_next_reader() { &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hudi") { + std::unique_ptr hudi_reader = + HudiParquetReader::create_unique(std::move(parquet_reader), _profile, + _state, *_params, range, _io_ctx.get()); + init_status = hudi_reader->init_reader( + _file_col_names, _col_id_name_map, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, + &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(hudi_reader); } else { bool hive_parquet_use_column_names = true; @@ -1035,6 +1047,16 @@ Status VFileScanner::_get_next_reader() { &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hudi") { + std::unique_ptr hudi_reader = HudiOrcReader::create_unique( + std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get()); + + init_status = hudi_reader->init_reader( + _file_col_names, _col_id_name_map, _colname_to_value_range, + _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + _cur_reader = std::move(hudi_reader); } else { bool hive_orc_use_column_names = true; @@ -1044,7 +1066,7 @@ Status VFileScanner::_get_next_reader() { hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names; } init_status = orc_reader->init_reader( - &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, hive_orc_use_column_names); diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 9bd1f3e2aa1533..5b1604209d49d0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -107,7 +107,7 @@ class VFileScanner : public VScanner { // col names from _file_slot_descs std::vector _file_col_names; // column id to name map. Collect from FE slot descriptor. 
- std::unordered_map _col_id_name_map; + std::unordered_map _col_id_name_map; // Partition source slot descriptors std::vector _partition_slot_descs; diff --git a/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp index 8dafed48a97bf4..8063549afc5ced 100644 --- a/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp +++ b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp @@ -51,11 +51,26 @@ class PaimonMockReader final : public PaimonReader { table_col_to_file_col_ans["d"] = "struct_col"; table_col_to_file_col_ans["a"] = "vvv"; table_col_to_file_col_ans["c"] = "k"; - table_col_to_file_col_ans["nonono"] = "nonono"; for (auto [table_col, file_col] : table_col_to_file_col_ans) { ASSERT_TRUE(_table_col_to_file_col[table_col] == file_col); ASSERT_TRUE(_file_col_to_table_col[file_col] == table_col); } + ASSERT_TRUE(_all_required_col_names.size() == 6); + + std::set all_required_col_names_set; + all_required_col_names_set.emplace("map_col"); + all_required_col_names_set.emplace("array_col"); + all_required_col_names_set.emplace("struct_col"); + all_required_col_names_set.emplace("vvv"); + all_required_col_names_set.emplace("k"); + all_required_col_names_set.emplace("nonono"); + + for (auto i : _all_required_col_names) { + ASSERT_TRUE(all_required_col_names_set.contains(i)); + } + + ASSERT_TRUE(_not_in_file_col_names.size() == 1); + ASSERT_TRUE(_not_in_file_col_names.back() == "nonono"); } }; @@ -65,7 +80,6 @@ class PaimonReaderTest : public ::testing::Test { _profile = new RuntimeProfile("test_profile"); _state = new RuntimeState(TQueryGlobals()); _io_ctx = new io::IOContext(); - _schema_file_path = "./be/test/exec/test_data/paimon_scanner/schema-0"; } void TearDown() override { @@ -77,11 +91,10 @@ class PaimonReaderTest : public ::testing::Test { RuntimeProfile* _profile; RuntimeState* _state; io::IOContext* _io_ctx; - std::string _schema_file_path; }; TEST_F(PaimonReaderTest, ReadSchemaFile) { - std::map file_id_to_name; + std::map file_id_to_name; file_id_to_name[0] = "k"; file_id_to_name[1] = "vvv"; file_id_to_name[2] = "array_col"; @@ -92,8 +105,8 @@ TEST_F(PaimonReaderTest, ReadSchemaFile) { params.file_type = TFileType::FILE_LOCAL; params.properties = {}; params.hdfs_params = {}; - params.__isset.paimon_schema_info = true; - params.paimon_schema_info[0] = file_id_to_name; + params.__isset.history_schema_info = true; + params.history_schema_info[0] = file_id_to_name; TFileRangeDesc range; range.table_format_params.paimon_params.schema_id = 0; @@ -118,7 +131,7 @@ TEST_F(PaimonReaderTest, ReadSchemaFile) { read_table_col_names.emplace_back("e"); read_table_col_names.emplace_back("nonono"); - std::unordered_map table_col_id_table_name_map; + std::unordered_map table_col_id_table_name_map; table_col_id_table_name_map[1] = "a"; table_col_id_table_name_map[6] = "b"; table_col_id_table_name_map[0] = "c"; @@ -127,8 +140,8 @@ TEST_F(PaimonReaderTest, ReadSchemaFile) { table_col_id_table_name_map[10] = "nonono"; std::unordered_map table_col_name_to_value_range; - Status status = reader.gen_file_col_name(read_table_col_names, table_col_id_table_name_map, - &table_col_name_to_value_range); + Status status = reader.init_schema_info(read_table_col_names, table_col_id_table_name_map, + &table_col_name_to_value_range); ASSERT_TRUE(status.ok()); reader.check(); } diff --git a/be/test/vec/exec/format/table/table_schema_change_helper_test.cpp b/be/test/vec/exec/format/table/table_schema_change_helper_test.cpp new file 
mode 100644
index 00000000000000..a7940eb4a544d1
--- /dev/null
+++ b/be/test/vec/exec/format/table/table_schema_change_helper_test.cpp
@@ -0,0 +1,471 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "vec/columns/column_string.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/exec/format/table/table_format_reader.h"
+
+namespace doris::vectorized {
+class MockTableSchemaChangeHelper : public TableSchemaChangeHelper {
+public:
+    MockTableSchemaChangeHelper(std::map<int32_t, std::string> file_schema,
+                                bool exist_schema = true)
+            : _file_schema(std::move(file_schema)), _exist_schema(exist_schema) {}
+
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) override {
+        exist_schema = _exist_schema;
+        if (_exist_schema) {
+            file_col_id_to_name = _file_schema;
+        }
+        return Status::OK();
+    }
+
+    bool has_schema_change() const { return _has_schema_change; }
+    const std::vector<std::string>& all_required_col_names() const {
+        return _all_required_col_names;
+    }
+    const std::vector<std::string>& not_in_file_col_names() const { return _not_in_file_col_names; }
+    const std::unordered_map<std::string, std::string>& file_col_to_table_col() const {
+        return _file_col_to_table_col;
+    }
+    const std::unordered_map<std::string, std::string>& table_col_to_file_col() const {
+        return _table_col_to_file_col;
+    }
+    const std::unordered_map<std::string, ColumnValueRangeType>& new_colname_to_value_range()
+            const {
+        return _new_colname_to_value_range;
+    }
+
+private:
+    std::map<int32_t, std::string> _file_schema;
+    bool _exist_schema;
+};
+
+TEST(TableSchemaChangeHelperTest, NoSchemaChange) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}};
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2"}, {3, "col3"}};
+
+    std::vector<std::string> read_cols = {"col1", "col3"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_FALSE(helper.has_schema_change());
+    ASSERT_EQ(helper.all_required_col_names().size(), 2);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col3");
+    ASSERT_TRUE(helper.not_in_file_col_names().empty());
+}
+
+TEST(TableSchemaChangeHelperTest, WithSchemaChange) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2_new", "col3_new"};
+
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges = {
+            {"col2_new", ColumnValueRangeType()}};
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_TRUE(helper.has_schema_change());
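+    // Matching is by field id: the renamed table column "col2_new" (id 2) resolves
+    // to the file-side name "col2_old" (id 2), and the value range keyed by
+    // "col2_new" is re-keyed onto "col2_old" before the file reader sees it.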
+    ASSERT_EQ(helper.all_required_col_names().size(), 3);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col2_old");
+    ASSERT_EQ(helper.all_required_col_names()[2], "col3_old");
+    ASSERT_TRUE(helper.not_in_file_col_names().empty());
+
+    ASSERT_EQ(helper.table_col_to_file_col().size(), 3);
+    ASSERT_EQ(helper.table_col_to_file_col().at("col2_new"), "col2_old");
+    ASSERT_EQ(helper.table_col_to_file_col().at("col3_new"), "col3_old");
+
+    ASSERT_EQ(helper.file_col_to_table_col().size(), 3);
+    ASSERT_EQ(helper.file_col_to_table_col().at("col2_old"), "col2_new");
+    ASSERT_EQ(helper.file_col_to_table_col().at("col3_old"), "col3_new");
+
+    ASSERT_EQ(helper.new_colname_to_value_range().size(), 1);
+    ASSERT_TRUE(helper.new_colname_to_value_range().find("col2_old") !=
+                helper.new_colname_to_value_range().end());
+}
+
+TEST(TableSchemaChangeHelperTest, MissingColumns) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}};
+    std::vector<std::string> read_cols = {"col1", "col3", "col4"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges = {
+            {"col3", ColumnValueRangeType()}};
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_FALSE(helper.has_schema_change());
+    ASSERT_EQ(helper.all_required_col_names().size(), 3);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col3");
+    ASSERT_EQ(helper.all_required_col_names()[2], "col4");
+    ASSERT_EQ(helper.not_in_file_col_names().size(), 2);
+    ASSERT_EQ(helper.not_in_file_col_names()[0], "col3");
+    ASSERT_EQ(helper.not_in_file_col_names()[1], "col4");
+}
+
+TEST(TableSchemaChangeHelperTest, NoFileSchema) {
+    std::map<int32_t, std::string> file_schema;
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2"}, {3, "col3"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+    MockTableSchemaChangeHelper helper(file_schema, false);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_FALSE(helper.has_schema_change());
+    ASSERT_EQ(helper.all_required_col_names().size(), 2);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col2");
+    ASSERT_TRUE(helper.not_in_file_col_names().empty());
+}
+
+TEST(TableSchemaChangeHelperTest, MixedScenario) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {4, "col4_old"}};
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4_new"}, {5, "col5"}};
+    std::vector<std::string> read_cols = {"col1", "col2_new", "col3", "col4_new", "col5"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges = {
+            {"col2_new", ColumnValueRangeType()},
+            {"col3", ColumnValueRangeType()},
+            {"col5", ColumnValueRangeType()}};
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+    ASSERT_TRUE(helper.has_schema_change());
+    ASSERT_EQ(helper.all_required_col_names().size(), 5);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col2_old");
+    ASSERT_EQ(helper.all_required_col_names()[2], "col3");
+    ASSERT_EQ(helper.all_required_col_names()[3], "col4_old");
+    ASSERT_EQ(helper.all_required_col_names()[4], "col5");
+    ASSERT_EQ(helper.not_in_file_col_names().size(), 2);
+    ASSERT_EQ(helper.not_in_file_col_names()[0], "col3");
"col3"); + ASSERT_EQ(helper.not_in_file_col_names()[1], "col5"); + ASSERT_EQ(helper.table_col_to_file_col().at("col2_new"), "col2_old"); + ASSERT_EQ(helper.table_col_to_file_col().at("col4_new"), "col4_old"); + ASSERT_EQ(helper.new_colname_to_value_range().size(), 3); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col2_old") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col3") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col5") != + helper.new_colname_to_value_range().end()); +} + +TEST(TableSchemaChangeHelperTest, EmptySchemas) { + std::map file_schema; + std::unordered_map table_id_to_name; + std::vector read_cols; + std::unordered_map col_ranges; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_TRUE(helper.all_required_col_names().empty()); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); + ASSERT_TRUE(helper.table_col_to_file_col().empty()); + ASSERT_TRUE(helper.file_col_to_table_col().empty()); + ASSERT_TRUE(helper.new_colname_to_value_range().empty()); +} + +TEST(TableSchemaChangeHelperTest, IdMismatch) { + std::map file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::unordered_map table_id_to_name = { + {10, "col1"}, {20, "col2"}, {30, "col3"}}; + + std::vector read_cols = {"col1", "col2", "col3"}; + + std::unordered_map col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 3); + ASSERT_EQ(helper.not_in_file_col_names().size(), 3); + ASSERT_TRUE(helper.table_col_to_file_col().empty()); + ASSERT_TRUE(helper.file_col_to_table_col().empty()); +} + +TEST(TableSchemaChangeHelperTest, DuplicateColumnNames) { + std::map file_schema = {{1, "col1"}, {2, "col2"}}; + + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col2"}, {4, "col1"}}; + + std::vector read_cols = {"col1", "col2"}; + std::unordered_map col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2"); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); + ASSERT_EQ(helper.table_col_to_file_col().size(), 2); +} + +TEST(TableSchemaChangeHelperTest, ValueRangeForNonReadColumns) { + std::map file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}}; + + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4"}}; + + std::vector read_cols = {"col1", "col3"}; + + std::unordered_map col_ranges = { + {"col1", ColumnValueRangeType()}, + {"col2_new", ColumnValueRangeType()}, + {"col4", ColumnValueRangeType()}}; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col3"); + 
+    ASSERT_TRUE(helper.not_in_file_col_names().empty());
+
+    ASSERT_EQ(helper.new_colname_to_value_range().size(), 3);
+    ASSERT_TRUE(helper.new_colname_to_value_range().find("col1") !=
+                helper.new_colname_to_value_range().end());
+    ASSERT_TRUE(helper.new_colname_to_value_range().find("col2") !=
+                helper.new_colname_to_value_range().end());
+    ASSERT_TRUE(helper.new_colname_to_value_range().find("col4") !=
+                helper.new_colname_to_value_range().end());
+}
+
+TEST(TableSchemaChangeHelperTest, PartialIdMatch) {
+    std::map<int32_t, std::string> file_schema = {
+            {1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {20, "col2"}, {3, "col3_new"}, {40, "col4_new"}};
+    std::vector<std::string> read_cols = {"col1", "col2", "col3_new", "col4_new"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_TRUE(helper.has_schema_change());
+
+    ASSERT_EQ(helper.all_required_col_names().size(), 4);
+    ASSERT_EQ(helper.all_required_col_names()[0], "col1");
+    ASSERT_EQ(helper.all_required_col_names()[1], "col2");
+    ASSERT_EQ(helper.all_required_col_names()[2], "col3");
+    ASSERT_EQ(helper.all_required_col_names()[3], "col4_new");
+
+    ASSERT_EQ(helper.not_in_file_col_names().size(), 2);
+    ASSERT_EQ(helper.not_in_file_col_names()[0], "col2");
+    ASSERT_EQ(helper.not_in_file_col_names()[1], "col4_new");
+
+    ASSERT_EQ(helper.table_col_to_file_col().size(), 2);
+    ASSERT_EQ(helper.table_col_to_file_col().at("col1"), "col1");
+    ASSERT_EQ(helper.table_col_to_file_col().at("col3_new"), "col3");
+}
+
+Block create_test_block(const std::vector<std::string>& column_names) {
+    Block block;
+    for (const auto& name : column_names) {
+        auto column = ColumnString::create();
+        block.insert(ColumnWithTypeAndName(std::move(column),
+                                           std::make_shared<DataTypeString>(), name));
+    }
+    return block;
+}
+
+TEST(TableSchemaChangeHelperTest, BasicColumnNameConversion) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2_new", "col3_new"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_TRUE(helper.has_schema_change());
+
+    Block before_block = create_test_block({"col1", "col2_new", "col3_new"});
+    ASSERT_TRUE(helper.get_next_block_before(&before_block).ok());
+
+    ASSERT_EQ(before_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(before_block.get_by_position(1).name, "col2_old");
+    ASSERT_EQ(before_block.get_by_position(2).name, "col3_old");
+
+    Block after_block = create_test_block({"col1", "col2_old", "col3_old"});
+    ASSERT_TRUE(helper.get_next_block_after(&after_block).ok());
+
+    ASSERT_EQ(after_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(after_block.get_by_position(1).name, "col2_new");
+    ASSERT_EQ(after_block.get_by_position(2).name, "col3_new");
+}
+
+TEST(TableSchemaChangeHelperTest, NoSchemaChangeBlocks) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2"}, {3, "col3"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2", "col3"};
+
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
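+    // Identical ids and names on both sides: the helper should report no schema
+    // change and leave block column names untouched in both directions.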
+    ASSERT_FALSE(helper.has_schema_change());
+
+    Block before_block = create_test_block({"col1", "col2", "col3"});
+    ASSERT_TRUE(helper.get_next_block_before(&before_block).ok());
+
+    ASSERT_EQ(before_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(before_block.get_by_position(1).name, "col2");
+    ASSERT_EQ(before_block.get_by_position(2).name, "col3");
+
+    Block after_block = create_test_block({"col1", "col2", "col3"});
+    ASSERT_TRUE(helper.get_next_block_after(&after_block).ok());
+
+    ASSERT_EQ(after_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(after_block.get_by_position(1).name, "col2");
+    ASSERT_EQ(after_block.get_by_position(2).name, "col3");
+}
+
+TEST(TableSchemaChangeHelperTest, MixedColumnNameConversion) {
+    std::map<int32_t, std::string> file_schema = {
+            {1, "col1"}, {2, "col2_old"}, {3, "col3"}, {4, "col4_old"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4_new"}, {5, "col5"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2_new", "col3", "col4_new", "col5"};
+
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_TRUE(helper.has_schema_change());
+    Block before_block =
+            create_test_block({"col1", "col2_new", "col3", "col4_new", "col5", "extra_col"});
+
+    ASSERT_TRUE(helper.get_next_block_before(&before_block).ok());
+
+    ASSERT_EQ(before_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(before_block.get_by_position(1).name, "col2_old");
+    ASSERT_EQ(before_block.get_by_position(2).name, "col3");
+    ASSERT_EQ(before_block.get_by_position(3).name, "col4_old");
+    ASSERT_EQ(before_block.get_by_position(4).name, "col5");
+    ASSERT_EQ(before_block.get_by_position(5).name, "extra_col");
+
+    Block after_block =
+            create_test_block({"col1", "col2_old", "col3", "col4_old", "col5", "extra_col"});
+
+    ASSERT_TRUE(helper.get_next_block_after(&after_block).ok());
+    ASSERT_EQ(after_block.get_by_position(0).name, "col1");
+    ASSERT_EQ(after_block.get_by_position(1).name, "col2_new");
+    ASSERT_EQ(after_block.get_by_position(2).name, "col3");
+    ASSERT_EQ(after_block.get_by_position(3).name, "col4_new");
+    ASSERT_EQ(after_block.get_by_position(4).name, "col5");
+    ASSERT_EQ(after_block.get_by_position(5).name, "extra_col");
+}
+
+TEST(TableSchemaChangeHelperTest, EmptyAndSingleColumnBlocks) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}};
+
+    std::unordered_map<int32_t, std::string> table_id_to_name = {{1, "col1"}, {2, "col2_new"}};
+
+    std::vector<std::string> read_cols = {"col1", "col2_new"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+    ASSERT_TRUE(helper.has_schema_change());
+
+    Block empty_block;
+    ASSERT_TRUE(helper.get_next_block_before(&empty_block).ok());
+    ASSERT_TRUE(helper.get_next_block_after(&empty_block).ok());
+    ASSERT_EQ(empty_block.columns(), 0);
+
+    Block single_block1 = create_test_block({"col1"});
+    ASSERT_TRUE(helper.get_next_block_before(&single_block1).ok());
+    ASSERT_EQ(single_block1.get_by_position(0).name, "col1");
+
+    ASSERT_TRUE(helper.get_next_block_after(&single_block1).ok());
+    ASSERT_EQ(single_block1.get_by_position(0).name, "col1");
+
+    Block single_block2 = create_test_block({"col2_new"});
+    ASSERT_TRUE(helper.get_next_block_before(&single_block2).ok());
+    ASSERT_EQ(single_block2.get_by_position(0).name, "col2_old");
+
+    Block single_block3 = create_test_block({"col2_old"});
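+    // get_next_block_after converts file-side names back to table-side names,
+    // so the lone "col2_old" column should come back as "col2_new".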
+    ASSERT_TRUE(helper.get_next_block_after(&single_block3).ok());
+    ASSERT_EQ(single_block3.get_by_position(0).name, "col2_new");
+}
+
+TEST(TableSchemaChangeHelperTest, ColumnOrderChange) {
+    std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}};
+    std::unordered_map<int32_t, std::string> table_id_to_name = {
+            {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}};
+    std::vector<std::string> read_cols = {"col1", "col2_new", "col3_new"};
+    std::unordered_map<std::string, ColumnValueRangeType> col_ranges;
+    MockTableSchemaChangeHelper helper(file_schema);
+    ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok());
+
+    ASSERT_TRUE(helper.has_schema_change());
+
+    Block before_block = create_test_block({"col3_new", "col1", "col2_new"});
+    ASSERT_TRUE(helper.get_next_block_before(&before_block).ok());
+
+    ASSERT_EQ(before_block.get_by_position(0).name, "col3_old");
+    ASSERT_EQ(before_block.get_by_position(1).name, "col1");
+    ASSERT_EQ(before_block.get_by_position(2).name, "col2_old");
+
+    Block after_block = create_test_block({"col3_old", "col1", "col2_old"});
+    ASSERT_TRUE(helper.get_next_block_after(&after_block).ok());
+
+    ASSERT_EQ(after_block.get_by_position(0).name, "col3_new");
+    ASSERT_EQ(after_block.get_by_position(1).name, "col1");
+    ASSERT_EQ(after_block.get_by_position(2).name, "col2_new");
+}
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/orc_reader_test.cpp b/be/test/vec/exec/orc_reader_test.cpp
index ff7452ae625428..e27bdf08c9d5e7 100644
--- a/be/test/vec/exec/orc_reader_test.cpp
+++ b/be/test/vec/exec/orc_reader_test.cpp
@@ -66,8 +66,8 @@ class OrcReaderTest : public testing::Test {
         range.start_offset = 0;
         range.size = 1293;
         auto reader = OrcReader::create_unique(params, range, "", nullptr, true);
-        auto status = reader->init_reader(&column_names, nullptr, {}, false, tuple_desc, &row_desc,
-                                          nullptr, nullptr);
+        auto status = reader->init_reader(&column_names, {}, nullptr, {}, false, tuple_desc,
+                                          &row_desc, nullptr, nullptr);
         EXPECT_TRUE(status.ok());
 
         // deserialize expr
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql
new file mode 100644
index 00000000000000..1a3d844ef6027e
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql
@@ -0,0 +1,48 @@
+
+use demo.test_db;
+
+CREATE TABLE sc_drop_add_orc (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='orc');
+
+INSERT INTO sc_drop_add_orc VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_orc VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_orc DROP COLUMN age;
+
+INSERT INTO sc_drop_add_orc (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_orc (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_orc ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_orc VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_orc VALUES (6, 'Frank', 35);
+
+
+
+CREATE TABLE sc_drop_add_parquet (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='parquet');
+
+INSERT INTO sc_drop_add_parquet VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_parquet VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_parquet DROP COLUMN age;
+
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_parquet ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_parquet VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_parquet VALUES (6, 'Frank', 35);
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 74bf07fff1aacb..5042ad763e9589 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -836,6 +836,20 @@ public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[
         }
     }
 
+        if (internalSchemaOption.isPresent()) {
+            enableSchemaEvolution[0] = true;
+            return internalSchemaOption.get();
+        } else {
+            try {
+                // schema evolution is not enabled. (hoodie.schema.on.read.enable = false).
+                enableSchemaEvolution[0] = false;
+                // AvroInternalSchemaConverter.convert() will generate field ids.
+                return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema(true));
+            } catch (Exception e) {
+                throw new RuntimeException("Cannot get hudi table schema.", e);
+            }
+        }
+    }
 
     public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
         // if hive config is not ready, then use hadoop kerberos to login
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
index 843dded27969ad..abb89dc32e7705 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -18,7 +18,7 @@
 package org.apache.doris.datasource.hudi.source;
 
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.spi.Split;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 public class COWIncrementalRelation implements IncrementalRelation {
@@ -212,19 +213,22 @@ public List<Split> collectSplits() throws HoodieException {
         Option<String[]> partitionColumns = metaClient.getTableConfig().getPartitionFields();
         List<String> partitionNames = partitionColumns.isPresent()
                ? Arrays.asList(partitionColumns.get()) : Collections.emptyList();
-        for (String baseFile : filteredMetaBootstrapFullPaths) {
+
+        Consumer<String> generatorSplit = baseFile -> {
             HoodieWriteStat stat = fileToWriteStat.get(baseFile);
-            splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
-                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
-                    0, new String[0],
-                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
+            LocationPath locationPath = new LocationPath(baseFile, optParams);
+            HudiSplit hudiSplit = new HudiSplit(locationPath, 0,
+                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), new String[0],
+                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()));
+            hudiSplit.setTableFormatType(TableFormatType.HUDI);
+            splits.add(hudiSplit);
+        };
+
+        for (String baseFile : filteredMetaBootstrapFullPaths) {
+            generatorSplit.accept(baseFile);
         }
         for (String baseFile : filteredRegularFullPaths) {
-            HoodieWriteStat stat = fileToWriteStat.get(baseFile);
-            splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
-                    stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
-                    0, new String[0],
-                    HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
+            generatorSplit.accept(baseFile);
         }
         return splits;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 96ee3c08605a31..d5a04566e327b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -60,10 +60,12 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -72,6 +74,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -212,6 +215,9 @@ protected void doInitialize() throws UserException {
                 .getExtMetaCacheMgr()
                 .getFsViewProcessor(hmsTable.getCatalog())
                 .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
+        if (HudiUtils.getSchemaCacheValue(hmsTable).isEnableSchemaEvolution()) {
+            params.setHistorySchemaInfo(new ConcurrentHashMap<>());
+        }
     }
 
 @Override
@@ -250,17 +256,32 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value());
         THudiFileDesc fileDesc = new THudiFileDesc();
-        fileDesc.setInstantTime(hudiSplit.getInstantTime());
-        fileDesc.setSerde(hudiSplit.getSerde());
-        fileDesc.setInputFormat(hudiSplit.getInputFormat());
-        fileDesc.setBasePath(hudiSplit.getBasePath());
-        fileDesc.setDataFilePath(hudiSplit.getDataFilePath());
-        fileDesc.setDataFileLength(hudiSplit.getFileLength());
-        fileDesc.setDeltaLogs(hudiSplit.getHudiDeltaLogs());
-        fileDesc.setColumnNames(hudiSplit.getHudiColumnNames());
-        fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
-        // TODO(gaoxin): support complex types
-        // fileDesc.setNestedFields(hudiSplit.getNestedFields());
+        if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI) {
+            fileDesc.setInstantTime(hudiSplit.getInstantTime());
+            fileDesc.setInputFormat(hudiSplit.getInputFormat());
+            fileDesc.setSerde(hudiSplit.getSerde());
+            fileDesc.setBasePath(hudiSplit.getBasePath());
+            fileDesc.setDataFilePath(hudiSplit.getDataFilePath());
+            fileDesc.setDataFileLength(hudiSplit.getFileLength());
+            fileDesc.setDeltaLogs(hudiSplit.getHudiDeltaLogs());
+            fileDesc.setColumnNames(hudiSplit.getHudiColumnNames());
+            fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
+            // TODO(gaoxin): support complex types
+            // fileDesc.setNestedFields(hudiSplit.getNestedFields());
+            fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
+        } else {
+            HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable);
+            if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
+                long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(
+                        new File(hudiSplit.getPath().get()).getName()));
+                InternalSchema internalSchema = hudiSchemaCacheValue
+                        .getCommitInstantInternalSchema(hudiClient, commitInstantTime);
+                params.history_schema_info.computeIfAbsent(
+                        internalSchema.schemaId(),
+                        k -> HudiUtils.getSchemaInfo(internalSchema));
+                fileDesc.setSchemaId(internalSchema.schemaId()); // for schema change. (native reader)
+            }
+        }
         tableFormatFileDesc.setHudiParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
@@ -318,6 +339,7 @@ private List<Split> getIncrementalSplits() {
                 incrementalRelation.getEndTs())).collect(Collectors.toList());
     }
 
+
     private void getPartitionSplits(HivePartition partition, List<Split> splits)
             throws IOException {
         String partitionName;
@@ -332,11 +354,14 @@ private void getPartitionSplits(HivePartition partition, List<Split> splits) thr
             fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
                 noLogsSplitNum.incrementAndGet();
                 String filePath = baseFile.getPath();
+                long fileSize = baseFile.getFileSize();
                 // Need add hdfs host to location
                 LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0,
-                        new String[0], partition.getPartitionValues()));
+                HudiSplit hudiSplit = new HudiSplit(locationPath, 0, fileSize, fileSize,
+                        new String[0], partition.getPartitionValues());
+                hudiSplit.setTableFormatType(TableFormatType.HUDI);
+                splits.add(hudiSplit);
             });
         } else {
             fsView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 25539c8247704c..87680686986e13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -65,6 +65,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 public class PaimonScanNode extends FileQueryScanNode {
@@ -132,7 +133,7 @@ protected void doInitialize() throws UserException {
         source = new PaimonSource(desc);
         serializedTable = encodeObjectToString(source.getPaimonTable());
         Preconditions.checkNotNull(source);
-        params.setPaimonSchemaInfo(new HashMap<>());
+        params.setHistorySchemaInfo(new ConcurrentHashMap<>());
     }
 
 @VisibleForTesting
@@ -170,12 +171,12 @@ protected Optional<String> getSerializedTable() {
         return Optional.of(serializedTable);
     }
 
-    private Map<Long, String> getSchemaInfo(Long schemaId) {
+    private Map<Integer, String> getSchemaInfo(Long schemaId) {
         PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable();
         TableSchema tableSchema = table.getPaimonSchemaCacheValue(schemaId).getTableSchema();
-        Map<Long, String> columnIdToName = new HashMap<>(tableSchema.fields().size());
+        Map<Integer, String> columnIdToName = new HashMap<>(tableSchema.fields().size());
         for (DataField dataField : tableSchema.fields()) {
-            columnIdToName.put((long) dataField.id(), dataField.name().toLowerCase());
+            columnIdToName.put(dataField.id(), dataField.name().toLowerCase());
         }
 
         return columnIdToName;
@@ -203,7 +204,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
                 throw new RuntimeException("Unsupported file format: " + fileFormat);
             }
             fileDesc.setSchemaId(paimonSplit.getSchemaId());
-            params.paimon_schema_info.computeIfAbsent(paimonSplit.getSchemaId(), this::getSchemaInfo);
+            params.history_schema_info.computeIfAbsent(paimonSplit.getSchemaId(), this::getSchemaInfo);
         }
         fileDesc.setFileFormat(fileFormat);
         fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c4a01bf2ec4e79..33f8f74e14f05e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -379,6 +379,7 @@ struct THudiFileDesc {
     9: optional list<string> column_types;
     10: optional list<string> nested_fields;
     11: optional string hudi_jni_scanner; // deprecated
+    12: optional i64 schema_id; // for schema change. (native reader)
 }
 
 struct TLakeSoulFileDesc {
@@ -467,7 +468,7 @@ struct TFileScanRangeParams {
     // 1. Reduce the access to HMS and HDFS on the JNI side.
     // 2. There will be no inconsistency between the fe and be tables.
     24: optional string serialized_table
-    25: optional map<i64, map<i64, string>> paimon_schema_info // paimon map<schema_id, map<column_id, column_name>> : for schema change.
+    25: optional map<i64, map<i32, string>> history_schema_info // paimon/hudi map<schema_id, map<column_id, column_name>> : for schema change. (native reader)
 }
 
 struct TFileRangeDesc {
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
new file mode 100644
index 00000000000000..4da3be0b7c0eab
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
@@ -0,0 +1,65 @@
+-- This file is automatically generated.
You should know what you did if you want to edit this +-- !parquet_1 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N +5 Eve 28 +6 Frank 35 + +-- !parquet_2 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N + +-- !parquet_3 -- +5 Eve 28 +6 Frank 35 + +-- !parquet_4 -- +6 Frank 35 + +-- !parquet_5 -- +5 Eve 28 +6 Frank 35 + +-- !parquet_6 -- +5 Eve +6 Frank + +-- !parquet_7 -- +5 28 + +-- !orc_1 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N +5 Eve 28 +6 Frank 35 + +-- !orc_2 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N + +-- !orc_3 -- +5 Eve 28 +6 Frank 35 + +-- !orc_4 -- +6 Frank 35 + +-- !orc_5 -- +5 Eve 28 +6 Frank 35 + +-- !orc_6 -- +5 Eve +6 Frank + +-- !orc_7 -- +5 28 + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out new file mode 100644 index 00000000000000..13c9f5535646f9 --- /dev/null +++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out @@ -0,0 +1,259 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !hudi_0 -- +20250314162744620 20250314162744620_0_0 1 84ab609d-947a-4a24-af9a-6360348cf977-0_0-80-105_20250314162744620.parquet 1 \N Alice \N \N +20250314162747350 20250314162747350_0_0 2 07b6bd0b-0f2c-4500-a8c8-75d3cd90e85e-0_0-88-112_20250314162747350.parquet 2 \N Bob \N \N +20250314162759470 20250314162759470_0_0 3 58382f07-0dca-431d-ad4d-d5d94140d60f-0_0-96-119_20250314162759470.parquet 3 \N Charlie New York \N +20250314162804702 20250314162804702_0_0 4 05d28f5c-acc5-4530-8163-c82bdf96b720-0_0-104-126_20250314162804702.parquet 4 \N David Los Angeles \N +20250314162809486 20250314162809486_0_0 5 9164c294-2606-4537-bb84-e7ba4dbb98e5-0_0-112-133_20250314162809486.parquet 5 \N Eve Chicago \N +20250314162813019 20250314162813019_0_0 6 43b432a3-3581-439b-83b6-6c171bd6492a-0_0-120-140_20250314162813019.parquet 6 85.5 Frank San Francisco \N +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N +20250314162828063 20250314162828063_0_0 10 8df59b32-21a7-4a24-9fe4-eb8ef71956eb-0_0-166-182_20250314162828063.parquet 10 101.1 Judy Austin \N +20250314162847946 20250314162847946_0_0 11 6e2a56c2-9fbb-45cf-84fa-58815bda53ce-0_0-178-193_20250314162847946.parquet 11 222.2 QQ cn 24 + +-- !hudi_1 -- +8 95.5 Heidi Portland +10 101.1 Judy Austin +11 222.2 QQ cn + +-- !hudi_2 -- +6 85.5 Frank San Francisco +9 88.0 Ivan Denver + +-- !hudi_3 -- +7 90.0 Grace Seattle + +-- !hudi_4 -- +1 \N Alice \N +2 \N Bob \N +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago + +-- !hudi_5 -- +3 \N Charlie New York + +-- !hudi_6 -- +1 \N Alice \N +2 \N Bob \N + +-- !hudi_7 -- +6 85.5 Frank San Francisco + +-- !hudi_8 -- +6 85.5 Frank San Francisco +7 90.0 Grace Seattle +8 95.5 Heidi Portland +9 88.0 Ivan Denver +10 101.1 Judy Austin + +-- !hudi_9 -- +1 Alice + +-- !hudi_10 -- +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago +6 85.5 Frank San Francisco +7 90.0 Grace Seattle + +-- !hudi_11 -- +11 222.2 QQ cn 24 + +-- !hudi_12 -- +1 \N Alice \N \N +2 \N Bob \N \N +3 \N Charlie New York \N +4 \N David Los Angeles \N +5 \N Eve Chicago \N 
+6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_13 -- +11 222.2 QQ cn 24 + +-- !hudi_14 -- +11 222.2 QQ cn 24 + +-- !hudi_15 -- +11 222.2 QQ cn 24 + +-- !hudi_16 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +11 222.2 QQ cn 24 + +-- !hudi_17 -- +11 222.2 QQ cn 24 + +-- !hudi_18 -- +1 \N Alice \N \N +2 \N Bob \N \N + +-- !hudi_19 -- +11 QQ 24 + +-- !hudi_20 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_0 -- +20250314163405965 20250314163405965_0_0 1 193e809e-9620-412e-ab3f-c408a84129ca-0_0-191-205_20250314163405965.parquet 1 \N Alice \N \N +20250314163409045 20250314163409045_0_0 2 d47ed400-2407-4ec3-a3ae-1bb8251edba1-0_0-199-212_20250314163409045.parquet 2 \N Bob \N \N +20250314163412409 20250314163412409_0_0 3 d82c289c-ffcb-4806-b893-d10d4ffe185e-0_0-207-219_20250314163412409.parquet 3 \N Charlie New York \N +20250314163416966 20250314163416966_0_0 4 b0c5e6d8-b9fd-4532-9a55-b65185719b84-0_0-215-226_20250314163416966.parquet 4 \N David Los Angeles \N +20250314163421827 20250314163421827_0_0 5 33648978-cbee-455a-a382-f40744a11509-0_0-223-233_20250314163421827.parquet 5 \N Eve Chicago \N +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N +20250314163439685 20250314163439685_0_0 10 ca84ae4f-b5b6-4c28-8168-551941f75586-0_0-277-282_20250314163439685.parquet 10 101.1 Judy Austin \N +20250314163446641 20250314163446641_0_0 11 0f944779-eaf3-431f-afc2-11720338bc34-0_0-289-293_20250314163446641.parquet 11 222.2 QQ cn 24 + +-- !hudi_1 -- +8 95.5 Heidi Portland +10 101.1 Judy Austin +11 222.2 QQ cn + +-- !hudi_2 -- +6 85.5 Frank San Francisco +9 88.0 Ivan Denver + +-- !hudi_3 -- +7 90.0 Grace Seattle + +-- !hudi_4 -- +1 \N Alice \N +2 \N Bob \N +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago + +-- !hudi_5 -- +3 \N Charlie New York + +-- !hudi_6 -- +1 \N Alice \N +2 \N Bob \N + +-- !hudi_7 -- +6 85.5 Frank San Francisco + +-- !hudi_8 -- +6 85.5 Frank San Francisco +7 90.0 Grace Seattle +8 95.5 Heidi Portland +9 88.0 Ivan Denver +10 101.1 Judy Austin + +-- !hudi_9 -- +1 Alice + +-- !hudi_10 -- +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago +6 85.5 Frank San Francisco +7 90.0 Grace Seattle + +-- !hudi_11 -- +11 222.2 QQ cn 24 + +-- !hudi_12 -- +1 \N Alice \N \N +2 \N Bob \N \N +3 \N Charlie New York \N +4 \N David Los Angeles \N +5 \N Eve Chicago \N +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_13 -- +11 222.2 QQ cn 24 + +-- !hudi_14 -- +11 222.2 QQ cn 24 + +-- !hudi_15 -- +11 222.2 QQ cn 24 + +-- !hudi_16 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +11 222.2 QQ cn 24 + +-- !hudi_17 -- +11 222.2 QQ cn 24 + +-- !hudi_18 -- +1 \N Alice \N \N +2 \N Bob 
\N \N + +-- !hudi_19 -- +11 QQ 24 + +-- !hudi_20 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !orc_time_travel -- +20250314162744620 20250314162744620_0_0 1 84ab609d-947a-4a24-af9a-6360348cf977-0_0-80-105_20250314162744620.parquet 1 \N Alice \N \N +20250314162747350 20250314162747350_0_0 2 07b6bd0b-0f2c-4500-a8c8-75d3cd90e85e-0_0-88-112_20250314162747350.parquet 2 \N Bob \N \N +20250314162759470 20250314162759470_0_0 3 58382f07-0dca-431d-ad4d-d5d94140d60f-0_0-96-119_20250314162759470.parquet 3 \N Charlie New York \N +20250314162804702 20250314162804702_0_0 4 05d28f5c-acc5-4530-8163-c82bdf96b720-0_0-104-126_20250314162804702.parquet 4 \N David Los Angeles \N +20250314162809486 20250314162809486_0_0 5 9164c294-2606-4537-bb84-e7ba4dbb98e5-0_0-112-133_20250314162809486.parquet 5 \N Eve Chicago \N +20250314162813019 20250314162813019_0_0 6 43b432a3-3581-439b-83b6-6c171bd6492a-0_0-120-140_20250314162813019.parquet 6 85.5 Frank San Francisco \N +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N + +-- !parquet_time_travel -- +20250314163405965 20250314163405965_0_0 1 193e809e-9620-412e-ab3f-c408a84129ca-0_0-191-205_20250314163405965.parquet 1 \N Alice \N \N +20250314163409045 20250314163409045_0_0 2 d47ed400-2407-4ec3-a3ae-1bb8251edba1-0_0-199-212_20250314163409045.parquet 2 \N Bob \N \N +20250314163412409 20250314163412409_0_0 3 d82c289c-ffcb-4806-b893-d10d4ffe185e-0_0-207-219_20250314163412409.parquet 3 \N Charlie New York \N +20250314163416966 20250314163416966_0_0 4 b0c5e6d8-b9fd-4532-9a55-b65185719b84-0_0-215-226_20250314163416966.parquet 4 \N David Los Angeles \N +20250314163421827 20250314163421827_0_0 5 33648978-cbee-455a-a382-f40744a11509-0_0-223-233_20250314163421827.parquet 5 \N Eve Chicago \N +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N + +-- !parquet_inc_1 -- +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N +20250314163439685 20250314163439685_0_0 10 ca84ae4f-b5b6-4c28-8168-551941f75586-0_0-277-282_20250314163439685.parquet 10 101.1 Judy Austin \N +20250314163446641 20250314163446641_0_0 11 0f944779-eaf3-431f-afc2-11720338bc34-0_0-289-293_20250314163446641.parquet 11 222.2 QQ cn 24 + +-- !parquet_inc_2 -- +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 
20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N + +-- !orc_inc_1 -- +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N +20250314162828063 20250314162828063_0_0 10 8df59b32-21a7-4a24-9fe4-eb8ef71956eb-0_0-166-182_20250314162828063.parquet 10 101.1 Judy Austin \N +20250314162847946 20250314162847946_0_0 11 6e2a56c2-9fbb-45cf-84fa-58815bda53ce-0_0-178-193_20250314162847946.parquet 11 222.2 QQ cn 24 + +-- !orc_inc_2 -- +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N + diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy new file mode 100644 index 00000000000000..efaf7e1bfefb00 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
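+
+// Exercises the drop-then-re-add column pattern created by run10.sql: the re-added
+// "age" column gets a fresh field id, so files written before the ALTERs should
+// surface NULL for it under the native ORC and parquet readers.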
+ +suite("iceberg_schema_change2", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalog_name = "iceberg_schema_change2" + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """ use test_db;""" + + qt_parquet_1 """ select * from sc_drop_add_parquet order by id; """ + qt_parquet_2 """ select * from sc_drop_add_parquet where age is NULL order by id; """ + qt_parquet_3 """ select * from sc_drop_add_parquet where age is not NULL order by id; """ + qt_parquet_4 """ select * from sc_drop_add_parquet where age > 28 order by id; """ + qt_parquet_5 """ select * from sc_drop_add_parquet where age >= 28 order by id; """ + qt_parquet_6 """ select id, name from sc_drop_add_parquet where age >= 28 order by id; """ + qt_parquet_7 """ select id, age from sc_drop_add_parquet where name="Eve" order by id; """ + + + + qt_orc_1 """ select * from sc_drop_add_orc order by id; """ + qt_orc_2 """ select * from sc_drop_add_orc where age is NULL order by id; """ + qt_orc_3 """ select * from sc_drop_add_orc where age is not NULL order by id; """ + qt_orc_4 """ select * from sc_drop_add_orc where age > 28 order by id; """ + qt_orc_5 """ select * from sc_drop_add_orc where age >= 28 order by id; """ + qt_orc_6 """ select id, name from sc_drop_add_orc where age >= 28 order by id; """ + qt_orc_7 """ select id, age from sc_drop_add_orc where name="Eve" order by id; """ + +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy new file mode 100644 index 00000000000000..648a4079a6eaed --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_hudi_schema_change", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable hudi test") + return + } + + String catalog_name = "test_hudi_schema_change" + String props = context.config.otherConfigs.get("hudiEmrCatalog") + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + ${props} + ); + """ + + sql """ switch ${catalog_name};""" + sql """ use regression_hudi;""" + sql """ set enable_fallback_to_original_planner=false """ + sql """set force_jni_scanner = false;""" + + def hudi_sc_tbs = ["hudi_sc_orc_cow","hudi_sc_parquet_cow"] + + for (String hudi_sc_tb : hudi_sc_tbs) { + qt_hudi_0 """ SELECT * FROM ${hudi_sc_tb} ORDER BY id; """ + qt_hudi_1 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score > 90 ORDER BY id; """ + qt_hudi_2 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score < 90 ORDER BY id; """ + qt_hudi_3 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score = 90 ORDER BY id; """ + qt_hudi_4 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score IS NULL ORDER BY id; """ + qt_hudi_5 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE location = 'New York' ORDER BY id; """ + qt_hudi_6 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE location IS NULL ORDER BY id; """ + qt_hudi_7 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score > 85 AND location = 'San Francisco' ORDER BY id; """ + qt_hudi_8 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score < 100 OR location = 'Austin' ORDER BY id; """ + qt_hudi_9 """ SELECT id, full_name FROM ${hudi_sc_tb} WHERE full_name LIKE 'A%' ORDER BY id; """ + qt_hudi_10 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE id BETWEEN 3 AND 7 ORDER BY id; """ + qt_hudi_11 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age > 20 ORDER BY id; """ + qt_hudi_12 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age IS NULL ORDER BY id; """ + qt_hudi_13 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE score > 100 AND age IS NOT NULL ORDER BY id; """ + qt_hudi_14 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE location = 'cn' ORDER BY id; """ + qt_hudi_15 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE full_name = 'QQ' AND age > 20 ORDER BY id; """ + qt_hudi_16 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE score < 100 OR age < 25 ORDER BY id; """ + qt_hudi_17 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age BETWEEN 20 AND 30 ORDER BY id; """ + qt_hudi_18 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE location IS NULL AND age IS NULL ORDER BY id; """ + qt_hudi_19 """ SELECT id, full_name, age FROM ${hudi_sc_tb} WHERE full_name LIKE 'Q%' AND age IS NOT NULL ORDER BY id; """ + qt_hudi_20 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE id > 5 AND age IS NULL ORDER BY id; """ + + + } + qt_orc_time_travel """ select * from hudi_sc_orc_cow FOR TIME AS OF "20250314162817433_0_0" order by id; """ //1-8 + qt_parquet_time_travel """ select * from hudi_sc_parquet_cow FOR TIME AS OF "20250314163425482" order by id; """//1-6 + + qt_parquet_inc_1 """ 
+    qt_parquet_inc_1 """ SELECT * from hudi_sc_parquet_cow@incr('beginTime'='20250314163421827') order by id; """
+    qt_parquet_inc_2 """ SELECT * from hudi_sc_parquet_cow@incr('beginTime'='20250314163421827','endTime'="20250314163434457") order by id; """
+
+    qt_orc_inc_1 """ SELECT * from hudi_sc_orc_cow@incr('beginTime'='20250314162813019') order by id; """
+    qt_orc_inc_2 """ SELECT * from hudi_sc_orc_cow@incr('beginTime'='20250314162813019','endTime'='20250314162822624') order by id; """
+
+
+    sql """drop catalog if exists ${catalog_name};"""
+}
+/*
+
+spark-sql \
+--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
+--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
+--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
+--conf spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes=false
+
+set hoodie.schema.on.read.enable=true;
+set hoodie.metadata.enable=false;
+set hoodie.parquet.small.file.limit = 100;
+
+
+CREATE TABLE hudi_sc_orc_cow (
+    id int,
+    name string,
+    age int
+) USING hudi
+OPTIONS (
+    type = 'cow',
+    primaryKey = 'id',
+    hoodie.base.file.format= 'orc'
+);
+
+desc hudi_sc_orc_cow;
+select * from hudi_sc_orc_cow;
+
+INSERT INTO hudi_sc_orc_cow VALUES (1, 'Alice', 25);
+INSERT INTO hudi_sc_orc_cow VALUES (2, 'Bob', 30);
+
+-- id name age city
+ALTER TABLE hudi_sc_orc_cow ADD COLUMNS (city string);
+INSERT INTO hudi_sc_orc_cow VALUES (3, 'Charlie', 28, 'New York');
+
+-- id name city
+ALTER TABLE hudi_sc_orc_cow DROP COLUMN age;
+INSERT INTO hudi_sc_orc_cow VALUES (4, 'David', 'Los Angeles');
+
+-- id full_name city
+ALTER TABLE hudi_sc_orc_cow RENAME COLUMN name TO full_name;
+INSERT INTO hudi_sc_orc_cow VALUES (5, 'Eve', 'Chicago');
+
+-- id score full_name city
+ALTER TABLE hudi_sc_orc_cow ADD COLUMNS (score float AFTER id);
+INSERT INTO hudi_sc_orc_cow VALUES (6, 85.5, 'Frank', 'San Francisco');
+
+-- id city score full_name
+ALTER TABLE hudi_sc_orc_cow CHANGE COLUMN city city string AFTER id;
+INSERT INTO hudi_sc_orc_cow VALUES (7, 'Seattle', 90.0, 'Grace');
+
+ALTER TABLE hudi_sc_orc_cow CHANGE COLUMN score score double;
+INSERT INTO hudi_sc_orc_cow VALUES (8, 'Portland', 95.5, 'Heidi');
+
+-- id location score full_name
+ALTER TABLE hudi_sc_orc_cow RENAME COLUMN city TO location;
+INSERT INTO hudi_sc_orc_cow VALUES (9, 'Denver', 88.0, 'Ivan');
+
+-- id score full_name location
+ALTER TABLE hudi_sc_orc_cow ALTER COLUMN location AFTER full_name;
+INSERT INTO hudi_sc_orc_cow VALUES (10, 101.1, 'Judy', 'Austin');
+
+
+select id,score,full_name,location from hudi_sc_orc_cow order by id;
+1	NULL	Alice	NULL
+2	NULL	Bob	NULL
+3	NULL	Charlie	New York
+4	NULL	David	Los Angeles
+5	NULL	Eve	Chicago
+6	85.5	Frank	San Francisco
+7	90.0	Grace	Seattle
+8	95.5	Heidi	Portland
+9	88.0	Ivan	Denver
+10	101.1	Judy	Austin
+
+-- id score full_name location age
+ALTER TABLE hudi_sc_orc_cow ADD COLUMN age int;
+INSERT INTO hudi_sc_orc_cow VALUES (11, 222.2, 'QQ', 'cn', 24);
+*/

From 3a782d9df7b60f034cd1304bcb04c4c2415e1cd4 Mon Sep 17 00:00:00 2001
From: daidai
Date: Sun, 29 Jun 2025 01:06:49 +0800
Subject: [PATCH 2/3] fix build

---
 be/src/vec/exec/format/table/paimon_reader.h |  6 ++++--
 .../hive/HiveMetaStoreClientHelper.java      | 15 ---------------
 .../datasource/hudi/source/HudiScanNode.java |  6 ++----
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h
index cc10fce12792af..f4a98940bfce5e 100644
a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -68,7 +68,8 @@ class PaimonOrcReader final : public PaimonReader {
     Status init_reader(
             const std::vector<std::string>& read_table_col_names,
             const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-            const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
             const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
             const RowDescriptor* row_descriptor,
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -102,7 +103,8 @@ class PaimonParquetReader final : public PaimonReader {
     Status init_reader(
             const std::vector<std::string>& read_table_col_names,
             const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-            const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
             const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
             const RowDescriptor* row_descriptor,
             const std::unordered_map<std::string, int>* colname_to_slot_id,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index 5042ad763e9589..623325e39bf07b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -836,21 +836,6 @@ public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[
         }
     }
-        if (internalSchemaOption.isPresent()) {
-            enableSchemaEvolution[0] = true;
-            return internalSchemaOption.get();
-        } else {
-            try {
-                // schema evolution is not enabled. (hoodie.schema.on.read.enable = false).
-                enableSchemaEvolution[0] = false;
-                // AvroInternalSchemaConverter.convert() will generator field id.
-                return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema(true));
-            } catch (Exception e) {
-                throw new RuntimeException("Cannot get hudi table schema.", e);
-            }
-        }
-    }
-
     public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) {
         // if hive config is not ready, then use hadoop kerberos to login
         AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index d5a04566e327b6..b717a881643dd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -30,7 +30,6 @@ import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
@@ -215,7 +214,7 @@ protected void doInitialize() throws UserException {
                 .getExtMetaCacheMgr()
                 .getFsViewProcessor(hmsTable.getCatalog())
                 .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
-        if (HudiUtils.getSchemaCacheValue(hmsTable).isEnableSchemaEvolution()) {
+        if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
             params.setHistorySchemaInfo(new ConcurrentHashMap<>());
         }
     }
@@ -268,9 +267,8 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
             fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
             // TODO(gaoxin): support complex types
             // fileDesc.setNestedFields(hudiSplit.getNestedFields());
-            fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
         } else {
-            HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable);
+            HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
             if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
                 long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(
                         new File(hudiSplit.getPath().get()).getName()));

From 288d3feb0947c4ee3b32cd4595f640ec8764066e Mon Sep 17 00:00:00 2001
From: daidai
Date: Sun, 29 Jun 2025 02:21:56 +0800
Subject: [PATCH 3/3] fix build

---
 be/src/vec/exec/format/orc/vorc_reader.cpp   | 1 +
 be/src/vec/exec/format/orc/vorc_reader.h     | 1 -
 be/src/vec/exec/format/table/hudi_reader.cpp | 4 ++--
 be/src/vec/exec/format/table/hudi_reader.h   | 6 ++++--
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 25d552ae9315ff..14784da43a6e3a 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -370,6 +370,7 @@ Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_na
                                                 bool* exist_attribute) {
     RETURN_IF_ERROR(_create_file_reader());
     *exist_attribute = true;
+    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(get_field_name_lower_case(&root_type, i));
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 7968c6fe9b2751..10d72844942d0c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -138,7 +138,6 @@ class OrcReader : public GenericReader {
     //If you want to read the file by index instead of column name, set hive_use_column_names to false.
     Status init_reader(
             const std::vector<std::string>* column_names,
-            const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             const std::vector<std::string>& missing_column_names,
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             const VExprContextSPtrs& conjuncts, bool is_acid,
diff --git a/be/src/vec/exec/format/table/hudi_reader.cpp b/be/src/vec/exec/format/table/hudi_reader.cpp
index aad4f810ffe50e..6caeb87badbe9e 100644
--- a/be/src/vec/exec/format/table/hudi_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_reader.cpp
@@ -52,7 +52,7 @@ Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* e
 Status HudiOrcReader::init_reader(
         const std::vector<std::string>& read_table_col_names,
         const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-        std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+        const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
         const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
         const RowDescriptor* row_descriptor,
         const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -71,7 +71,7 @@
 Status HudiParquetReader::init_reader(
         const std::vector<std::string>& read_table_col_names,
         const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-        std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+        const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
         const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
         const RowDescriptor* row_descriptor,
         const std::unordered_map<std::string, int>* colname_to_slot_id,
diff --git a/be/src/vec/exec/format/table/hudi_reader.h b/be/src/vec/exec/format/table/hudi_reader.h
index cd063d9246271d..2779f296f66266 100644
--- a/be/src/vec/exec/format/table/hudi_reader.h
+++ b/be/src/vec/exec/format/table/hudi_reader.h
@@ -53,7 +53,8 @@ class HudiOrcReader final : public HudiReader {
     Status init_reader(
             const std::vector<std::string>& read_table_col_names,
             const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-            std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
             const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
             const RowDescriptor* row_descriptor,
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -72,7 +73,8 @@ class HudiParquetReader final : public HudiReader {
     Status init_reader(
             const std::vector<std::string>& read_table_col_names,
             const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
-            std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
             const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
             const RowDescriptor* row_descriptor,
             const std::unordered_map<std::string, int>* colname_to_slot_id,
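
A note on the schema-change plumbing visible in the hunks above: the native readers receive an id-based column mapping (table_col_id_table_name_map) plus a missing_column_names list for table columns whose current field id is absent from a given data file, e.g. a column that was dropped and then re-added, so it carries a new id. Below is a minimal standalone C++17 sketch of that matching step; the field ids and file contents are hypothetical (loosely modeled on the hudi_sc_orc_cow test table), and this is NOT the actual TableSchemaChangeHelper implementation:

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main() {
    // Current table schema: column name -> field id (hypothetical ids).
    std::vector<std::pair<std::string, int32_t>> table_schema = {
            {"id", 1}, {"score", 6}, {"full_name", 2}, {"location", 4}, {"age", 7}};
    // Field ids present in one old data file, mapped to the names used when
    // that file was written (before the renames, before score/age existed).
    std::unordered_map<int32_t, std::string> file_id_to_name = {
            {1, "id"}, {2, "name"}, {4, "city"}};

    std::unordered_map<std::string, std::string> table_col_to_file_col;
    std::vector<std::string> missing_column_names;
    for (const auto& [table_name, field_id] : table_schema) {
        auto it = file_id_to_name.find(field_id);
        if (it == file_id_to_name.end()) {
            // Id not in this file (added later, or dropped and re-added with a
            // new id): fill with nulls, never read stale data by name.
            missing_column_names.emplace_back(table_name);
        } else {
            // Rename-safe: match by id, read by the file's own column name.
            table_col_to_file_col[table_name] = it->second;
        }
    }

    for (const auto& [t, f] : table_col_to_file_col) {
        std::cout << "read table column '" << t << "' from file column '" << f << "'\n";
    }
    for (const auto& m : missing_column_names) {
        std::cout << "table column '" << m << "' is missing in this file\n";
    }
    return 0;
}

Matching by field id rather than by name is what lets renames (name -> full_name, city -> location) resolve transparently against old files, while dropped-and-re-added columns fall out naturally as missing instead of being read from stale data.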