Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ Status ParquetReader::init_reader(
if (required_file_columns.find(name) != required_file_columns.end()) {
_read_file_columns.emplace_back(name);
_read_table_columns.emplace_back(required_file_columns[name]);
_read_table_columns_set.insert(required_file_columns[name]);
}
}
// build column predicates for column lazy read
Expand All @@ -365,7 +366,13 @@ Status ParquetReader::init_reader(
}

// Decides whether the column referenced by `slot_ref` may be treated as present in the file
// for min-max filtering purposes. Both conditions must hold:
//   1. `_table_info_node_ptr` reports the column as an existing child column, and
//   2. the column name is in `_read_table_columns_set`, i.e. it is actually being read.
//
// `_read_table_columns_set` ensures that only columns actually read are subject to min-max
// filtering. This primarily handles the case where partition columns also exist in the file.
// The restriction is not applied inside `_table_info_node_ptr` because Iceberg and Hudi have
// conflicting requirements for that node:
//   - Iceberg partition evolution needs to read partition columns from the file.
//   - Hudi with `hoodie.datasource.write.drop.partition.columns=false` does not need to read
//     partition columns from the file.
bool ParquetReader::_exists_in_file(const VSlotRef* slot_ref) const {
    const auto& column_name = slot_ref->expr_name();
    return _table_info_node_ptr->children_column_exists(column_name) &&
           _read_table_columns_set.contains(column_name);
}

bool ParquetReader::_type_matches(const VSlotRef* slot_ref) const {
Expand Down Expand Up @@ -946,6 +953,12 @@ Status ParquetReader::_process_page_index_filter(
// complex type, not support page index yet.
return false;
}
if (!_col_offsets.contains(parquet_col_id)) {
// If the file contains partition columns and the query applies filters on those
// partition columns, then reading the page index is unnecessary.
return false;
}

auto& column_chunk = row_group.columns[parquet_col_id];
if (column_chunk.column_index_length == 0 || column_chunk.offset_index_length == 0) {
// column no page index.
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
//sequence in file, need to read
std::vector<std::string> _read_table_columns;
std::vector<std::string> _read_file_columns;

// The set of table column names to be read (same entries as `_read_table_columns`); only columns within this set will be filtered using the min-max predicate.
std::set<std::string> _read_table_columns_set;
// Deleted rows will be marked by Iceberg/Paimon. So we should filter deleted rows when reading it.
const std::vector<int64_t>* _delete_rows = nullptr;
int64_t _delete_rows_index = 0;
Expand Down
19 changes: 11 additions & 8 deletions be/test/vec/exec/format/parquet/parquet_expr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ TEST_F(ParquetExprTest, test_ne) {
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt64>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -417,7 +418,7 @@ TEST_F(ParquetExprTest, test_eq) {
auto fn_eq = MockFnCall::create("eq");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt32>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -437,7 +438,7 @@ TEST_F(ParquetExprTest, test_le) {
auto fn_eq = MockFnCall::create("le");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt32>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -457,7 +458,7 @@ TEST_F(ParquetExprTest, test_ge) {
auto fn_eq = MockFnCall::create("ge");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt32>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -477,7 +478,7 @@ TEST_F(ParquetExprTest, test_gt) {
auto fn_eq = MockFnCall::create("gt");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt32>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -497,7 +498,7 @@ TEST_F(ParquetExprTest, test_lt) {
auto fn_eq = MockFnCall::create("lt");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt32>({100}));

slot_ref->set_expr_name("int32_all_null_col");
fn_eq->add_child(slot_ref);
fn_eq->add_child(const_val);
fn_eq->_node_type = TExprNodeType::BINARY_PRED;
Expand Down Expand Up @@ -1170,6 +1171,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
auto fn_le = MockFnCall::create("le");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt64>({10000000002}));
slot_ref->set_expr_name("int64_col");
fn_le->add_child(slot_ref);
fn_le->add_child(const_val);
fn_le->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -1189,7 +1191,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
auto fn_le = MockFnCall::create("gt");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt64>({100}));

slot_ref->set_expr_name("int64_col");
fn_le->add_child(slot_ref);
fn_le->add_child(const_val);
fn_le->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -1209,7 +1211,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
auto fn_le = MockFnCall::create("ge");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeInt64>({900}));

slot_ref->set_expr_name("int64_col");
fn_le->add_child(slot_ref);
fn_le->add_child(const_val);
fn_le->_node_type = TExprNodeType::BINARY_PRED;
Expand Down Expand Up @@ -1271,7 +1273,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
auto fn_lt = MockFnCall::create("lt");
auto const_val = std::make_shared<MockLiteral>(
ColumnHelper::create_column_with_name<DataTypeString>({"name_1"}));

slot_ref->set_expr_name("string_col");
fn_lt->add_child(slot_ref);
fn_lt->add_child(const_val);
fn_lt->_node_type = TExprNodeType::BINARY_PRED;
Expand All @@ -1290,6 +1292,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_or_string) {
{
auto slot_ref = std::make_shared<MockSlotRef>(5, std::make_shared<DataTypeString>());
auto fn_is_not_null = MockFnCall::create("is_not_null_pred");
slot_ref->set_expr_name("string_col");

fn_is_not_null->add_child(slot_ref);
fn_is_not_null->_node_type = TExprNodeType::FUNCTION_CALL;
Expand Down
Loading