Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,15 @@ Status OrcReader::_create_file_reader() {

Status OrcReader::init_reader(
const std::vector<std::string>* column_names,
const std::vector<std::string>& missing_column_names,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names) {
_column_names = column_names;
_missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
Expand Down Expand Up @@ -363,14 +365,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
}

Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
std::vector<uint64_t>* col_attributes,
std::string attribute) {
std::vector<int32_t>* col_attributes,
const std::string& attribute,
bool* exist_attribute) {
RETURN_IF_ERROR(_create_file_reader());
*exist_attribute = true;
auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
col_names->emplace_back(get_field_name_lower_case(&root_type, i));

if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
*exist_attribute = false;
return Status::OK();
}
col_attributes->emplace_back(
std::stol(root_type.getSubtype(i)->getAttributeValue(attribute)));
std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
}
return Status::OK();
}
Expand All @@ -388,6 +397,11 @@ Status OrcReader::_init_read_columns() {
_scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
if (_missing_column_names_set.contains(col_name)) {
_missing_cols.emplace_back(col_name);
continue;
}

if (_is_hive1_orc_or_use_idx) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class OrcReader : public GenericReader {
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
const std::vector<std::string>& missing_column_names,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
Expand Down Expand Up @@ -178,8 +179,8 @@ class OrcReader : public GenericReader {
std::vector<TypeDescriptor>* col_types) override;

Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
std::vector<uint64_t>* col_attributes,
std::string attribute);
std::vector<int32_t>* col_attributes,
const std::string& attribute, bool* exist_attribute);
void set_table_col_to_file_col(
std::unordered_map<std::string, std::string> table_col_to_file_col) {
_table_col_to_file_col = table_col_to_file_col;
Expand Down Expand Up @@ -577,6 +578,9 @@ class OrcReader : public GenericReader {
int64_t _range_size;
const std::string& _ctz;
const std::vector<std::string>* _column_names;
// _missing_column_names_set: used in iceberg/hudi/paimon, the columns are dropped
// but added back(drop column a then add column a). Shouldn't read this column data in this case.
std::set<std::string> _missing_column_names_set;
int32_t _offset_days = 0;
cctz::time_zone _time_zone;

Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;
std::map<int32_t, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
Expand Down Expand Up @@ -135,6 +135,8 @@ class FieldDescriptor {

bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }

std::map<int32, std::string> get_field_id_name_map() { return _field_id_name_mapping; }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ Status ParquetReader::init_reader(
for (const std::string& name : required_columns) {
_missing_cols.emplace_back(name);
}

} else {
const auto& table_column_idxs = _scan_params.column_idxs;
std::map<int, int> table_col_id_to_idx;
Expand Down
92 changes: 92 additions & 0 deletions be/src/vec/exec/format/table/hudi_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "hudi_reader.h"

#include <vector>

#include "common/status.h"
#include "runtime/runtime_state.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

Status HudiReader::get_file_col_id_to_name(bool& exist_schema,
std::map<int, std::string>& file_col_id_to_name) {
if (!_params.__isset.history_schema_info) [[unlikely]] {
exist_schema = false;
return Status::OK();
}

if (!_params.history_schema_info.contains(_range.table_format_params.hudi_params.schema_id))
[[unlikely]] {
return Status::InternalError("hudi file schema info is missing in history schema info.");
}

file_col_id_to_name =
_params.history_schema_info.at(_range.table_format_params.hudi_params.schema_id);
return Status::OK();
}

Status HudiReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));
return Status::OK();
};

Status HudiOrcReader::init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(
read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range));

auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
&_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
row_descriptor, not_single_slot_filter_conjuncts,
slot_id_to_filter_conjuncts);
}

Status HudiParquetReader::init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>* table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(
read_table_col_names, table_col_id_table_name_map, table_col_name_to_value_range));
auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);

return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
85 changes: 85 additions & 0 deletions be/src/vec/exec/format/table/hudi_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <vector>

#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/table_format_reader.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
class HudiReader : public TableFormatReader, public TableSchemaChangeHelper {
public:
HudiReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range,
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader), state, profile, params, range,
io_ctx) {};

~HudiReader() override = default;

Status get_file_col_id_to_name(bool& exist_schema,
std::map<int, std::string>& file_col_id_to_name) final;

Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;

Status init_row_filters() final { return Status::OK(); };
};

class HudiOrcReader final : public HudiReader {
public:
ENABLE_FACTORY_CREATOR(HudiOrcReader);
HudiOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx)
: HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~HudiOrcReader() final = default;

Status init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
};

class HudiParquetReader final : public HudiReader {
public:
ENABLE_FACTORY_CREATOR(HudiParquetReader);
HudiParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx)
: HudiReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~HudiParquetReader() final = default;

Status init_reader(
const std::vector<std::string>& read_table_col_names,
const std::unordered_map<int32_t, std::string>& table_col_id_table_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
table_col_name_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
};
#include "common/compile_check_end.h"
} // namespace doris::vectorized
Loading
Loading