Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ std::string TabletReader::ReaderParams::to_string() const {
ss << " end_keys=" << key;
}

// for (auto& condition : conditions) {
// ss << " conditions=" << apache::thrift::ThriftDebugString(condition.filter);
// }

return ss.str();
}

Expand Down
51 changes: 24 additions & 27 deletions be/src/vec/exec/format/generic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ Status ExprPushDownHelper::_extract_predicates(const VExprSPtr& expr, int& cid,
return Status::OK();
}

Status ExprPushDownHelper::convert_predicates(
const VExprSPtrs& exprs, std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
std::unique_ptr<MutilColumnBlockPredicate>& root, Arena& arena) {
Status ExprPushDownHelper::convert_predicates(const VExprSPtrs& exprs,
std::unique_ptr<MutilColumnBlockPredicate>& root,
Arena& arena) {
if (exprs.empty()) {
return Status::OK();
}
Expand All @@ -79,29 +79,29 @@ Status ExprPushDownHelper::convert_predicates(
case TExprNodeType::BINARY_PRED: {
RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, values, false, parsed));
if (parsed) {
std::shared_ptr<ColumnPredicate> predicate;
if (expr->op() == TExprOpcode::EQ) {
predicates.push_back(create_comparison_predicate0<PredicateType::EQ>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::EQ>(
cid, data_type, values[0], false, arena);
} else if (expr->op() == TExprOpcode::NE) {
predicates.push_back(create_comparison_predicate0<PredicateType::NE>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::NE>(
cid, data_type, values[0], false, arena);
} else if (expr->op() == TExprOpcode::LT) {
predicates.push_back(create_comparison_predicate0<PredicateType::LT>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::LT>(
cid, data_type, values[0], false, arena);
} else if (expr->op() == TExprOpcode::LE) {
predicates.push_back(create_comparison_predicate0<PredicateType::LE>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::LE>(
cid, data_type, values[0], false, arena);
} else if (expr->op() == TExprOpcode::GT) {
predicates.push_back(create_comparison_predicate0<PredicateType::GT>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::GT>(
cid, data_type, values[0], false, arena);
} else if (expr->op() == TExprOpcode::GE) {
predicates.push_back(create_comparison_predicate0<PredicateType::GE>(
cid, data_type, values[0], false, arena));
predicate = create_comparison_predicate0<PredicateType::GE>(
cid, data_type, values[0], false, arena);
} else {
break;
}
root->add_column_predicate(
SingleColumnBlockPredicate::create_unique(predicates.back()));
root->add_column_predicate(SingleColumnBlockPredicate::create_unique(predicate));
}
break;
}
Expand Down Expand Up @@ -157,10 +157,9 @@ Status ExprPushDownHelper::convert_predicates(
set->insert(reinterpret_cast<const void*>(values[i].data));
}
}
predicates.push_back(create_in_list_predicate<PredicateType::IN_LIST>(
cid, data_type, set, false));
root->add_column_predicate(
SingleColumnBlockPredicate::create_unique(predicates.back()));
root->add_column_predicate(SingleColumnBlockPredicate::create_unique(
create_in_list_predicate<PredicateType::IN_LIST>(cid, data_type, set,
false)));
}
break;
}
Expand All @@ -174,15 +173,15 @@ Status ExprPushDownHelper::convert_predicates(
switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
for (const auto& child : expr->children()) {
RETURN_IF_ERROR(convert_predicates({child}, predicates, root, arena));
RETURN_IF_ERROR(convert_predicates({child}, root, arena));
}
break;
}
case TExprOpcode::COMPOUND_OR: {
std::unique_ptr<MutilColumnBlockPredicate> new_root =
OrBlockColumnPredicate::create_unique();
for (const auto& child : expr->children()) {
RETURN_IF_ERROR(convert_predicates({child}, predicates, new_root, arena));
RETURN_IF_ERROR(convert_predicates({child}, new_root, arena));
}
root->add_column_predicate(std::move(new_root));
break;
Expand All @@ -199,11 +198,9 @@ Status ExprPushDownHelper::convert_predicates(
if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
RETURN_IF_ERROR(_extract_predicates(expr, cid, data_type, values, true, parsed));
if (parsed) {
predicates.push_back(
root->add_column_predicate(SingleColumnBlockPredicate::create_unique(
NullPredicate::create_shared(cid, true, data_type->get_primitive_type(),
fn_name == "is_not_null_pred"));
root->add_column_predicate(
SingleColumnBlockPredicate::create_unique(predicates.back()));
fn_name == "is_not_null_pred")));
}
}
break;
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class ExprPushDownHelper {
virtual ~ExprPushDownHelper() = default;
bool check_expr_can_push_down(const VExprSPtr& expr) const;
Status convert_predicates(const VExprSPtrs& exprs,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
std::unique_ptr<MutilColumnBlockPredicate>& root, Arena& arena);

protected:
Expand Down
7 changes: 3 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,7 @@ Status ParquetReader::set_fill_columns(

if (check_expr_can_push_down(expr)) {
_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
RETURN_IF_ERROR(convert_predicates({expr}, _useless_predicates,
_push_down_predicates.back(), _arena));
RETURN_IF_ERROR(convert_predicates({expr}, _push_down_predicates.back(), _arena));
}
}

Expand Down Expand Up @@ -707,8 +706,8 @@ Status ParquetReader::_next_row_group_reader() {
// for min-max filter.
if (check_expr_can_push_down(binary_expr)) {
_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
RETURN_IF_ERROR(convert_predicates({binary_expr}, _useless_predicates,
_push_down_predicates.back(), _arena));
RETURN_IF_ERROR(convert_predicates({binary_expr}, _push_down_predicates.back(),
_arena));
}
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper {
// Since the filtering conditions for topn are dynamic, the filtering is delayed until create next row group reader.
VExprSPtrs _top_runtime_vexprs;
std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
std::vector<std::shared_ptr<ColumnPredicate>> _useless_predicates;
Arena _arena;
};
#include "common/compile_check_end.h"
Expand Down
69 changes: 36 additions & 33 deletions be/test/pipeline/operator/scan_normalize_predicate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1132,39 +1132,42 @@ TEST_F(ScanNormalizePredicate, test_double_predicate) {
output_range);
}
// test is not null
// {
// auto local_state = std::make_shared<MockScanLocalState>(state.get(), op.get());
// ColumnValueRange<TYPE_DOUBLE> range("mock", true, 0, 0);
// local_state->_slot_id_to_value_range[SlotId] = std::make_pair(&nullable_slot_desc, range);
// auto slot_ref = std::make_shared<MockSlotRef>(
// 0, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()));
// auto fn_eq = MockFnCall::create("is_not_null_pred");
//
// fn_eq->add_child(slot_ref);
// fn_eq->_node_type = TExprNodeType::FUNCTION_CALL;
// slot_ref->_slot_id = SlotId;
// EXPECT_FALSE(fn_eq->is_constant());
//
// auto ctx = VExprContext::create_shared(fn_eq);
// ctx->_prepared = true;
// ctx->_opened = true;
//
// vectorized::VExprSPtr new_root;
// auto conjunct_expr_root = ctx;
// EXPECT_TRUE(local_state->_normalize_predicate(conjunct_expr_root.get(), new_root));
// auto& output_range = local_state->_slot_id_to_value_range[SlotId];
// std::visit(
// [](auto&& arg) {
// using T = std::decay_t<decltype(arg)>;
// if constexpr (std::is_same_v<T, ColumnValueRange<TYPE_DOUBLE>>) {
// EXPECT_FALSE(arg.is_fixed_value_range());
// EXPECT_FALSE(arg.contain_null());
// } else {
// FAIL() << "unexpected type";
// }
// },
// output_range);
// }
{
auto local_state = std::make_shared<MockScanLocalState>(state.get(), op.get());
ColumnValueRange<TYPE_DOUBLE> range("mock", true, 0, 0);
local_state->_slot_id_to_value_range[SlotId] = range;
local_state->_slot_id_to_predicates[SlotId] =
std::vector<std::shared_ptr<ColumnPredicate>>();
op->_slot_id_to_slot_desc[SlotId] = &slot_desc;
auto slot_ref = std::make_shared<MockSlotRef>(
0, std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()));
auto fn_eq = MockFnCall::create("is_not_null_pred");

fn_eq->add_child(slot_ref);
fn_eq->_node_type = TExprNodeType::FUNCTION_CALL;
slot_ref->_slot_id = SlotId;
EXPECT_FALSE(fn_eq->is_constant());

auto ctx = VExprContext::create_shared(fn_eq);
ctx->_prepared = true;
ctx->_opened = true;

vectorized::VExprSPtr new_root;
auto conjunct_expr_root = ctx;
EXPECT_TRUE(local_state->_normalize_predicate(conjunct_expr_root.get(), new_root));
auto& output_range = local_state->_slot_id_to_value_range[SlotId];
std::visit(
[](auto&& arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, ColumnValueRange<TYPE_DOUBLE>>) {
EXPECT_FALSE(arg.is_fixed_value_range());
EXPECT_FALSE(arg.contain_null());
} else {
FAIL() << "unexpected type";
}
},
output_range);
}
// test less
for (auto const_v : test_values) {
// std::cout << "test less const_v=" << const_v << std::endl;
Expand Down
3 changes: 1 addition & 2 deletions be/test/vec/exec/format/parquet/parquet_expr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,7 @@ TEST_F(ParquetExprTest, test_expr_push_down_and) {
std::map<int, std::vector<std::shared_ptr<ColumnPredicate>>> push_down_simple_predicates;
push_down_simple_predicates.emplace(2, std::vector<std::shared_ptr<ColumnPredicate>> {});
p_reader->_push_down_predicates.push_back(AndBlockColumnPredicate::create_unique());
ASSERT_TRUE(p_reader->convert_predicates({and_expr}, push_down_simple_predicates[2],
p_reader->_push_down_predicates.back(),
ASSERT_TRUE(p_reader->convert_predicates({and_expr}, p_reader->_push_down_predicates.back(),
p_reader->_arena)
.ok());

Expand Down
Loading