Merged
2 changes: 1 addition & 1 deletion be/src/olap/push_handler.cpp
@@ -646,7 +646,7 @@ Status PushBrokerReader::_get_next_reader() {

init_status = parquet_reader->init_reader(
_all_col_names, &_col_name_to_block_idx, _push_down_exprs, _slot_id_to_predicates,
- _or_predicates, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
+ _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
_cur_reader = std::move(parquet_reader);
1 change: 0 additions & 1 deletion be/src/olap/push_handler.h
@@ -142,7 +142,6 @@ class PushBrokerReader {
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
vectorized::VExprContextSPtrs _push_down_exprs;
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates;
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
std::unordered_map<int, vectorized::VExprContextSPtrs> _slot_id_to_filter_conjuncts;
15 changes: 8 additions & 7 deletions be/src/pipeline/exec/file_scan_operator.cpp
@@ -88,13 +88,14 @@ bool FileScanLocalState::_should_push_down_or_predicate_recursively(

PushDownType FileScanLocalState::_should_push_down_or_predicate(
const vectorized::VExprContext* expr_ctx) const {
- auto expr = expr_ctx->root()->get_impl() ? expr_ctx->root()->get_impl() : expr_ctx->root();
- if (expr->node_type() == TExprNodeType::COMPOUND_PRED &&
- expr->op() == TExprOpcode::COMPOUND_OR) {
- if (_should_push_down_or_predicate_recursively(expr)) {
- return PushDownType::PARTIAL_ACCEPTABLE;
- }
- }
+ // TODO(gabriel): Do not push down OR predicate for the time being.
+ // auto expr = expr_ctx->root()->get_impl() ? expr_ctx->root()->get_impl() : expr_ctx->root();
+ // if (expr->node_type() == TExprNodeType::COMPOUND_PRED &&
+ // expr->op() == TExprOpcode::COMPOUND_OR) {
+ // if (_should_push_down_or_predicate_recursively(expr)) {
+ // return PushDownType::PARTIAL_ACCEPTABLE;
+ // }
+ // }
return PushDownType::UNACCEPTABLE;
}
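For context on the PushDownType values above: PARTIAL_ACCEPTABLE conventionally means the reader may use the predicate for best-effort pruning while the scan node still re-evaluates it, whereas UNACCEPTABLE leaves evaluation entirely to the scan node. The standalone C++ sketch below is illustrative only (not Doris code; the function and callback names are assumptions) and models just the two enum values visible in this diff:

    // Standalone illustration of acting on a push-down decision.
    #include <functional>
    #include <iostream>

    enum class PushDownType { PARTIAL_ACCEPTABLE, UNACCEPTABLE };

    // push_to_reader: hand the predicate to the file/storage reader for pruning.
    // evaluate_in_scanner: evaluate the predicate on the rows the reader returns.
    void handle_predicate(PushDownType pdt,
                          const std::function<void()>& push_to_reader,
                          const std::function<void()>& evaluate_in_scanner) {
        if (pdt == PushDownType::PARTIAL_ACCEPTABLE) {
            push_to_reader();   // best-effort pruning; the result may be a superset
        }
        evaluate_in_scanner();  // the scan node always re-checks the predicate
    }

    int main() {
        handle_predicate(PushDownType::UNACCEPTABLE,
                         [] { std::cout << "pushed to reader\n"; },
                         [] { std::cout << "evaluated in scanner\n"; });
        return 0;
    }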

382 changes: 152 additions & 230 deletions be/src/pipeline/exec/scan_operator.cpp

Large diffs are not rendered by default.

19 changes: 8 additions & 11 deletions be/src/pipeline/exec/scan_operator.h
@@ -250,23 +250,22 @@ class ScanLocalState : public ScanLocalStateBase {
// Normalize a conjunct and try to convert it to column predicate recursively.
Status _normalize_predicate(vectorized::VExprContext* context,
const vectorized::VExprSPtr& root,
- vectorized::VExprSPtr& output_expr,
- MutilColumnBlockPredicate* parent);
+ vectorized::VExprSPtr& output_expr);
Status _eval_const_conjuncts(vectorized::VExprContext* expr_ctx, PushDownType* pdt);

Status _normalize_bloom_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root,
SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
- PushDownType* pdt, MutilColumnBlockPredicate* parent);
+ PushDownType* pdt);
Status _normalize_topn_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root,
SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
- PushDownType* pdt, MutilColumnBlockPredicate* parent);
+ PushDownType* pdt);

Status _normalize_bitmap_filter(vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root,
SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
- PushDownType* pdt, MutilColumnBlockPredicate* parent);
+ PushDownType* pdt);

Status _normalize_function_filters(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
PushDownType* pdt);
@@ -278,25 +277,23 @@ class ScanLocalState : public ScanLocalStateBase {
Status _normalize_in_and_eq_predicate(vectorized::VExprContext* expr_ctx,
vectorized::VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
- ColumnValueRange<T>& range, PushDownType* pdt,
- MutilColumnBlockPredicate* parent);
+ ColumnValueRange<T>& range, PushDownType* pdt);
template <PrimitiveType T>
Status _normalize_not_in_and_not_eq_predicate(
vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, ColumnValueRange<T>& range,
- PushDownType* pdt, MutilColumnBlockPredicate* parent);
+ PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_noneq_binary_predicate(
vectorized::VExprContext* expr_ctx, vectorized::VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates, ColumnValueRange<T>& range,
- PushDownType* pdt, MutilColumnBlockPredicate* parent);
+ PushDownType* pdt);
template <PrimitiveType T>
Status _normalize_is_null_predicate(vectorized::VExprContext* expr_ctx,
vectorized::VExprSPtr& root, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
- ColumnValueRange<T>& range, PushDownType* pdt,
- MutilColumnBlockPredicate* parent);
+ ColumnValueRange<T>& range, PushDownType* pdt);

template <bool IsFixed, PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
Status _change_value_range(ColumnValueRange<PrimitiveType>& range, const void* value,
1 change: 0 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -93,7 +93,6 @@ class RowGroupReader : public ProfileCollector {

phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>
slot_id_to_predicates;
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>> or_predicates;
bool can_lazy_read = false;
// block->rows() returns the number of rows of the first column,
// so we should check and resize the first column
2 changes: 0 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -329,7 +329,6 @@ Status ParquetReader::init_reader(
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -384,7 +383,6 @@ Status ParquetReader::init_reader(
// build column predicates for column lazy read
_lazy_read_ctx.conjuncts = conjuncts;
_lazy_read_ctx.slot_id_to_predicates = slot_id_to_predicates;
- _lazy_read_ctx.or_predicates = or_predicates;
return Status::OK();
}

1 change: 0 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -121,7 +121,6 @@ class ParquetReader : public GenericReader {
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/table/hive_reader.cpp
@@ -216,7 +216,6 @@ Status HiveParquetReader::init_reader(
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -289,9 +288,8 @@ Status HiveParquetReader::init_reader(

return parquet_reader->init_reader(
read_table_col_names, col_name_to_block_idx, conjuncts, slot_id_to_predicates,
- or_predicates, tuple_descriptor, row_descriptor, colname_to_slot_id,
- not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr,
- true, column_ids, filter_column_ids);
+ tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
+ slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
}

ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
1 change: 0 additions & 1 deletion be/src/vec/exec/format/table/hive_reader.h
@@ -91,7 +91,6 @@ class HiveParquetReader final : public HiveReader {
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
9 changes: 4 additions & 5 deletions be/src/vec/exec/format/table/hudi_reader.cpp
@@ -36,7 +36,6 @@ Status HudiParquetReader::init_reader(
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -50,10 +49,10 @@
RETURN_IF_ERROR(gen_table_info_node_by_field_id(
_params, _range.table_format_params.hudi_params.schema_id, tuple_descriptor,
*field_desc));
- return parquet_reader->init_reader(
- read_table_col_names, col_name_to_block_idx, conjuncts, slot_id_to_predicates,
- or_predicates, tuple_descriptor, row_descriptor, colname_to_slot_id,
- not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr);
+ return parquet_reader->init_reader(read_table_col_names, col_name_to_block_idx, conjuncts,
+ slot_id_to_predicates, tuple_descriptor, row_descriptor,
+ colname_to_slot_id, not_single_slot_filter_conjuncts,
+ slot_id_to_filter_conjuncts, table_info_node_ptr);
}

#include "common/compile_check_end.h"
1 change: 0 additions & 1 deletion be/src/vec/exec/format/table/hudi_reader.h
@@ -54,7 +54,6 @@ class HudiParquetReader final : public HudiReader {
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
14 changes: 5 additions & 9 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -178,10 +178,9 @@ Status IcebergTableReader::_equality_delete_base(
}
if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>> or_predicates;
RETURN_IF_ERROR(parquet_reader->init_reader(
- equality_delete_col_names, &delete_col_name_to_block_idx, {}, tmp,
- or_predicates, nullptr, nullptr, nullptr, nullptr, nullptr,
+ equality_delete_col_names, &delete_col_name_to_block_idx, {}, tmp, nullptr,
+ nullptr, nullptr, nullptr, nullptr,
TableSchemaChangeHelper::ConstNode::get_instance(), false));
} else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names,
@@ -448,7 +447,6 @@ Status IcebergParquetReader::init_reader(
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -493,9 +491,8 @@
}
return parquet_reader->init_reader(
_all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates,
- or_predicates, tuple_descriptor, row_descriptor, colname_to_slot_id,
- not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr,
- true, column_ids, filter_column_ids);
+ tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts,
+ slot_id_to_filter_conjuncts, table_info_node_ptr, true, column_ids, filter_column_ids);
}

ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* field_desc,
@@ -566,11 +563,10 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d
READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(),
_io_ctx, _state, _meta_cache);
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>> or_predicates;
RETURN_IF_ERROR(parquet_delete_reader.init_reader(
delete_file_col_names,
const_cast<std::unordered_map<std::string, uint32_t>*>(&DELETE_COL_NAME_TO_BLOCK_IDX),
- {}, tmp, or_predicates, nullptr, nullptr, nullptr, nullptr, nullptr,
+ {}, tmp, nullptr, nullptr, nullptr, nullptr, nullptr,
TableSchemaChangeHelper::ConstNode::get_instance(), false));

std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
1 change: 0 additions & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
@@ -176,7 +176,6 @@ class IcebergParquetReader final : public IcebergTableReader {
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
9 changes: 4 additions & 5 deletions be/src/vec/exec/format/table/paimon_reader.h
@@ -107,7 +107,6 @@ class PaimonParquetReader final : public PaimonReader {
const VExprContextSPtrs& conjuncts,
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
slot_id_to_predicates,
- std::vector<std::shared_ptr<MutilColumnBlockPredicate>>& or_predicates,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
@@ -122,10 +121,10 @@
_params, _range.table_format_params.paimon_params.schema_id, tuple_descriptor,
*field_desc));

- return parquet_reader->init_reader(
- read_table_col_names, col_name_to_block_idx, conjuncts, slot_id_to_predicates,
- or_predicates, tuple_descriptor, row_descriptor, colname_to_slot_id,
- not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, table_info_node_ptr);
+ return parquet_reader->init_reader(read_table_col_names, col_name_to_block_idx, conjuncts,
+ slot_id_to_predicates, tuple_descriptor, row_descriptor,
+ colname_to_slot_id, not_single_slot_filter_conjuncts,
+ slot_id_to_filter_conjuncts, table_info_node_ptr);
}
};
#include "common/compile_check_end.h"