diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index b53ce3a0a50619..14784da43a6e3a 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -313,6 +313,7 @@ Status OrcReader::_create_file_reader() {
 Status OrcReader::init_reader(
         const std::vector<std::string>* column_names,
+        const std::vector<std::string>& missing_column_names,
         const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
         const RowDescriptor* row_descriptor,
@@ -320,6 +321,7 @@ Status OrcReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
         const bool hive_use_column_names) {
     _column_names = column_names;
+    _missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
     _colname_to_value_range = colname_to_value_range;
     _lazy_read_ctx.conjuncts = conjuncts;
     _is_acid = is_acid;
@@ -363,14 +365,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 }
 
 Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                                std::vector<int64_t>* col_attributes,
-                                                std::string attribute) {
+                                                std::vector<int32_t>* col_attributes,
+                                                const std::string& attribute,
+                                                bool* exist_attribute) {
     RETURN_IF_ERROR(_create_file_reader());
+    *exist_attribute = true;
     auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(get_field_name_lower_case(&root_type, i));
+
+        if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
+            *exist_attribute = false;
+            return Status::OK();
+        }
         col_attributes->emplace_back(
-                std::stol(root_type.getSubtype(i)->getAttributeValue(attribute)));
+                std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
     }
     return Status::OK();
 }
@@ -388,6 +397,11 @@ Status OrcReader::_init_read_columns() {
             _scan_params.__isset.slot_name_to_schema_pos;
     for (size_t i = 0; i < _column_names->size(); ++i) {
         auto& col_name = (*_column_names)[i];
+        if (_missing_column_names_set.contains(col_name)) {
+            _missing_cols.emplace_back(col_name);
+            continue;
+        }
+
         if (_is_hive1_orc_or_use_idx) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             if (iter != _scan_params.slot_name_to_schema_pos.end()) {
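// Hedged illustration (assumptions: the Apache ORC C++ `orc::Type` API --
// getSubtypeCount/getSubtype/getFieldName/hasAttributeKey/getAttributeValue --
// and an attribute key such as the "iceberg.id" behind ICEBERG_ORC_ATTRIBUTE).
// It shows why the new hasAttributeKey() guard above matters: probing before
// getAttributeValue()/std::stoi lets the reader report "no schema info" for
// old files instead of failing on a missing attribute.
#include <orc/Type.hh>

#include <cstdint>
#include <map>
#include <string>

bool collect_field_ids(const orc::Type& root, const std::string& key,
                       std::map<int32_t, std::string>* id_to_name) {
    for (uint64_t i = 0; i < root.getSubtypeCount(); ++i) {
        const orc::Type* sub = root.getSubtype(i);
        if (!sub->hasAttributeKey(key)) {
            return false; // file predates the attribute: caller falls back to the table schema
        }
        id_to_name->emplace(std::stoi(sub->getAttributeValue(key)), root.getFieldName(i));
    }
    return true;
}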
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 98c31645b41183..10d72844942d0c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -138,6 +138,7 @@ class OrcReader : public GenericReader {
     //If you want to read the file by index instead of column name, set hive_use_column_names to false.
     Status init_reader(
             const std::vector<std::string>* column_names,
+            const std::vector<std::string>& missing_column_names,
            const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
            const VExprContextSPtrs& conjuncts, bool is_acid,
            const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
@@ -178,8 +179,8 @@ class OrcReader : public GenericReader {
                             std::vector<DataTypePtr>* col_types) override;
 
     Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                         std::vector<int64_t>* col_attributes,
-                                         std::string attribute);
+                                         std::vector<int32_t>* col_attributes,
+                                         const std::string& attribute, bool* exist_attribute);
     void set_table_col_to_file_col(
             std::unordered_map<std::string, std::string> table_col_to_file_col) {
         _table_col_to_file_col = table_col_to_file_col;
     }
@@ -577,6 +578,9 @@ class OrcReader : public GenericReader {
     int64_t _range_size;
     const std::string& _ctz;
     const std::vector<std::string>* _column_names;
+    // _missing_column_names_set: used for iceberg/hudi/paimon when a column was dropped and then
+    // added back (drop column a, then add column a); such a column's data in the file must not be read.
+    std::set<std::string> _missing_column_names_set;
     int32_t _offset_days = 0;
     cctz::time_zone _time_zone;
 
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 2593da837c3da6..d1aa8d4c6eda29 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -71,7 +71,7 @@ class FieldDescriptor {
     std::unordered_map<std::string, const FieldSchema*> _name_to_field;
     // Used in from_thrift, marking the next schema position that should be parsed
     size_t _next_schema_pos;
-    std::unordered_map<int32_t, std::string> _field_id_name_mapping;
+    std::map<int32_t, std::string> _field_id_name_mapping;
 
     void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                               FieldSchema* physical_field);
@@ -135,6 +135,8 @@ class FieldDescriptor {
 
     bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
 
+    std::map<int32_t, std::string> get_field_id_name_map() { return _field_id_name_mapping; }
+
     const doris::Slice get_column_name_from_field_id(int32_t id) const;
 };
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index d669a57c609cd3..519dd68579caad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -345,6 +345,7 @@ Status ParquetReader::init_reader(
         for (const std::string& name : required_columns) {
             _missing_cols.emplace_back(name);
         }
+    } else {
         const auto& table_column_idxs = _scan_params.column_idxs;
         std::map table_col_id_to_idx;
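// A toy, self-contained illustration (not from the patch) of the comment on
// _missing_column_names_set above: after `drop column a` followed by
// `add column a`, an old file still holds a column literally named "a", but it
// was written under the dropped column's field id, so a lookup keyed on ids,
// not names, must classify the re-added "a" as missing.
#include <cstdint>
#include <map>
#include <set>
#include <string>

int main() {
    std::map<int32_t, std::string> file_ids = {{1, "a"}};  // old "a", written with id 1
    std::map<int32_t, std::string> table_ids = {{2, "a"}}; // re-added "a" got a fresh id 2
    std::set<std::string> missing_in_file;
    for (const auto& [id, name] : table_ids) {
        if (!file_ids.contains(id)) {
            missing_in_file.insert(name); // same name, different id -> treat as missing
        }
    }
    return missing_in_file.count("a") == 1 ? 0 : 1; // exits 0: "a" must not be read from the file
}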
diff --git a/be/src/vec/exec/format/table/hudi_reader.cpp b/be/src/vec/exec/format/table/hudi_reader.cpp
new file mode 100644
index 00000000000000..6caeb87badbe9e
--- /dev/null
+++ b/be/src/vec/exec/format/table/hudi_reader.cpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hudi_reader.h"
+
+#include
+
+#include "common/status.h"
+#include "runtime/runtime_state.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status HudiReader::get_file_col_id_to_name(bool& exist_schema,
+                                           std::map<int32_t, std::string>& file_col_id_to_name) {
+    if (!_params.__isset.history_schema_info) [[unlikely]] {
+        exist_schema = false;
+        return Status::OK();
+    }
+
+    if (!_params.history_schema_info.contains(_range.table_format_params.hudi_params.schema_id))
+            [[unlikely]] {
+        return Status::InternalError("hudi file schema info is missing in history schema info.");
+    }
+
+    file_col_id_to_name =
+            _params.history_schema_info.at(_range.table_format_params.hudi_params.schema_id);
+    return Status::OK();
+}
+
+Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
+    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));
+    return Status::OK();
+};
+
+Status HudiOrcReader::init_reader(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
+        const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(
+            read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range));
+
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+    orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
+    return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
+                                   &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
+                                   row_descriptor, not_single_slot_filter_conjuncts,
+                                   slot_id_to_filter_conjuncts);
+}
+
+Status HudiParquetReader::init_reader(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
+        const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
+        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+        const RowDescriptor* row_descriptor,
+        const std::unordered_map<std::string, int>* colname_to_slot_id,
+        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(
+            read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range));
+    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+
+    return parquet_reader->init_reader(
+            _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
+            conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
+            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/hudi_reader.h b/be/src/vec/exec/format/table/hudi_reader.h
new file mode 100644
index 00000000000000..2779f296f66266
--- /dev/null
+++ b/be/src/vec/exec/format/table/hudi_reader.h
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include
+#include
+
+#include "vec/exec/format/orc/vorc_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
+#include "vec/exec/format/table/table_format_reader.h"
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+class HudiReader : public TableFormatReader, public TableSchemaChangeHelper {
+public:
+    HudiReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
+               RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range,
+               io::IOContext* io_ctx)
+            : TableFormatReader(std::move(file_format_reader), state, profile, params, range,
+                                io_ctx) {};
+
+    ~HudiReader() override = default;
+
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
+
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
+
+    Status init_row_filters() final { return Status::OK(); };
+};
+
+class HudiOrcReader final : public HudiReader {
+public:
+    ENABLE_FACTORY_CREATOR(HudiOrcReader);
+    HudiOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
+                  RuntimeState* state, const TFileScanRangeParams& params,
+                  const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
+    ~HudiOrcReader() final = default;
+
+    Status init_reader(
+            const std::vector<std::string>& read_table_col_names,
+            const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
+            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
+};
+
+class HudiParquetReader final : public HudiReader {
+public:
+    ENABLE_FACTORY_CREATOR(HudiParquetReader);
+    HudiParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
+                      RuntimeState* state, const TFileScanRangeParams& params,
+                      const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
+    ~HudiParquetReader() final = default;
+
+    Status init_reader(
+            const std::vector<std::string>& read_table_col_names,
+            const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
+            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
+};
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
\ No newline at end of file
diff --git
a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index d209eb7d271916..c297904ca417b7 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -93,32 +93,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_expand_block_if_need(block)); - // To support iceberg schema evolution. We change the column name in block to - // make it match with the column name in parquet file before reading data. and - // Set the name back to table column name before return this block. - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _table_col_to_file_col.find(col.name); - if (iter != _table_col_to_file_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } - + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block)); RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); - // Set the name back to table column name before return this block. - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _file_col_to_table_col.find(col.name); - if (iter != _file_col_to_table_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block)); if (_equality_delete_impl != nullptr) { RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block)); @@ -127,12 +104,6 @@ Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, return _shrink_block_if_need(block); } -Status IcebergTableReader::get_columns( - std::unordered_map* name_to_type, - std::unordered_set* missing_cols) { - return _file_format_reader->get_columns(name_to_type, missing_cols); -} - Status IcebergTableReader::init_row_filters() { // We get the count value by doris's be, so we don't need to read the delete file if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) { @@ -201,8 +172,9 @@ Status IcebergTableReader::_equality_delete_base( not_in_file_col_names, nullptr, {}, nullptr, nullptr, nullptr, nullptr, nullptr, false)); } else if (auto* orc_reader = typeid_cast(delete_reader.get())) { - RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false, - {}, {}, nullptr, nullptr)); + RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, + not_in_file_col_names, nullptr, {}, false, {}, + {}, nullptr, nullptr)); } else { return Status::InternalError("Unsupported format of delete file"); } @@ -412,60 +384,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector*>& d } } -/* - * Generate _all_required_col_names and _not_in_file_col_names. - * - * _all_required_col_names is all the columns required by user sql. - * If the column name has been modified after the data file was written, - * put the old name in data file to _all_required_col_names. - * - * _not_in_file_col_names is all the columns required by user sql but not in the data file. - * e.g. New columns added after this data file was written. - * The columns added with names used by old dropped columns should consider as a missing column, - * which should be in _not_in_file_col_names. 
- */ -void IcebergTableReader::_gen_file_col_names() { - _all_required_col_names.clear(); - _not_in_file_col_names.clear(); - for (int i = 0; i < _file_col_names.size(); ++i) { - auto name = _file_col_names[i]; - auto iter = _table_col_to_file_col.find(name); - if (iter == _table_col_to_file_col.end()) { - // If the user creates the iceberg table, directly append the parquet file that already exists, - // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty. - // Because we are ignoring case, so, it is converted to lowercase here - auto name_low = to_lower(name); - _all_required_col_names.emplace_back(name_low); - if (_has_iceberg_schema) { - _not_in_file_col_names.emplace_back(name); - } else { - _table_col_to_file_col.emplace(name, name_low); - _file_col_to_table_col.emplace(name_low, name); - if (name != name_low) { - _has_schema_change = true; - } - } - } else { - _all_required_col_names.emplace_back(iter->second); - } - } -} - -/* - * Generate _new_colname_to_value_range, by replacing the column name in - * _colname_to_value_range with column name in data file. - */ -void IcebergTableReader::_gen_new_colname_to_value_range() { - for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) { - auto iter = _table_col_to_file_col.find(it->first); - if (iter == _table_col_to_file_col.end()) { - _new_colname_to_value_range.emplace(it->first, it->second); - } else { - _new_colname_to_value_range.emplace(iter->second, it->second); - } - } -} - void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete, size_t read_rows, bool file_path_column_dictionary_coded) { @@ -502,7 +420,7 @@ void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil Status IcebergParquetReader::init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -510,17 +428,13 @@ Status IcebergParquetReader::init_reader( const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { _file_format = Fileformat::PARQUET; - ParquetReader* parquet_reader = static_cast(_file_format_reader.get()); - _col_id_name_map = col_id_name_map; - _file_col_names = file_col_names; - _colname_to_value_range = colname_to_value_range; - FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema(); - RETURN_IF_ERROR(_gen_col_name_maps(field_desc)); - _gen_file_col_names(); - _gen_new_colname_to_value_range(); + auto* parquet_reader = static_cast(_file_format_reader.get()); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map, + colname_to_value_range)); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); parquet_reader->iceberg_sanitize(_all_required_col_names); RETURN_IF_ERROR(init_row_filters()); + return parquet_reader->init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, @@ -575,7 +489,7 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d Status IcebergOrcReader::init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const 
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
        const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
@@ -584,18 +498,16 @@ Status IcebergOrcReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
     _file_format = Fileformat::ORC;
     auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-    RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
+
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
     RETURN_IF_ERROR(init_row_filters());
-    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
-                                   conjuncts, false, tuple_descriptor, row_descriptor,
-                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+    return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
+                                   &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
+                                   row_descriptor, not_single_slot_filter_conjuncts,
+                                   slot_id_to_filter_conjuncts);
 }
 
 Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
@@ -603,8 +515,9 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
                                 READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
-    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range,
-                                                  {}, false, {}, {}, nullptr, nullptr));
+    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {},
+                                                  &colname_to_value_range, {}, false, {}, {},
+                                                  nullptr, nullptr));
 
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
             partition_columns;
@@ -625,61 +538,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     return Status::OK();
 }
 
-/*
- * To support schema evolution, Iceberg write the column id to column name map to
- * parquet file key_value_metadata.
- * This function is to compare the table schema from FE (_col_id_name_map) with
- * the schema in key_value_metadata for the current parquet file and generate two maps
- * for future use:
- * 1. table column name to parquet column name.
- * 2. parquet column name to table column name.
- * For example, parquet file has a column 'col1',
- * after this file was written, iceberg changed the column name to 'col1_new'.
- * The two maps would contain:
- * 1. col1_new -> col1
- * 2. col1 -> col1_new
- */
-Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+// To support schema evolution, Iceberg writes the column-id-to-column-name map into the parquet
+// file's key_value_metadata.
+Status IcebergParquetReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+
     if (field_desc.has_parquet_field_id()) {
-        for (const auto& pair : _col_id_name_map) {
-            auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
-            if (name_slice.get_size() == 0) {
-                _has_schema_change = true;
-            } else {
-                auto name_string = name_slice.to_string();
-                _table_col_to_file_col.emplace(pair.second, name_string);
-                _file_col_to_table_col.emplace(name_string, pair.second);
-                if (name_string != pair.second) {
-                    _has_schema_change = true;
-                }
-            }
-        }
+        file_col_id_to_name = field_desc.get_field_id_name_map();
+    } else {
+        // Early Iceberg versions don't write any schema information to the Parquet file.
+        exist_schema = false;
     }
+
     return Status::OK();
 }
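// Hedged sketch (assumptions: Doris' generated Thrift types under
// gen_cpp/parquet_types.h and the optional field_id member of
// tparquet::SchemaElement) of where the ids behind has_parquet_field_id()/
// get_field_id_name_map() originate: Iceberg writers stamp each parquet schema
// element with its Iceberg field id, and FieldDescriptor collects them.
#include <cstdint>
#include <map>
#include <string>
#include <vector>

#include <gen_cpp/parquet_types.h>

std::map<int32_t, std::string> collect_parquet_field_ids(
        const std::vector<tparquet::SchemaElement>& flat_schema) {
    std::map<int32_t, std::string> id_to_name;
    for (const auto& element : flat_schema) {
        if (element.__isset.field_id) { // optional in parquet.thrift; absent in old files
            id_to_name.emplace(element.field_id, element.name);
        }
    }
    return id_to_name;
}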
-Status IcebergOrcReader::_gen_col_name_maps(OrcReader* orc_reader) {
+// To support schema evolution, Iceberg writes the column id into the ORC file's type attributes.
+Status IcebergOrcReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+
     std::vector<std::string> col_names;
-    std::vector<int64_t> col_ids;
-    RETURN_IF_ERROR(
-            orc_reader->get_schema_col_name_attribute(&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE));
-    _has_iceberg_schema = true;
-    _table_col_to_file_col.clear();
-    _file_col_to_table_col.clear();
-    for (size_t i = 0; i < col_ids.size(); i++) {
-        auto col_id = col_ids[i];
-        auto& file_col_name = col_names[i];
-
-        if (_col_id_name_map.find(col_id) == _col_id_name_map.end()) {
-            _has_schema_change = true;
-            continue;
-        }
-        auto& table_col_name = _col_id_name_map[col_id];
-        _table_col_to_file_col.emplace(table_col_name, file_col_name);
-        _file_col_to_table_col.emplace(file_col_name, table_col_name);
-        if (table_col_name != file_col_name) {
-            _has_schema_change = true;
-        }
+    std::vector<int32_t> col_ids;
+    RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
+            &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
+    if (!exist_schema) {
+        return Status::OK();
+    }
+    for (auto i = 0; i < col_names.size(); i++) {
+        file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i]));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 8a2b253d16f9c5..5407415060b098 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -65,7 +65,7 @@ class GenericReader;
 class ShardedKVCache;
 class VExprContext;
 
-class IcebergTableReader : public TableFormatReader {
+class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper {
 public:
     struct PositionDeleteRange {
         std::vector<std::string> data_file_path;
@@ -82,9 +82,6 @@ class IcebergTableReader : public TableFormatReader {
 
     Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
 
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                       std::unordered_set<std::string>* missing_cols) final;
-
     enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
     enum Fileformat { NONE, PARQUET, ORC, AVRO };
@@ -116,9 +113,6 @@ class IcebergTableReader : public TableFormatReader {
 
     PositionDeleteRange _get_range(const ColumnString& file_path_column);
 
-    void _gen_file_col_names();
-
-    void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
 
     Status
_position_delete_base(const std::string data_file_path, @@ -138,28 +132,9 @@ class IcebergTableReader : public TableFormatReader { ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; std::vector _iceberg_delete_rows; - // col names from _file_slot_descs - std::vector _file_col_names; - // file column name to table column name map. For iceberg schema evolution. - std::unordered_map _file_col_to_table_col; - // table column name to file column name map. For iceberg schema evolution. - std::unordered_map _table_col_to_file_col; - const std::unordered_map* _colname_to_value_range; - // copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution. - std::unordered_map _new_colname_to_value_range; - // column id to name map. Collect from FE slot descriptor. - std::unordered_map _col_id_name_map; - // col names in the parquet,orc file - std::vector _all_required_col_names; - // col names in table but not in parquet,orc file - std::vector _not_in_file_col_names; - // equality delete should read the primary columns std::vector _expand_col_names; std::vector _expand_columns; - bool _has_schema_change = false; - bool _has_iceberg_schema = false; - Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; @@ -194,7 +169,7 @@ class IcebergParquetReader final : public IcebergTableReader { kv_cache, io_ctx) {} Status init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -210,7 +185,8 @@ class IcebergParquetReader final : public IcebergTableReader { parquet_reader->set_delete_rows(&_iceberg_delete_rows); } - Status _gen_col_name_maps(const FieldDescriptor& field_desc); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: std::unique_ptr _create_equality_reader( @@ -240,7 +216,7 @@ class IcebergOrcReader final : public IcebergTableReader { Status init_reader( const std::vector& file_col_names, - const std::unordered_map& col_id_name_map, + const std::unordered_map& col_id_name_map, const std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, @@ -248,7 +224,8 @@ class IcebergOrcReader final : public IcebergTableReader { const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); - Status _gen_col_name_maps(OrcReader* orc_reader); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: std::unique_ptr _create_equality_reader( diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index e3fba810bbaf72..4dfe71337676da 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -37,65 +37,19 @@ PaimonReader::PaimonReader(std::unique_ptr file_format_reader, ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); } -Status PaimonReader::gen_file_col_name( - const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range) { - // It is a bit similar to iceberg. 
I will consider integrating it when I write hudi schema change later. - _table_col_to_file_col.clear(); - _file_col_to_table_col.clear(); - - if (!_params.__isset.paimon_schema_info) [[unlikely]] { +Status PaimonReader::get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) { + if (!_params.__isset.history_schema_info) [[unlikely]] { return Status::RuntimeError("miss paimon schema info."); } - if (!_params.paimon_schema_info.contains(_range.table_format_params.paimon_params.schema_id)) + if (!_params.history_schema_info.contains(_range.table_format_params.paimon_params.schema_id)) [[unlikely]] { return Status::InternalError("miss paimon schema info."); } - const auto& table_id_to_file_name = - _params.paimon_schema_info.at(_range.table_format_params.paimon_params.schema_id); - for (auto [table_col_id, file_col_name] : table_id_to_file_name) { - if (table_col_id_table_name_map.find(table_col_id) == table_col_id_table_name_map.end()) { - continue; - } - auto& table_col_name = table_col_id_table_name_map.at(table_col_id); - - _table_col_to_file_col.emplace(table_col_name, file_col_name); - _file_col_to_table_col.emplace(file_col_name, table_col_name); - if (table_col_name != file_col_name) { - _has_schema_change = true; - } - } - - _all_required_col_names.clear(); - _not_in_file_col_names.clear(); - for (auto name : read_table_col_names) { - auto iter = _table_col_to_file_col.find(name); - if (iter == _table_col_to_file_col.end()) { - auto name_low = to_lower(name); - _all_required_col_names.emplace_back(name_low); - - _table_col_to_file_col.emplace(name, name_low); - _file_col_to_table_col.emplace(name_low, name); - if (name != name_low) { - _has_schema_change = true; - } - } else { - _all_required_col_names.emplace_back(iter->second); - } - } - - for (auto& it : *table_col_name_to_value_range) { - auto iter = _table_col_to_file_col.find(it.first); - if (iter == _table_col_to_file_col.end()) { - _new_colname_to_value_range.emplace(it.first, it.second); - } else { - _new_colname_to_value_range.emplace(iter->second, it.second); - } - } + file_col_id_to_name = + _params.history_schema_info.at(_range.table_format_params.paimon_params.schema_id); return Status::OK(); } @@ -168,29 +122,9 @@ Status PaimonReader::init_row_filters() { } Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _table_col_to_file_col.find(col.name); - if (iter != _table_col_to_file_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } - + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block)); RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); - - if (_has_schema_change) { - for (int i = 0; i < block->columns(); i++) { - ColumnWithTypeAndName& col = block->get_by_position(i); - auto iter = _file_col_to_table_col.find(col.name); - if (iter != _file_col_to_table_col.end()) { - col.name = iter->second; - } - } - block->initialize_index_by_name(); - } + RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block)); return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 11fffca943788f..f4a98940bfce5e 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -25,7 +25,7 @@ #include 
"vec/exec/format/table/table_format_reader.h" namespace doris::vectorized { -class PaimonReader : public TableFormatReader { +class PaimonReader : public TableFormatReader, public TableSchemaChangeHelper { public: PaimonReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, @@ -37,11 +37,8 @@ class PaimonReader : public TableFormatReader { Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; - Status gen_file_col_name( - const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, - const std::unordered_map* - table_col_name_to_value_range); + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) final; protected: struct PaimonProfile { @@ -51,16 +48,6 @@ class PaimonReader : public TableFormatReader { std::vector _delete_rows; PaimonProfile _paimon_profile; - std::unordered_map _new_colname_to_value_range; - - std::unordered_map _file_col_to_table_col; - std::unordered_map _table_col_to_file_col; - - std::vector _all_required_col_names; - std::vector _not_in_file_col_names; - - bool _has_schema_change = false; - virtual void set_delete_rows() = 0; }; @@ -80,21 +67,22 @@ class PaimonOrcReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, + const std::unordered_map& table_col_id_table_name_map, const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { - RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map, - table_col_name_to_value_range)); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range)); + auto* orc_reader = static_cast(_file_format_reader.get()); orc_reader->set_table_col_to_file_col(_table_col_to_file_col); - return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, - conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, - slot_id_to_filter_conjuncts); + return orc_reader->init_reader( + &_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, + conjuncts, false, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); } }; @@ -114,7 +102,7 @@ class PaimonParquetReader final : public PaimonReader { Status init_reader( const std::vector& read_table_col_names, - const std::unordered_map& table_col_id_table_name_map, + const std::unordered_map& table_col_id_table_name_map, const std::unordered_map* table_col_name_to_value_range, const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, @@ -122,8 +110,8 @@ class PaimonParquetReader final : public PaimonReader { const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts) { - RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map, - table_col_name_to_value_range)); + RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info( + read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range)); auto* parquet_reader = 
                static_cast<ParquetReader*>(_file_format_reader.get());
        parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
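// A worked, self-contained example (not from the patch) of the id join that
// TableSchemaChangeHelper::init_schema_info() below performs: field ids are the
// join key between the file schema and the FE table schema; a name mismatch on
// a matched id records a schema change, and an unmatched table id later becomes
// a missing column.
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>

int main() {
    std::map<int32_t, std::string> file_ids = {{1, "col1"}, {2, "col2_old"}};
    std::unordered_map<int32_t, std::string> table_ids = {
            {1, "col1"}, {2, "col2_new"}, {3, "col3"}};
    std::unordered_map<std::string, std::string> table_col_to_file_col;
    bool has_schema_change = false;
    for (const auto& [id, file_name] : file_ids) {
        auto it = table_ids.find(id);
        if (it == table_ids.end()) {
            continue; // column was dropped from the table: ignore it
        }
        table_col_to_file_col.emplace(it->second, file_name);
        has_schema_change |= (it->second != file_name); // renamed after the file was written
    }
    assert(table_col_to_file_col.at("col2_new") == "col2_old");
    assert(has_schema_change);
    assert(!table_col_to_file_col.contains("col3")); // id 3 absent from the file -> missing
    return 0;
}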
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
new file mode 100644
index 00000000000000..86f0dea38e4ff7
--- /dev/null
+++ b/be/src/vec/exec/format/table/table_format_reader.cpp
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "table_format_reader.h"
+
+#include
+#include
+
+#include "common/status.h"
+#include "vec/core/block.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+Status TableSchemaChangeHelper::init_schema_info(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_id_to_name,
+        const std::unordered_map<std::string, ColumnValueRangeType>*
+                table_col_name_to_value_range) {
+    bool exist_schema = true;
+    std::map<int32_t, std::string> file_id_to_name;
+    RETURN_IF_ERROR(get_file_col_id_to_name(exist_schema, file_id_to_name));
+    if (!exist_schema) {
+        file_id_to_name.clear();
+        for (const auto& [table_col_id, table_col_name] : table_id_to_name) {
+            file_id_to_name.emplace(table_col_id, table_col_name);
+        }
+    }
+
+    /** This compares the table schema from FE (table_id_to_name) with the
+     * current file schema (file_id_to_name) and generates two maps for future use:
+     * 1. table column name to file column name.
+     * 2. file column name to table column name.
+     * For example, the file has a column 'col1'; after the file was written,
+     * Iceberg changed the column name to 'col1_new'. The two maps would contain:
+     * 1. col1_new -> col1
+     * 2. col1 -> col1_new
+     */
+    for (const auto& [file_col_id, file_col_name] : file_id_to_name) {
+        if (table_id_to_name.find(file_col_id) == table_id_to_name.end()) {
+            continue;
+        }
+
+        auto& table_col_name = table_id_to_name.at(file_col_id);
+        _table_col_to_file_col.emplace(table_col_name, file_col_name);
+        _file_col_to_table_col.emplace(file_col_name, table_col_name);
+        if (table_col_name != file_col_name) {
+            _has_schema_change = true;
+        }
+    }
+
+    /** Generate _all_required_col_names and _not_in_file_col_names.
+     *
+     * _all_required_col_names holds all the columns required by the user SQL.
+     * If a column name was modified after the data file was written,
+     * the old name from the data file is put into _all_required_col_names.
+     *
+     * _not_in_file_col_names holds all the columns required by the user SQL but
+     * not present in the data file, e.g. columns added after the data file was
+     * written. A column added with a name previously used by a dropped column
+     * should be considered missing, so it also belongs in _not_in_file_col_names.
+     */
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    for (auto table_col_name : read_table_col_names) {
+        auto iter = _table_col_to_file_col.find(table_col_name);
+        if (iter == _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(table_col_name);
+            _not_in_file_col_names.emplace_back(table_col_name);
+        } else {
+            _all_required_col_names.emplace_back(iter->second);
+        }
+    }
+
+    /** Generate _new_colname_to_value_range by replacing the column names in
+     * _colname_to_value_range with the column names in the data file.
+     */
+    for (auto& it : *table_col_name_to_value_range) {
+        auto iter = _table_col_to_file_col.find(it.first);
+        if (iter == _table_col_to_file_col.end()) {
+            _new_colname_to_value_range.emplace(it.first, it.second);
+        } else {
+            _new_colname_to_value_range.emplace(iter->second, it.second);
+        }
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_before(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _table_col_to_file_col.find(col.name);
+            if (iter != _table_col_to_file_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_after(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _file_col_to_table_col.find(col.name);
+            if (iter != _file_col_to_table_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
\ No newline at end of file
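// A toy model (not from the patch; std containers stand in for the doris Block)
// of the rename round trip in get_next_block_before()/get_next_block_after()
// above: table names become file names before the inner reader fills the block,
// then are mapped back, so callers only ever observe table column names.
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

static void rename_cols(std::vector<std::string>& cols,
                        const std::unordered_map<std::string, std::string>& mapping) {
    for (auto& name : cols) {
        if (auto it = mapping.find(name); it != mapping.end()) {
            name = it->second; // the real reader also rebuilds the block's name index here
        }
    }
}

int main() {
    const std::unordered_map<std::string, std::string> table_to_file = {{"col2_new", "col2_old"}};
    const std::unordered_map<std::string, std::string> file_to_table = {{"col2_old", "col2_new"}};
    std::vector<std::string> block_cols = {"col1", "col2_new"};
    rename_cols(block_cols, table_to_file); // before the read: block carries file names
    assert(block_cols[1] == "col2_old");
    rename_cols(block_cols, file_to_table); // after the read: table names restored
    assert(block_cols[1] == "col2_new");
    return 0;
}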
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 0257a94a09b79a..72569dc9106fee 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -22,6 +22,7 @@
 #include
 
 #include "common/status.h"
+#include "exec/olap_common.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -79,7 +80,7 @@ class TableFormatReader : public GenericReader {
     virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override {
+                       std::unordered_set<std::string>* missing_cols) final {
         return _file_format_reader->get_columns(name_to_type, missing_cols);
     }
 
@@ -91,7 +92,7 @@ class TableFormatReader : public GenericReader {
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                     partition_columns,
-            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override {
+            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) final {
         return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
     }
 
@@ -115,4 +116,44 @@ class TableFormatReader : public GenericReader {
     }
 };
 
+class TableSchemaChangeHelper {
+public:
+    /** Get the mapping from the column's unique ID in the current file to the file column name.
+     * Iceberg/Hudi/Paimon usually maintain field IDs to support schema changes. If this
+     * information cannot be obtained (older versions may not record it), set
+     * `exist_schema` to `false`.
+     */
+    virtual Status get_file_col_id_to_name(bool& exist_schema,
+                                           std::map<int32_t, std::string>& file_col_id_to_name) = 0;
+
+    virtual ~TableSchemaChangeHelper() = default;
+
+protected:
+    /** table_id_to_name: table column unique id to table column name map */
+    Status init_schema_info(const std::vector<std::string>& read_table_col_names,
+                            const std::unordered_map<int32_t, std::string>& table_id_to_name,
+                            const std::unordered_map<std::string, ColumnValueRangeType>*
+                                    table_col_name_to_value_range);
+
+    /** To support schema evolution, we change the column names in the block to
+     * match the column names in the file before reading data, and set them back
+     * to the table column names before returning the block.
+     */
+    Status get_next_block_before(Block* block) const;
+
+    /** Set the names back to the table column names before returning the block. */
+    Status get_next_block_after(Block* block) const;
+
+    // Copy of _colname_to_value_range with the column names as they appear in the parquet/orc file.
+    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
+    // All the columns required by the user SQL.
+    std::vector<std::string> _all_required_col_names;
+    // Column names that exist in the table but not in the parquet/orc file.
+    std::vector<std::string> _not_in_file_col_names;
+    bool _has_schema_change = false;
+    // File column name to table column name map.
+    std::unordered_map<std::string, std::string> _file_col_to_table_col;
+    // Table column name to file column name map.
+    std::unordered_map<std::string, std::string> _table_col_to_file_col;
+};
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index f1d02c3639926f..620ca8673631a9 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -60,8 +60,8 @@ Status TransactionalHiveReader::init_reader(
     _col_names.insert(_col_names.end(),
                       TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                       TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
     Status status = orc_reader->init_reader(
-            &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor,
-            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+            &_col_names, {}, colname_to_value_range, conjuncts, true, tuple_descriptor,
+            row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
     return status;
 }
 
@@ -78,12 +78,6 @@ Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_
     return res;
 }
 
-Status TransactionalHiveReader::get_columns(
-        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-        std::unordered_set<std::string>* missing_cols) {
-    return _file_format_reader->get_columns(name_to_type, missing_cols);
-}
-
 Status TransactionalHiveReader::init_row_filters() {
     std::string data_file_path = _range.path;
     // the path in _range is remove the namenode prefix,
@@ -123,9 +117,9 @@ Status TransactionalHiveReader::init_row_filters() {
         OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
                                 _state->timezone(), _io_ctx, false);
-        RETURN_IF_ERROR(
-                delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
-                                          nullptr, {}, false, nullptr, nullptr, nullptr, nullptr));
+        RETURN_IF_ERROR(delete_reader.init_reader(
+                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, nullptr, {}, false,
+                nullptr, nullptr, nullptr, nullptr));
 
         std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
                 partition_columns;
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
index f27f33f45635fc..5fbca8635243d9 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -91,9 +91,6 @@ class TransactionalHiveReader : public TableFormatReader {
 
     Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
 
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                       std::unordered_set<std::string>* missing_cols) final;
-
     Status init_reader(
             const std::vector<std::string>& column_names,
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 1c17e788b005be..724297a5f90094 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/hudi_jni_reader.h"
+#include "vec/exec/format/table/hudi_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
 #include "vec/exec/format/table/lakesoul_jni_reader.h"
 #include "vec/exec/format/table/max_compute_jni_reader.h"
@@ -971,6 +972,17 @@ Status VFileScanner::_get_next_reader() {
                         &_slot_id_to_filter_conjuncts);
                 RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
+            } else if (range.__isset.table_format_params &&
+                       range.table_format_params.table_format_type == "hudi") {
+                std::unique_ptr<HudiParquetReader> hudi_reader =
+                        HudiParquetReader::create_unique(std::move(parquet_reader), _profile,
+                                                         _state, *_params, range, _io_ctx.get());
+                init_status = hudi_reader->init_reader(
+                        _file_col_names, _col_id_name_map, _colname_to_value_range,
+                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                        _col_name_to_slot_id, &_not_single_slot_filter_conjuncts,
+                        &_slot_id_to_filter_conjuncts);
+                _cur_reader = std::move(hudi_reader);
             } else {
                 bool hive_parquet_use_column_names = true;
@@ -1035,6 +1047,16 @@ Status VFileScanner::_get_next_reader() {
                         &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                 RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
+            } else if (range.__isset.table_format_params &&
+                       range.table_format_params.table_format_type == "hudi") {
+                std::unique_ptr<HudiOrcReader> hudi_reader = HudiOrcReader::create_unique(
+                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
+
+                init_status = hudi_reader->init_reader(
+                        _file_col_names, _col_id_name_map, _colname_to_value_range,
+                        _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
+                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
+                _cur_reader = std::move(hudi_reader);
             } else {
                 bool hive_orc_use_column_names = true;
@@ -1044,7 +1066,7 @@ Status VFileScanner::_get_next_reader() {
                     hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
                 }
                 init_status = orc_reader->init_reader(
-                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                         _real_tuple_desc, _default_val_row_desc.get(),
                         &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
                         hive_orc_use_column_names);
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 9bd1f3e2aa1533..5b1604209d49d0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -107,7 +107,7 @@ class VFileScanner : public VScanner {
     // col names from _file_slot_descs
     std::vector<std::string> _file_col_names;
     // column id to name map. Collect from FE slot descriptor.
-    std::unordered_map<int, std::string> _col_id_name_map;
+    std::unordered_map<int32_t, std::string> _col_id_name_map;
 
     // Partition source slot descriptors
     std::vector<SlotDescriptor*> _partition_slot_descs;
 
diff --git a/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
index 8dafed48a97bf4..8063549afc5ced 100644
--- a/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
+++ b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
@@ -51,11 +51,26 @@ class PaimonMockReader final : public PaimonReader {
         table_col_to_file_col_ans["d"] = "struct_col";
         table_col_to_file_col_ans["a"] = "vvv";
         table_col_to_file_col_ans["c"] = "k";
-        table_col_to_file_col_ans["nonono"] = "nonono";
         for (auto [table_col, file_col] : table_col_to_file_col_ans) {
             ASSERT_TRUE(_table_col_to_file_col[table_col] == file_col);
             ASSERT_TRUE(_file_col_to_table_col[file_col] == table_col);
         }
+        ASSERT_TRUE(_all_required_col_names.size() == 6);
+
+        std::set<std::string> all_required_col_names_set;
+        all_required_col_names_set.emplace("map_col");
+        all_required_col_names_set.emplace("array_col");
+        all_required_col_names_set.emplace("struct_col");
+        all_required_col_names_set.emplace("vvv");
+        all_required_col_names_set.emplace("k");
+        all_required_col_names_set.emplace("nonono");
+
+        for (auto i : _all_required_col_names) {
+            ASSERT_TRUE(all_required_col_names_set.contains(i));
+        }
+
+        ASSERT_TRUE(_not_in_file_col_names.size() == 1);
+        ASSERT_TRUE(_not_in_file_col_names.back() == "nonono");
     }
 };
 
@@ -65,7 +80,6 @@ class PaimonReaderTest : public ::testing::Test {
         _profile = new RuntimeProfile("test_profile");
         _state = new RuntimeState(TQueryGlobals());
         _io_ctx = new io::IOContext();
-        _schema_file_path = "./be/test/exec/test_data/paimon_scanner/schema-0";
     }
 
     void TearDown() override {
@@ -77,11 +91,10 @@ class PaimonReaderTest : public ::testing::Test {
     RuntimeProfile* _profile;
     RuntimeState* _state;
    io::IOContext* _io_ctx;
-    std::string _schema_file_path;
 };
 
 TEST_F(PaimonReaderTest, ReadSchemaFile) {
-    std::map<int64_t, std::string> file_id_to_name;
+    std::map<int32_t, std::string> file_id_to_name;
     file_id_to_name[0] = "k";
     file_id_to_name[1] = "vvv";
     file_id_to_name[2] = "array_col";
@@ -92,8 +105,8 @@
     params.file_type = TFileType::FILE_LOCAL;
     params.properties = {};
     params.hdfs_params = {};
-    params.__isset.paimon_schema_info = true;
-    params.paimon_schema_info[0] = file_id_to_name;
+    params.__isset.history_schema_info = true;
+    params.history_schema_info[0] = file_id_to_name;
 
     TFileRangeDesc range;
     range.table_format_params.paimon_params.schema_id = 0;
@@ -118,7 +131,7 @@
     read_table_col_names.emplace_back("e");
     read_table_col_names.emplace_back("nonono");
 
-    std::unordered_map<int64_t, std::string> table_col_id_table_name_map;
+    std::unordered_map<int32_t, std::string> table_col_id_table_name_map;
     table_col_id_table_name_map[1] = "a";
     table_col_id_table_name_map[6] = "b";
     table_col_id_table_name_map[0] = "c";
@@ -127,8 +140,8 @@
     table_col_id_table_name_map[10] = "nonono";
 
     std::unordered_map<std::string, ColumnValueRangeType> table_col_name_to_value_range;
-    Status status = reader.gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
-                                             &table_col_name_to_value_range);
+    Status status = reader.init_schema_info(read_table_col_names, table_col_id_table_name_map,
+                                            &table_col_name_to_value_range);
     ASSERT_TRUE(status.ok());
     reader.check();
 }
diff --git a/be/test/vec/exec/format/table/table_schema_change_helper_test.cpp new file
mode 100644 index 00000000000000..a7940eb4a544d1 --- /dev/null +++ b/be/test/vec/exec/format/table/table_schema_change_helper_test.cpp @@ -0,0 +1,471 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include + +#include "common/status.h" +#include "vec/columns/column_string.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/table/table_format_reader.h" + +namespace doris::vectorized { +class MockTableSchemaChangeHelper : public TableSchemaChangeHelper { +public: + MockTableSchemaChangeHelper(std::map file_schema, bool exist_schema = true) + : _file_schema(std::move(file_schema)), _exist_schema(exist_schema) {} + + Status get_file_col_id_to_name(bool& exist_schema, + std::map& file_col_id_to_name) override { + exist_schema = _exist_schema; + if (_exist_schema) { + file_col_id_to_name = _file_schema; + } + return Status::OK(); + } + + bool has_schema_change() const { return _has_schema_change; } + const std::vector& all_required_col_names() const { + return _all_required_col_names; + } + const std::vector& not_in_file_col_names() const { return _not_in_file_col_names; } + const std::unordered_map& file_col_to_table_col() const { + return _file_col_to_table_col; + } + const std::unordered_map& table_col_to_file_col() const { + return _table_col_to_file_col; + } + const std::unordered_map& new_colname_to_value_range() + const { + return _new_colname_to_value_range; + } + +private: + std::map _file_schema; + bool _exist_schema; +}; + +TEST(TableSchemaChangeHelperTest, NoSchemaChange) { + std::map file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}}; + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::vector read_cols = {"col1", "col3"}; + std::unordered_map col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col3"); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); +} + +TEST(TableSchemaChangeHelperTest, WithSchemaChange) { + std::map file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}}; + + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}}; + + std::vector read_cols = {"col1", "col2_new", "col3_new"}; + + std::unordered_map col_ranges = { + {"col2_new", ColumnValueRangeType()}}; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); 
+ ASSERT_EQ(helper.all_required_col_names().size(), 3); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2_old"); + ASSERT_EQ(helper.all_required_col_names()[2], "col3_old"); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); + + ASSERT_EQ(helper.table_col_to_file_col().size(), 3); + ASSERT_EQ(helper.table_col_to_file_col().at("col2_new"), "col2_old"); + ASSERT_EQ(helper.table_col_to_file_col().at("col3_new"), "col3_old"); + + ASSERT_EQ(helper.file_col_to_table_col().size(), 3); + ASSERT_EQ(helper.file_col_to_table_col().at("col2_old"), "col2_new"); + ASSERT_EQ(helper.file_col_to_table_col().at("col3_old"), "col3_new"); + + ASSERT_EQ(helper.new_colname_to_value_range().size(), 1); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col2_old") != + helper.new_colname_to_value_range().end()); +} + +TEST(TableSchemaChangeHelperTest, MissingColumns) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}}; + std::vector<std::string> read_cols = {"col1", "col3", "col4"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges = { + {"col3", ColumnValueRangeType()}}; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 3); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col3"); + ASSERT_EQ(helper.all_required_col_names()[2], "col4"); + ASSERT_EQ(helper.not_in_file_col_names().size(), 2); + ASSERT_EQ(helper.not_in_file_col_names()[0], "col3"); + ASSERT_EQ(helper.not_in_file_col_names()[1], "col4"); +} + +TEST(TableSchemaChangeHelperTest, NoFileSchema) { + std::map<int32_t, std::string> file_schema; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::vector<std::string> read_cols = {"col1", "col2"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + MockTableSchemaChangeHelper helper(file_schema, false); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2"); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); +} + +TEST(TableSchemaChangeHelperTest, MixedScenario) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {4, "col4_old"}}; + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4_new"}, {5, "col5"}}; + std::vector<std::string> read_cols = {"col1", "col2_new", "col3", "col4_new", "col5"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges = { + {"col2_new", ColumnValueRangeType()}, + {"col3", ColumnValueRangeType()}, + {"col5", ColumnValueRangeType()}}; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + ASSERT_TRUE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 5); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2_old"); + ASSERT_EQ(helper.all_required_col_names()[2], "col3"); + ASSERT_EQ(helper.all_required_col_names()[3], "col4_old"); + ASSERT_EQ(helper.all_required_col_names()[4], "col5"); + ASSERT_EQ(helper.not_in_file_col_names().size(), 2); + ASSERT_EQ(helper.not_in_file_col_names()[0],
"col3"); + ASSERT_EQ(helper.not_in_file_col_names()[1], "col5"); + ASSERT_EQ(helper.table_col_to_file_col().at("col2_new"), "col2_old"); + ASSERT_EQ(helper.table_col_to_file_col().at("col4_new"), "col4_old"); + ASSERT_EQ(helper.new_colname_to_value_range().size(), 3); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col2_old") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col3") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col5") != + helper.new_colname_to_value_range().end()); +} + +TEST(TableSchemaChangeHelperTest, EmptySchemas) { + std::map file_schema; + std::unordered_map table_id_to_name; + std::vector read_cols; + std::unordered_map col_ranges; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_TRUE(helper.all_required_col_names().empty()); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); + ASSERT_TRUE(helper.table_col_to_file_col().empty()); + ASSERT_TRUE(helper.file_col_to_table_col().empty()); + ASSERT_TRUE(helper.new_colname_to_value_range().empty()); +} + +TEST(TableSchemaChangeHelperTest, IdMismatch) { + std::map file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::unordered_map table_id_to_name = { + {10, "col1"}, {20, "col2"}, {30, "col3"}}; + + std::vector read_cols = {"col1", "col2", "col3"}; + + std::unordered_map col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 3); + ASSERT_EQ(helper.not_in_file_col_names().size(), 3); + ASSERT_TRUE(helper.table_col_to_file_col().empty()); + ASSERT_TRUE(helper.file_col_to_table_col().empty()); +} + +TEST(TableSchemaChangeHelperTest, DuplicateColumnNames) { + std::map file_schema = {{1, "col1"}, {2, "col2"}}; + + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col2"}, {4, "col1"}}; + + std::vector read_cols = {"col1", "col2"}; + std::unordered_map col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_FALSE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2"); + ASSERT_TRUE(helper.not_in_file_col_names().empty()); + ASSERT_EQ(helper.table_col_to_file_col().size(), 2); +} + +TEST(TableSchemaChangeHelperTest, ValueRangeForNonReadColumns) { + std::map file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}}; + + std::unordered_map table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4"}}; + + std::vector read_cols = {"col1", "col3"}; + + std::unordered_map col_ranges = { + {"col1", ColumnValueRangeType()}, + {"col2_new", ColumnValueRangeType()}, + {"col4", ColumnValueRangeType()}}; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + ASSERT_EQ(helper.all_required_col_names().size(), 2); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col3"); + 
ASSERT_TRUE(helper.not_in_file_col_names().empty()); + + ASSERT_EQ(helper.new_colname_to_value_range().size(), 3); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col1") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col2") != + helper.new_colname_to_value_range().end()); + ASSERT_TRUE(helper.new_colname_to_value_range().find("col4") != + helper.new_colname_to_value_range().end()); +} + +TEST(TableSchemaChangeHelperTest, PartialIdMatch) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}, {4, "col4"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {20, "col2"}, {3, "col3_new"}, {40, "col4_new"}}; + std::vector<std::string> read_cols = {"col1", "col2", "col3_new", "col4_new"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + + ASSERT_EQ(helper.all_required_col_names().size(), 4); + ASSERT_EQ(helper.all_required_col_names()[0], "col1"); + ASSERT_EQ(helper.all_required_col_names()[1], "col2"); + ASSERT_EQ(helper.all_required_col_names()[2], "col3"); + ASSERT_EQ(helper.all_required_col_names()[3], "col4_new"); + + ASSERT_EQ(helper.not_in_file_col_names().size(), 2); + ASSERT_EQ(helper.not_in_file_col_names()[0], "col2"); + ASSERT_EQ(helper.not_in_file_col_names()[1], "col4_new"); + + ASSERT_EQ(helper.table_col_to_file_col().size(), 2); + ASSERT_EQ(helper.table_col_to_file_col().at("col1"), "col1"); + ASSERT_EQ(helper.table_col_to_file_col().at("col3_new"), "col3"); +} + +Block create_test_block(const std::vector<std::string>& column_names) { + Block block; + for (const auto& name : column_names) { + auto column = ColumnString::create(); + block.insert( + ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), name)); + } + return block; +} + +TEST(TableSchemaChangeHelperTest, BasicColumnNameConversion) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}}; + + std::vector<std::string> read_cols = {"col1", "col2_new", "col3_new"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + + Block before_block = create_test_block({"col1", "col2_new", "col3_new"}); + ASSERT_TRUE(helper.get_next_block_before(&before_block).ok()); + + ASSERT_EQ(before_block.get_by_position(0).name, "col1"); + ASSERT_EQ(before_block.get_by_position(1).name, "col2_old"); + ASSERT_EQ(before_block.get_by_position(2).name, "col3_old"); + + Block after_block = create_test_block({"col1", "col2_old", "col3_old"}); + ASSERT_TRUE(helper.get_next_block_after(&after_block).ok()); + + ASSERT_EQ(after_block.get_by_position(0).name, "col1"); + ASSERT_EQ(after_block.get_by_position(1).name, "col2_new"); + ASSERT_EQ(after_block.get_by_position(2).name, "col3_new"); +} + +TEST(TableSchemaChangeHelperTest, NoSchemaChangeBlocks) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2"}, {3, "col3"}}; + + std::vector<std::string> read_cols = {"col1", "col2", "col3"}; + + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); +
ASSERT_FALSE(helper.has_schema_change()); + + Block before_block = create_test_block({"col1", "col2", "col3"}); + ASSERT_TRUE(helper.get_next_block_before(&before_block).ok()); + + ASSERT_EQ(before_block.get_by_position(0).name, "col1"); + ASSERT_EQ(before_block.get_by_position(1).name, "col2"); + ASSERT_EQ(before_block.get_by_position(2).name, "col3"); + + Block after_block = create_test_block({"col1", "col2", "col3"}); + ASSERT_TRUE(helper.get_next_block_after(&after_block).ok()); + + ASSERT_EQ(after_block.get_by_position(0).name, "col1"); + ASSERT_EQ(after_block.get_by_position(1).name, "col2"); + ASSERT_EQ(after_block.get_by_position(2).name, "col3"); +} + +TEST(TableSchemaChangeHelperTest, MixedColumnNameConversion) { + std::map<int32_t, std::string> file_schema = { + {1, "col1"}, {2, "col2_old"}, {3, "col3"}, {4, "col4_old"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3"}, {4, "col4_new"}, {5, "col5"}}; + + std::vector<std::string> read_cols = {"col1", "col2_new", "col3", "col4_new", "col5"}; + + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + Block before_block = + create_test_block({"col1", "col2_new", "col3", "col4_new", "col5", "extra_col"}); + + ASSERT_TRUE(helper.get_next_block_before(&before_block).ok()); + + ASSERT_EQ(before_block.get_by_position(0).name, "col1"); + ASSERT_EQ(before_block.get_by_position(1).name, "col2_old"); + ASSERT_EQ(before_block.get_by_position(2).name, "col3"); + ASSERT_EQ(before_block.get_by_position(3).name, "col4_old"); + ASSERT_EQ(before_block.get_by_position(4).name, "col5"); + ASSERT_EQ(before_block.get_by_position(5).name, "extra_col"); + + Block after_block = + create_test_block({"col1", "col2_old", "col3", "col4_old", "col5", "extra_col"}); + + ASSERT_TRUE(helper.get_next_block_after(&after_block).ok()); + ASSERT_EQ(after_block.get_by_position(0).name, "col1"); + ASSERT_EQ(after_block.get_by_position(1).name, "col2_new"); + ASSERT_EQ(after_block.get_by_position(2).name, "col3"); + ASSERT_EQ(after_block.get_by_position(3).name, "col4_new"); + ASSERT_EQ(after_block.get_by_position(4).name, "col5"); + ASSERT_EQ(after_block.get_by_position(5).name, "extra_col"); +} + +TEST(TableSchemaChangeHelperTest, EmptyAndSingleColumnBlocks) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}}; + + std::unordered_map<int32_t, std::string> table_id_to_name = {{1, "col1"}, {2, "col2_new"}}; + + std::vector<std::string> read_cols = {"col1", "col2_new"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + ASSERT_TRUE(helper.has_schema_change()); + + Block empty_block; + ASSERT_TRUE(helper.get_next_block_before(&empty_block).ok()); + ASSERT_TRUE(helper.get_next_block_after(&empty_block).ok()); + ASSERT_EQ(empty_block.columns(), 0); + + Block single_block1 = create_test_block({"col1"}); + ASSERT_TRUE(helper.get_next_block_before(&single_block1).ok()); + ASSERT_EQ(single_block1.get_by_position(0).name, "col1"); + + ASSERT_TRUE(helper.get_next_block_after(&single_block1).ok()); + ASSERT_EQ(single_block1.get_by_position(0).name, "col1"); + + Block single_block2 = create_test_block({"col2_new"}); + ASSERT_TRUE(helper.get_next_block_before(&single_block2).ok()); + ASSERT_EQ(single_block2.get_by_position(0).name, "col2_old"); + + Block single_block3 = create_test_block({"col2_old"}); + ASSERT_TRUE(helper.get_next_block_after(&single_block3).ok()); + ASSERT_EQ(single_block3.get_by_position(0).name, "col2_new"); +} + +TEST(TableSchemaChangeHelperTest, ColumnOrderChange) { + std::map<int32_t, std::string> file_schema = {{1, "col1"}, {2, "col2_old"}, {3, "col3_old"}}; + std::unordered_map<int32_t, std::string> table_id_to_name = { + {1, "col1"}, {2, "col2_new"}, {3, "col3_new"}}; + std::vector<std::string> read_cols = {"col1", "col2_new", "col3_new"}; + std::unordered_map<std::string, ColumnValueRangeType> col_ranges; + MockTableSchemaChangeHelper helper(file_schema); + ASSERT_TRUE(helper.init_schema_info(read_cols, table_id_to_name, &col_ranges).ok()); + + ASSERT_TRUE(helper.has_schema_change()); + + Block before_block = create_test_block({"col3_new", "col1", "col2_new"}); + ASSERT_TRUE(helper.get_next_block_before(&before_block).ok()); + + ASSERT_EQ(before_block.get_by_position(0).name, "col3_old"); + ASSERT_EQ(before_block.get_by_position(1).name, "col1"); + ASSERT_EQ(before_block.get_by_position(2).name, "col2_old"); + + Block after_block = create_test_block({"col3_old", "col1", "col2_old"}); + ASSERT_TRUE(helper.get_next_block_after(&after_block).ok()); + + ASSERT_EQ(after_block.get_by_position(0).name, "col3_new"); + ASSERT_EQ(after_block.get_by_position(1).name, "col1"); + ASSERT_EQ(after_block.get_by_position(2).name, "col2_new"); +} +} // namespace doris::vectorized
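The rule these tests pin down is that TableSchemaChangeHelper matches columns by field id, not by name. The sketch below is only an illustrative re-statement of that rule, not the helper's actual implementation; SchemaMatchResult and match_by_field_id are invented names. A shared id whose table-side and file-side names differ counts as a rename (anywhere in the table schema, which is why ValueRangeForNonReadColumns still reports a schema change), and a requested column whose id is absent from the file is recorded as not-in-file so the reader can later fill it with NULL.

// Illustrative sketch only: restates the id-based matching exercised above.
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

struct SchemaMatchResult {
    bool has_schema_change = false;
    std::vector<std::string> all_required_col_names; // names as they appear in the file
    std::vector<std::string> not_in_file_col_names;  // requested columns absent from the file
    std::unordered_map<std::string, std::string> table_col_to_file_col;
};

SchemaMatchResult match_by_field_id(
        const std::vector<std::string>& read_cols,
        const std::unordered_map<int32_t, std::string>& table_id_to_name,
        const std::map<int32_t, std::string>& file_id_to_name) {
    SchemaMatchResult res;
    // Pass 1: walk the whole table schema. A field id present on both sides
    // with different names is a rename; ids missing from the file add nothing.
    std::unordered_map<std::string, std::string> table_to_file;
    for (const auto& [id, table_name] : table_id_to_name) {
        auto it = file_id_to_name.find(id);
        if (it == file_id_to_name.end()) {
            continue;
        }
        table_to_file[table_name] = it->second;
        if (it->second != table_name) {
            res.has_schema_change = true;
        }
    }
    // Pass 2: resolve each requested column to its in-file name, or mark it
    // missing so the reader fills NULLs instead of touching the file.
    for (const auto& col : read_cols) {
        auto it = table_to_file.find(col);
        if (it == table_to_file.end()) {
            res.not_in_file_col_names.push_back(col);
            res.all_required_col_names.push_back(col);
        } else {
            res.table_col_to_file_col[col] = it->second;
            res.all_required_col_names.push_back(it->second);
        }
    }
    return res;
}

Run against the PartialIdMatch fixture above, this yields all_required_col_names = {col1, col2, col3, col4_new} and not_in_file_col_names = {col2, col4_new}, matching the assertions.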
diff --git a/be/test/vec/exec/orc_reader_test.cpp b/be/test/vec/exec/orc_reader_test.cpp index ff7452ae625428..e27bdf08c9d5e7 100644 --- a/be/test/vec/exec/orc_reader_test.cpp +++ b/be/test/vec/exec/orc_reader_test.cpp @@ -66,8 +66,8 @@ class OrcReaderTest : public testing::Test { range.start_offset = 0; range.size = 1293; auto reader = OrcReader::create_unique(params, range, "", nullptr, true); - auto status = reader->init_reader(&column_names, nullptr, {}, false, tuple_desc, &row_desc, - nullptr, nullptr); + auto status = reader->init_reader(&column_names, {}, nullptr, {}, false, tuple_desc, + &row_desc, nullptr, nullptr); EXPECT_TRUE(status.ok()); // deserialize expr diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql new file mode 100644 index 00000000000000..1a3d844ef6027e --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run10.sql @@ -0,0 +1,48 @@ + +use demo.test_db; + +CREATE TABLE sc_drop_add_orc ( + id BIGINT, + name STRING, + age INT +) +USING iceberg +PARTITIONED BY (id) +TBLPROPERTIES ('format'='orc'); + +INSERT INTO sc_drop_add_orc VALUES (1, 'Alice', 25); +INSERT INTO sc_drop_add_orc VALUES (2, 'Bob', 30); + +ALTER TABLE sc_drop_add_orc DROP COLUMN age; + +INSERT INTO sc_drop_add_orc (id, name) VALUES (3, 'Charlie'); +INSERT INTO sc_drop_add_orc (id, name) VALUES (4, 'David'); + +ALTER TABLE sc_drop_add_orc ADD COLUMN age INT; + +INSERT INTO sc_drop_add_orc VALUES (5, 'Eve', 28); +INSERT INTO sc_drop_add_orc VALUES (6, 'Frank', 35); + + + +CREATE TABLE sc_drop_add_parquet ( + id BIGINT, + name STRING, + age INT +) +USING iceberg +PARTITIONED BY (id) +TBLPROPERTIES ('format'='parquet'); + +INSERT INTO sc_drop_add_parquet VALUES (1, 'Alice', 25); +INSERT INTO sc_drop_add_parquet VALUES (2, 'Bob', 30); + +ALTER TABLE sc_drop_add_parquet DROP COLUMN age; + +INSERT INTO sc_drop_add_parquet (id, name) VALUES (3, 'Charlie'); +INSERT INTO sc_drop_add_parquet (id, name) VALUES (4, 'David'); + +ALTER TABLE sc_drop_add_parquet ADD COLUMN age INT; + +INSERT INTO sc_drop_add_parquet VALUES (5, 'Eve', 28); +INSERT INTO sc_drop_add_parquet VALUES (6, 'Frank', 35); \ No newline at end of file
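The drop-then-re-add sequence above is what makes these fixtures interesting: Iceberg assigns the re-added age a brand-new field id, so files written before the re-add cannot match the new column by id, whether they contain no age at all (rows 3-4) or an old age under the retired id (rows 1-2). A small usage example of the match_by_field_id sketch above; the concrete field ids (4 for the re-added column) are assumptions for illustration.

// Reuses SchemaMatchResult / match_by_field_id from the sketch above.
#include <cassert>

int main() {
    // File written after DROP COLUMN age but before ADD COLUMN age.
    std::map<int32_t, std::string> old_file = {{1, "id"}, {2, "name"}};
    // Table schema after the re-add: age now carries a fresh id (assumed 4).
    std::unordered_map<int32_t, std::string> table = {{1, "id"}, {2, "name"}, {4, "age"}};

    SchemaMatchResult res = match_by_field_id({"id", "name", "age"}, table, old_file);
    assert(!res.has_schema_change);                // no rename, just a gap
    assert(res.not_in_file_col_names.size() == 1);
    assert(res.not_in_file_col_names[0] == "age"); // filled with NULL at read time
    return 0;
}

This is why rows 1-4 of sc_drop_add_orc and sc_drop_add_parquet come back with age = NULL in the expected output further down, while rows 5-6, written after the re-add, carry real values.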
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index 74bf07fff1aacb..623325e39bf07b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -836,7 +836,6 @@ public static InternalSchema getHudiTableSchema(HMSExternalTable table, boolean[ } } - public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> action) { // if hive config is not ready, then use hadoop kerberos to login AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(conf); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java index 843dded27969ad..abb89dc32e7705 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java @@ -18,7 +18,7 @@ package org.apache.doris.datasource.hudi.source; import org.apache.doris.common.util.LocationPath; -import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.TableFormatType; import org.apache.doris.spi.Split; import org.apache.hadoop.conf.Configuration; @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Collectors; public class COWIncrementalRelation implements IncrementalRelation { @@ -212,19 +213,22 @@ public List<Split> collectSplits() throws HoodieException { Option<String[]> partitionColumns = metaClient.getTableConfig().getPartitionFields(); List<String> partitionNames = partitionColumns.isPresent() ?
Arrays.asList(partitionColumns.get()) : Collections.emptyList(); - for (String baseFile : filteredMetaBootstrapFullPaths) { + + Consumer<String> generatorSplit = baseFile -> { HoodieWriteStat stat = fileToWriteStat.get(baseFile); - splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0, - stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), - 0, new String[0], - HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); + LocationPath locationPath = new LocationPath(baseFile, optParams); + HudiSplit hudiSplit = new HudiSplit(locationPath, 0, + stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), new String[0], + HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())); + hudiSplit.setTableFormatType(TableFormatType.HUDI); + splits.add(hudiSplit); + }; + + for (String baseFile : filteredMetaBootstrapFullPaths) { + generatorSplit.accept(baseFile); } for (String baseFile : filteredRegularFullPaths) { - HoodieWriteStat stat = fileToWriteStat.get(baseFile); - splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0, - stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), - 0, new String[0], - HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); + generatorSplit.accept(baseFile); } return splits; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 96ee3c08605a31..b717a881643dd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -30,7 +30,6 @@ import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; @@ -60,10 +59,12 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.StoragePath; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -72,6 +73,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -212,6 +214,9 @@ protected void doInitialize() throws UserException { .getExtMetaCacheMgr() .getFsViewProcessor(hmsTable.getCatalog()) .getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient); + if (hudiSchemaCacheValue.isEnableSchemaEvolution()) { + params.setHistorySchemaInfo(new ConcurrentHashMap<>()); + } } @Override @@ -250,17 +255,31 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value()); THudiFileDesc fileDesc = new THudiFileDesc(); -
fileDesc.setInstantTime(hudiSplit.getInstantTime()); - fileDesc.setSerde(hudiSplit.getSerde()); - fileDesc.setInputFormat(hudiSplit.getInputFormat()); - fileDesc.setBasePath(hudiSplit.getBasePath()); - fileDesc.setDataFilePath(hudiSplit.getDataFilePath()); - fileDesc.setDataFileLength(hudiSplit.getFileLength()); - fileDesc.setDeltaLogs(hudiSplit.getHudiDeltaLogs()); - fileDesc.setColumnNames(hudiSplit.getHudiColumnNames()); - fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes()); - // TODO(gaoxin): support complex types - // fileDesc.setNestedFields(hudiSplit.getNestedFields()); + if (rangeDesc.getFormatType() == TFileFormatType.FORMAT_JNI) { + fileDesc.setInstantTime(hudiSplit.getInstantTime()); + fileDesc.setInputFormat(hudiSplit.getInputFormat()); + fileDesc.setSerde(hudiSplit.getSerde()); + fileDesc.setBasePath(hudiSplit.getBasePath()); + fileDesc.setDataFilePath(hudiSplit.getDataFilePath()); + fileDesc.setDataFileLength(hudiSplit.getFileLength()); + fileDesc.setDeltaLogs(hudiSplit.getHudiDeltaLogs()); + fileDesc.setColumnNames(hudiSplit.getHudiColumnNames()); + fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes()); + // TODO(gaoxin): support complex types + // fileDesc.setNestedFields(hudiSplit.getNestedFields()); + } else { + HudiSchemaCacheValue hudiSchemaCacheValue = HudiUtils.getSchemaCacheValue(hmsTable, queryInstant); + if (hudiSchemaCacheValue.isEnableSchemaEvolution()) { + long commitInstantTime = Long.parseLong(FSUtils.getCommitTime( + new File(hudiSplit.getPath().get()).getName())); + InternalSchema internalSchema = hudiSchemaCacheValue + .getCommitInstantInternalSchema(hudiClient, commitInstantTime); + params.history_schema_info.computeIfAbsent( + internalSchema.schemaId(), + k -> HudiUtils.getSchemaInfo(internalSchema)); + fileDesc.setSchemaId(internalSchema.schemaId()); //for schema change. 
(native reader) + } + } tableFormatFileDesc.setHudiParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } @@ -318,6 +337,7 @@ private List<Split> getIncrementalSplits() { incrementalRelation.getEndTs())).collect(Collectors.toList()); } + private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException { String partitionName; @@ -332,11 +352,14 @@ private void getPartitionSplits(HivePartition partition, List<Split> splits) thr fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); + long fileSize = baseFile.getFileSize(); // Need add hdfs host to location LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); - splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0, - new String[0], partition.getPartitionValues())); + HudiSplit hudiSplit = new HudiSplit(locationPath, 0, fileSize, fileSize, + new String[0], partition.getPartitionValues()); + hudiSplit.setTableFormatType(TableFormatType.HUDI); + splits.add(hudiSplit); }); } else { fsView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
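With this change both HudiScanNode (above) and PaimonScanNode (below) publish per-schema-id column maps through the same thrift field, history_schema_info, while each split only carries its schema_id. A minimal sketch of the consuming side, assuming a Doris-like BE reader; FakeRangeParams and lookup_file_schema are invented names, and the real code works on the deserialized TFileScanRangeParams rather than this stand-in struct.

// Illustrative sketch of resolving a split's schema_id to a file schema.
#include <cstdint>
#include <map>
#include <string>

struct FakeRangeParams {
    // Mirrors the thrift field: map<i64, map<i32, string>> history_schema_info.
    std::map<int64_t, std::map<int32_t, std::string>> history_schema_info;
};

bool lookup_file_schema(const FakeRangeParams& params, int64_t schema_id,
                        std::map<int32_t, std::string>* file_col_id_to_name) {
    auto it = params.history_schema_info.find(schema_id);
    if (it == params.history_schema_info.end()) {
        return false; // exist_schema = false: fall back to name-based matching
    }
    *file_col_id_to_name = it->second;
    return true;
}

If the id is absent, for example for a split written before schema tracking, the reader reports exist_schema = false and falls back to plain name matching, mirroring the NoFileSchema test above.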
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 25539c8247704c..87680686986e13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class PaimonScanNode extends FileQueryScanNode { @@ -132,7 +133,7 @@ protected void doInitialize() throws UserException { source = new PaimonSource(desc); serializedTable = encodeObjectToString(source.getPaimonTable()); Preconditions.checkNotNull(source); - params.setPaimonSchemaInfo(new HashMap<>()); + params.setHistorySchemaInfo(new ConcurrentHashMap<>()); } @VisibleForTesting @@ -170,12 +171,12 @@ protected Optional<String> getSerializedTable() { return Optional.of(serializedTable); } - private Map<Long, String> getSchemaInfo(Long schemaId) { + private Map<Integer, String> getSchemaInfo(Long schemaId) { PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable(); TableSchema tableSchema = table.getPaimonSchemaCacheValue(schemaId).getTableSchema(); - Map<Long, String> columnIdToName = new HashMap<>(tableSchema.fields().size()); + Map<Integer, String> columnIdToName = new HashMap<>(tableSchema.fields().size()); for (DataField dataField : tableSchema.fields()) { - columnIdToName.put((long) dataField.id(), dataField.name().toLowerCase()); + columnIdToName.put(dataField.id(), dataField.name().toLowerCase()); } return columnIdToName; @@ -203,7 +204,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) throw new RuntimeException("Unsupported file format: " + fileFormat); } fileDesc.setSchemaId(paimonSplit.getSchemaId()); - params.paimon_schema_info.computeIfAbsent(paimonSplit.getSchemaId(), this::getSchemaInfo); + params.history_schema_info.computeIfAbsent(paimonSplit.getSchemaId(), this::getSchemaInfo); } fileDesc.setFileFormat(fileFormat); fileDesc.setPaimonPredicate(encodeObjectToString(predicates)); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c4a01bf2ec4e79..33f8f74e14f05e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -379,6 +379,7 @@ struct THudiFileDesc { 9: optional list<string> column_types; 10: optional list<string> nested_fields; 11: optional string hudi_jni_scanner; // deprecated + 12: optional i64 schema_id; // for schema change. (native reader) } struct TLakeSoulFileDesc { @@ -467,7 +468,7 @@ struct TFileScanRangeParams { // 1. Reduce the access to HMS and HDFS on the JNI side. // 2. There will be no inconsistency between the fe and be tables. 24: optional string serialized_table - 25: optional map<i64, map<i64, string>> paimon_schema_info // paimon map<schema_id, map<col_id, col_name>> : for schema change. + 25: optional map<i64, map<i32, string>> history_schema_info // paimon/hudi map<schema_id, map<col_id, col_name>> : for schema change. (native reader) } struct TFileRangeDesc { diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out new file mode 100644 index 00000000000000..4da3be0b7c0eab --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out @@ -0,0 +1,65 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !parquet_1 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N +5 Eve 28 +6 Frank 35 + +-- !parquet_2 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N + +-- !parquet_3 -- +5 Eve 28 +6 Frank 35 + +-- !parquet_4 -- +6 Frank 35 + +-- !parquet_5 -- +5 Eve 28 +6 Frank 35 + +-- !parquet_6 -- +5 Eve +6 Frank + +-- !parquet_7 -- +5 28 + +-- !orc_1 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N +5 Eve 28 +6 Frank 35 + +-- !orc_2 -- +1 Alice \N +2 Bob \N +3 Charlie \N +4 David \N + +-- !orc_3 -- +5 Eve 28 +6 Frank 35 + +-- !orc_4 -- +6 Frank 35 + +-- !orc_5 -- +5 Eve 28 +6 Frank 35 + +-- !orc_6 -- +5 Eve +6 Frank + +-- !orc_7 -- +5 28 + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out new file mode 100644 index 00000000000000..13c9f5535646f9 --- /dev/null +++ b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out @@ -0,0 +1,259 @@ +-- This file is automatically generated.
You should know what you did if you want to edit this +-- !hudi_0 -- +20250314162744620 20250314162744620_0_0 1 84ab609d-947a-4a24-af9a-6360348cf977-0_0-80-105_20250314162744620.parquet 1 \N Alice \N \N +20250314162747350 20250314162747350_0_0 2 07b6bd0b-0f2c-4500-a8c8-75d3cd90e85e-0_0-88-112_20250314162747350.parquet 2 \N Bob \N \N +20250314162759470 20250314162759470_0_0 3 58382f07-0dca-431d-ad4d-d5d94140d60f-0_0-96-119_20250314162759470.parquet 3 \N Charlie New York \N +20250314162804702 20250314162804702_0_0 4 05d28f5c-acc5-4530-8163-c82bdf96b720-0_0-104-126_20250314162804702.parquet 4 \N David Los Angeles \N +20250314162809486 20250314162809486_0_0 5 9164c294-2606-4537-bb84-e7ba4dbb98e5-0_0-112-133_20250314162809486.parquet 5 \N Eve Chicago \N +20250314162813019 20250314162813019_0_0 6 43b432a3-3581-439b-83b6-6c171bd6492a-0_0-120-140_20250314162813019.parquet 6 85.5 Frank San Francisco \N +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N +20250314162828063 20250314162828063_0_0 10 8df59b32-21a7-4a24-9fe4-eb8ef71956eb-0_0-166-182_20250314162828063.parquet 10 101.1 Judy Austin \N +20250314162847946 20250314162847946_0_0 11 6e2a56c2-9fbb-45cf-84fa-58815bda53ce-0_0-178-193_20250314162847946.parquet 11 222.2 QQ cn 24 + +-- !hudi_1 -- +8 95.5 Heidi Portland +10 101.1 Judy Austin +11 222.2 QQ cn + +-- !hudi_2 -- +6 85.5 Frank San Francisco +9 88.0 Ivan Denver + +-- !hudi_3 -- +7 90.0 Grace Seattle + +-- !hudi_4 -- +1 \N Alice \N +2 \N Bob \N +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago + +-- !hudi_5 -- +3 \N Charlie New York + +-- !hudi_6 -- +1 \N Alice \N +2 \N Bob \N + +-- !hudi_7 -- +6 85.5 Frank San Francisco + +-- !hudi_8 -- +6 85.5 Frank San Francisco +7 90.0 Grace Seattle +8 95.5 Heidi Portland +9 88.0 Ivan Denver +10 101.1 Judy Austin + +-- !hudi_9 -- +1 Alice + +-- !hudi_10 -- +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago +6 85.5 Frank San Francisco +7 90.0 Grace Seattle + +-- !hudi_11 -- +11 222.2 QQ cn 24 + +-- !hudi_12 -- +1 \N Alice \N \N +2 \N Bob \N \N +3 \N Charlie New York \N +4 \N David Los Angeles \N +5 \N Eve Chicago \N +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_13 -- +11 222.2 QQ cn 24 + +-- !hudi_14 -- +11 222.2 QQ cn 24 + +-- !hudi_15 -- +11 222.2 QQ cn 24 + +-- !hudi_16 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +11 222.2 QQ cn 24 + +-- !hudi_17 -- +11 222.2 QQ cn 24 + +-- !hudi_18 -- +1 \N Alice \N \N +2 \N Bob \N \N + +-- !hudi_19 -- +11 QQ 24 + +-- !hudi_20 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_0 -- +20250314163405965 20250314163405965_0_0 1 193e809e-9620-412e-ab3f-c408a84129ca-0_0-191-205_20250314163405965.parquet 1 \N Alice \N \N +20250314163409045 20250314163409045_0_0 2 d47ed400-2407-4ec3-a3ae-1bb8251edba1-0_0-199-212_20250314163409045.parquet 2 \N Bob \N \N +20250314163412409 20250314163412409_0_0 3 d82c289c-ffcb-4806-b893-d10d4ffe185e-0_0-207-219_20250314163412409.parquet 3 \N Charlie 
New York \N +20250314163416966 20250314163416966_0_0 4 b0c5e6d8-b9fd-4532-9a55-b65185719b84-0_0-215-226_20250314163416966.parquet 4 \N David Los Angeles \N +20250314163421827 20250314163421827_0_0 5 33648978-cbee-455a-a382-f40744a11509-0_0-223-233_20250314163421827.parquet 5 \N Eve Chicago \N +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N +20250314163439685 20250314163439685_0_0 10 ca84ae4f-b5b6-4c28-8168-551941f75586-0_0-277-282_20250314163439685.parquet 10 101.1 Judy Austin \N +20250314163446641 20250314163446641_0_0 11 0f944779-eaf3-431f-afc2-11720338bc34-0_0-289-293_20250314163446641.parquet 11 222.2 QQ cn 24 + +-- !hudi_1 -- +8 95.5 Heidi Portland +10 101.1 Judy Austin +11 222.2 QQ cn + +-- !hudi_2 -- +6 85.5 Frank San Francisco +9 88.0 Ivan Denver + +-- !hudi_3 -- +7 90.0 Grace Seattle + +-- !hudi_4 -- +1 \N Alice \N +2 \N Bob \N +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago + +-- !hudi_5 -- +3 \N Charlie New York + +-- !hudi_6 -- +1 \N Alice \N +2 \N Bob \N + +-- !hudi_7 -- +6 85.5 Frank San Francisco + +-- !hudi_8 -- +6 85.5 Frank San Francisco +7 90.0 Grace Seattle +8 95.5 Heidi Portland +9 88.0 Ivan Denver +10 101.1 Judy Austin + +-- !hudi_9 -- +1 Alice + +-- !hudi_10 -- +3 \N Charlie New York +4 \N David Los Angeles +5 \N Eve Chicago +6 85.5 Frank San Francisco +7 90.0 Grace Seattle + +-- !hudi_11 -- +11 222.2 QQ cn 24 + +-- !hudi_12 -- +1 \N Alice \N \N +2 \N Bob \N \N +3 \N Charlie New York \N +4 \N David Los Angeles \N +5 \N Eve Chicago \N +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !hudi_13 -- +11 222.2 QQ cn 24 + +-- !hudi_14 -- +11 222.2 QQ cn 24 + +-- !hudi_15 -- +11 222.2 QQ cn 24 + +-- !hudi_16 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +11 222.2 QQ cn 24 + +-- !hudi_17 -- +11 222.2 QQ cn 24 + +-- !hudi_18 -- +1 \N Alice \N \N +2 \N Bob \N \N + +-- !hudi_19 -- +11 QQ 24 + +-- !hudi_20 -- +6 85.5 Frank San Francisco \N +7 90.0 Grace Seattle \N +8 95.5 Heidi Portland \N +9 88.0 Ivan Denver \N +10 101.1 Judy Austin \N + +-- !orc_time_travel -- +20250314162744620 20250314162744620_0_0 1 84ab609d-947a-4a24-af9a-6360348cf977-0_0-80-105_20250314162744620.parquet 1 \N Alice \N \N +20250314162747350 20250314162747350_0_0 2 07b6bd0b-0f2c-4500-a8c8-75d3cd90e85e-0_0-88-112_20250314162747350.parquet 2 \N Bob \N \N +20250314162759470 20250314162759470_0_0 3 58382f07-0dca-431d-ad4d-d5d94140d60f-0_0-96-119_20250314162759470.parquet 3 \N Charlie New York \N +20250314162804702 20250314162804702_0_0 4 05d28f5c-acc5-4530-8163-c82bdf96b720-0_0-104-126_20250314162804702.parquet 4 \N David Los Angeles \N +20250314162809486 20250314162809486_0_0 5 9164c294-2606-4537-bb84-e7ba4dbb98e5-0_0-112-133_20250314162809486.parquet 5 \N Eve Chicago \N +20250314162813019 20250314162813019_0_0 6 43b432a3-3581-439b-83b6-6c171bd6492a-0_0-120-140_20250314162813019.parquet 6 85.5 Frank San Francisco \N +20250314162814849 
20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N + +-- !parquet_time_travel -- +20250314163405965 20250314163405965_0_0 1 193e809e-9620-412e-ab3f-c408a84129ca-0_0-191-205_20250314163405965.parquet 1 \N Alice \N \N +20250314163409045 20250314163409045_0_0 2 d47ed400-2407-4ec3-a3ae-1bb8251edba1-0_0-199-212_20250314163409045.parquet 2 \N Bob \N \N +20250314163412409 20250314163412409_0_0 3 d82c289c-ffcb-4806-b893-d10d4ffe185e-0_0-207-219_20250314163412409.parquet 3 \N Charlie New York \N +20250314163416966 20250314163416966_0_0 4 b0c5e6d8-b9fd-4532-9a55-b65185719b84-0_0-215-226_20250314163416966.parquet 4 \N David Los Angeles \N +20250314163421827 20250314163421827_0_0 5 33648978-cbee-455a-a382-f40744a11509-0_0-223-233_20250314163421827.parquet 5 \N Eve Chicago \N +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N + +-- !parquet_inc_1 -- +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N +20250314163439685 20250314163439685_0_0 10 ca84ae4f-b5b6-4c28-8168-551941f75586-0_0-277-282_20250314163439685.parquet 10 101.1 Judy Austin \N +20250314163446641 20250314163446641_0_0 11 0f944779-eaf3-431f-afc2-11720338bc34-0_0-289-293_20250314163446641.parquet 11 222.2 QQ cn 24 + +-- !parquet_inc_2 -- +20250314163425482 20250314163425482_0_0 6 ce12666a-5f10-488c-a143-069d2b478922-0_0-231-240_20250314163425482.parquet 6 85.5 Frank San Francisco \N +20250314163426999 20250314163426999_0_0 7 6175143f-b2ea-40aa-ad98-7c06cf96b013-0_0-241-249_20250314163426999.parquet 7 90.0 Grace Seattle \N +20250314163429429 20250314163429429_0_0 8 fe1dc348-f4ed-4aff-996d-b2d391b92795-0_0-253-260_20250314163429429.parquet 8 95.5 Heidi Portland \N +20250314163434457 20250314163434457_0_0 9 873dbde7-1ca8-4d75-886f-055e1b4ead69-0_0-265-271_20250314163434457.parquet 9 88.0 Ivan Denver \N + +-- !orc_inc_1 -- +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N +20250314162828063 20250314162828063_0_0 10 8df59b32-21a7-4a24-9fe4-eb8ef71956eb-0_0-166-182_20250314162828063.parquet 10 101.1 Judy Austin \N +20250314162847946 20250314162847946_0_0 11 6e2a56c2-9fbb-45cf-84fa-58815bda53ce-0_0-178-193_20250314162847946.parquet 11 222.2 QQ cn 24 + +-- !orc_inc_2 -- +20250314162814849 20250314162814849_0_0 7 28ad4dfc-07ae-4108-926e-7ba35b1ac5ce-0_0-130-149_20250314162814849.parquet 7 90.0 Grace Seattle \N +20250314162817433 20250314162817433_0_0 8 
a07d9dfb-791a-4cdc-bc7c-5f0d0d0d6a77-0_0-142-160_20250314162817433.parquet 8 95.5 Heidi Portland \N +20250314162822624 20250314162822624_0_0 9 91bcf0a8-708e-4f15-af6a-8a077da68184-0_0-154-171_20250314162822624.parquet 9 88.0 Ivan Denver \N + diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy new file mode 100644 index 00000000000000..efaf7e1bfefb00 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("iceberg_schema_change2", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalog_name = "iceberg_schema_change2" + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """ use test_db;""" + + qt_parquet_1 """ select * from sc_drop_add_parquet order by id; """ + qt_parquet_2 """ select * from sc_drop_add_parquet where age is NULL order by id; """ + qt_parquet_3 """ select * from sc_drop_add_parquet where age is not NULL order by id; """ + qt_parquet_4 """ select * from sc_drop_add_parquet where age > 28 order by id; """ + qt_parquet_5 """ select * from sc_drop_add_parquet where age >= 28 order by id; """ + qt_parquet_6 """ select id, name from sc_drop_add_parquet where age >= 28 order by id; """ + qt_parquet_7 """ select id, age from sc_drop_add_parquet where name="Eve" order by id; """ + + + + qt_orc_1 """ select * from sc_drop_add_orc order by id; """ + qt_orc_2 """ select * from sc_drop_add_orc where age is NULL order by id; """ + qt_orc_3 """ select * from sc_drop_add_orc where age is not NULL order by id; """ + qt_orc_4 """ select * from sc_drop_add_orc where age > 28 order by id; """ + qt_orc_5 """ select * from sc_drop_add_orc where age >= 28 order by id; """ + qt_orc_6 """ select id, name from sc_drop_add_orc 
where age >= 28 order by id; """ + qt_orc_7 """ select id, age from sc_drop_add_orc where name="Eve" order by id; """ + +} diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy new file mode 100644 index 00000000000000..648a4079a6eaed --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hudi_schema_change", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable hudi test") + return + } + + String catalog_name = "test_hudi_schema_change" + String props = context.config.otherConfigs.get("hudiEmrCatalog") + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + ${props} + ); + """ + + sql """ switch ${catalog_name};""" + sql """ use regression_hudi;""" + sql """ set enable_fallback_to_original_planner=false """ + sql """set force_jni_scanner = false;""" + + def hudi_sc_tbs = ["hudi_sc_orc_cow","hudi_sc_parquet_cow"] + + for (String hudi_sc_tb : hudi_sc_tbs) { + qt_hudi_0 """ SELECT * FROM ${hudi_sc_tb} ORDER BY id; """ + qt_hudi_1 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score > 90 ORDER BY id; """ + qt_hudi_2 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score < 90 ORDER BY id; """ + qt_hudi_3 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score = 90 ORDER BY id; """ + qt_hudi_4 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score IS NULL ORDER BY id; """ + qt_hudi_5 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE location = 'New York' ORDER BY id; """ + qt_hudi_6 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE location IS NULL ORDER BY id; """ + qt_hudi_7 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score > 85 AND location = 'San Francisco' ORDER BY id; """ + qt_hudi_8 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE score < 100 OR location = 'Austin' ORDER BY id; """ + qt_hudi_9 """ SELECT id, full_name FROM ${hudi_sc_tb} WHERE full_name LIKE 'A%' ORDER BY id; """ + qt_hudi_10 """ SELECT id, score, full_name, location FROM ${hudi_sc_tb} WHERE id BETWEEN 3 AND 7 ORDER BY id; """ + qt_hudi_11 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age > 20 ORDER BY id; """ + qt_hudi_12 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age IS NULL ORDER BY id; """ + qt_hudi_13 """ 
SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE score > 100 AND age IS NOT NULL ORDER BY id; """ + qt_hudi_14 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE location = 'cn' ORDER BY id; """ + qt_hudi_15 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE full_name = 'QQ' AND age > 20 ORDER BY id; """ + qt_hudi_16 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE score < 100 OR age < 25 ORDER BY id; """ + qt_hudi_17 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE age BETWEEN 20 AND 30 ORDER BY id; """ + qt_hudi_18 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE location IS NULL AND age IS NULL ORDER BY id; """ + qt_hudi_19 """ SELECT id, full_name, age FROM ${hudi_sc_tb} WHERE full_name LIKE 'Q%' AND age IS NOT NULL ORDER BY id; """ + qt_hudi_20 """ SELECT id, score, full_name, location, age FROM ${hudi_sc_tb} WHERE id > 5 AND age IS NULL ORDER BY id; """ + + + } + qt_orc_time_travel """ select * from hudi_sc_orc_cow FOR TIME AS OF "20250314162817433_0_0" order by id; """ //1-8 + qt_parquet_time_travel """ select * from hudi_sc_parquet_cow FOR TIME AS OF "20250314163425482" order by id; """//1-6 + + qt_parquet_inc_1 """ SELECT * from hudi_sc_parquet_cow@incr('beginTime'='20250314163421827') order by id; """ + qt_parquet_inc_2 """ SELECT * from hudi_sc_parquet_cow@incr('beginTime'='20250314163421827','endTime'="20250314163434457") order by id; """ + + qt_orc_inc_1 """ SELECT * from hudi_sc_orc_cow@incr('beginTime'='20250314162813019') order by id; """ + qt_orc_inc_2 """ SELECT * from hudi_sc_orc_cow@incr('beginTime'='20250314162813019','endTime'='20250314162822624') order by id; """ + + + sql """drop catalog if exists ${catalog_name};""" +} +/* + +spark-sql \ +--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ +--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ +--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ +--conf spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes=false + +set hoodie.schema.on.read.enable=true; +set hoodie.metadata.enable=false; +set hoodie.parquet.small.file.limit = 100; + + +CREATE TABLE hudi_sc_orc_cow ( + id int, + name string, + age int +) USING hudi +OPTIONS ( + type = 'cow', + primaryKey = 'id', + hoodie.base.file.format= 'orc' +); + +desc hudi_sc_orc_cow; +select * from hudi_sc_orc_cow; + +INSERT INTO hudi_sc_orc_cow VALUES (1, 'Alice', 25); +INSERT INTO hudi_sc_orc_cow VALUES (2, 'Bob', 30); + +-- id name age city +ALTER TABLE hudi_sc_orc_cow ADD COLUMNS (city string); +INSERT INTO hudi_sc_orc_cow VALUES (3, 'Charlie', 28, 'New York'); + +-- id name city +ALTER TABLE hudi_sc_orc_cow DROP COLUMN age; +INSERT INTO hudi_sc_orc_cow VALUES (4, 'David', 'Los Angeles'); + +-- id full_name city +ALTER TABLE hudi_sc_orc_cow RENAME COLUMN name TO full_name; +INSERT INTO hudi_sc_orc_cow VALUES (5, 'Eve', 'Chicago'); + +-- id score full_name city +ALTER TABLE hudi_sc_orc_cow ADD COLUMNS (score float AFTER id); +INSERT INTO hudi_sc_orc_cow VALUES (6,85.5, 'Frank', 'San Francisco'); + +-- id city score full_name +ALTER TABLE hudi_sc_orc_cow CHANGE COLUMN city city string AFTER id; +INSERT INTO hudi_sc_orc_cow VALUES (7, 'Seattle', 90.0, 'Grace'); + +ALTER TABLE hudi_sc_orc_cow CHANGE COLUMN score score double; +INSERT INTO hudi_sc_orc_cow VALUES (8, 'Portland', 95.5 , 'Heidi'); + +-- id location score full_name +ALTER TABLE 
hudi_sc_orc_cow RENAME COLUMN city TO location; +INSERT INTO hudi_sc_orc_cow VALUES (9, 'Denver', 88.0, 'Ivan'); + +-- id score full_name location +ALTER TABLE hudi_sc_orc_cow ALTER COLUMN location AFTER full_name; +INSERT INTO hudi_sc_orc_cow VALUES (10, 101.1,'Judy', 'Austin'); + + +select id,score,full_name,location from hudi_sc_orc_cow order by id; +1 NULL Alice NULL +2 NULL Bob NULL +3 NULL Charlie New York +4 NULL David Los Angeles +5 NULL Eve Chicago +6 85.5 Frank San Francisco +7 90.0 Grace Seattle +8 95.5 Heidi Portland +9 88.0 Ivan Denver +10 101.1 Judy Austin + +-- id score full_name location age +ALTER TABLE hudi_sc_orc_cow ADD COLUMN age int; +INSERT INTO hudi_sc_orc_cow VALUES (11, 222.2,'QQ', 'cn', 24); +*/ +