Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3846 RD support OR split during pushdown #11439

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions ydb/library/yql/providers/common/pushdown/physical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@ using namespace NNodes;

namespace {

TPredicateNode SplitForPartialPushdown(
const NPushdown::TPredicateNode& predicateTree,
TExprContext& ctx,
TPositionHandle pos) {
TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, TExprContext& ctx, TPositionHandle pos, const TSettings& settings) {
if (predicateTree.CanBePushed) {
return predicateTree;
}

if (predicateTree.Op != NPushdown::EBoolOp::And) {
return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
if (predicateTree.Op != NPushdown::EBoolOp::And && (!settings.IsEnabled(TSettings::EFeatureFlag::SplitOrOperator) || predicateTree.Op != NPushdown::EBoolOp::Or)) {
// Predicate can't be split, so return an invalid value and skip this branch
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
return NPushdown::TPredicateNode();
}

std::vector<NPushdown::TPredicateNode> pushable;
for (auto& predicate : predicateTree.Children) {
if (predicate.CanBePushed) {
pushable.emplace_back(predicate);
NPushdown::TPredicateNode pushablePredicate = SplitForPartialPushdown(predicate, ctx, pos, settings);
if (pushablePredicate.IsValid()) {
pushable.emplace_back(pushablePredicate);
} else if (predicateTree.Op == NPushdown::EBoolOp::Or) {
// One of the OR branches was invalid, so the whole predicate is invalid
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
return NPushdown::TPredicateNode();
}
}

NPushdown::TPredicateNode predicateToPush;
predicateToPush.SetPredicates(pushable, ctx, pos);
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
return predicateToPush;
}

Expand All @@ -51,7 +54,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), settings);
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");

NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos);
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
if (!predicateToPush.IsValid()) {
return {};
}
Expand Down
44 changes: 28 additions & 16 deletions ydb/library/yql/providers/common/pushdown/predicate_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,38 @@ bool TPredicateNode::IsValid() const {
return res && ExprNode.IsValid();
}

void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
auto predicatesSize = predicates.size();
if (predicatesSize == 0) {
return;
} else if (predicatesSize == 1) {
}
if (predicatesSize == 1) {
*this = predicates[0];
} else {
Op = EBoolOp::And;
Children = predicates;
CanBePushed = true;

TVector<NNodes::TExprBase> exprNodes;
exprNodes.reserve(predicatesSize);
for (auto& pred : predicates) {
exprNodes.emplace_back(pred.ExprNode.Cast());
CanBePushed &= pred.CanBePushed;
}
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
.Add(exprNodes)
.Done();
return;
}

Op = op;
Children = predicates;
CanBePushed = true;

TVector<NNodes::TExprBase> exprNodes;
exprNodes.reserve(predicatesSize);
for (auto& pred : predicates) {
exprNodes.emplace_back(pred.ExprNode.Cast());
CanBePushed &= pred.CanBePushed;
}

switch (op) {
case EBoolOp::And:
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos).Add(exprNodes).Done();
break;

case EBoolOp::Or:
ExprNode = NNodes::Build<NNodes::TCoOr>(ctx, pos).Add(exprNodes).Done();
break;

default:
throw yexception() << "Unsupported operator for predicate node creation: " << static_cast<int>(op);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct TPredicateNode {
~TPredicateNode();

bool IsValid() const;
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos);
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);

NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;
std::vector<TPredicateNode> Children;
Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/providers/common/pushdown/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace NYql::NPushdown {

struct TSettings {
enum EFeatureFlag : ui64 {
// Supported operators
LikeOperator = 1,
LikeOperatorOnlyForUtf8 = 1 << 1,
JsonQueryOperators = 1 << 2,
Expand All @@ -27,7 +28,10 @@ struct TSettings {
TimestampCtor = 1 << 17,
JustPassthroughOperators = 1 << 18, // if + coalesce + just
InOperator = 1 << 19, // IN()
IsDistinctOperator = 1 << 20 // IS NOT DISTINCT FROM / IS DISTINCT FROM
IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM

// Expression splitting features
SplitOrOperator = 1ll << 32 // (...) OR (...) OR (...)
GrigoriyPA marked this conversation as resolved.
Show resolved Hide resolved
};

explicit TSettings(NLog::EComponent logComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ namespace NYql {
}

#undef MATCH_ATOM
#undef MATCH_ARITHMETICAL

#define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \
if (!opMatched && compare.Maybe<TExprNodeType>()) { \
Expand All @@ -118,7 +119,7 @@ namespace NYql {
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID);

if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
err << "unknown operation: " << compare.Raw()->Content();
err << "unknown compare operation: " << compare.Raw()->Content();
return false;
}
return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err);
Expand Down Expand Up @@ -181,7 +182,7 @@ namespace NYql {
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
collection = maybeAsList.Cast().Ptr();
} else {
err << "unknown operation: " << expr.Ref().Content();
err << "unknown source for in: " << expr.Ref().Content();
return false;
}

Expand All @@ -195,7 +196,7 @@ namespace NYql {

bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) {
if (predicate.Ref().ChildrenSize() != 2) {
err << "unknown predicate, expected 2, children size " << predicate.Ref().ChildrenSize();
err << "invalid IsNotDistinctFrom predicate, expected 2 children but got " << predicate.Ref().ChildrenSize();
return false;
}
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
Expand Down Expand Up @@ -356,7 +357,7 @@ namespace NYql {

auto left = FormatExpression(expression.left_value());
auto right = FormatExpression(expression.right_value());
return left + operation + right;
return TStringBuilder() << "(" << left << operation << right << ")";
}

TString FormatNegation(const TPredicate_TNegation& negation) {
Expand Down Expand Up @@ -525,14 +526,22 @@ namespace NYql {

TString FormatIn(const TPredicate_TIn& in) {
auto value = FormatExpression(in.value());
TString list;
TStringStream list;
for (const auto& expr : in.set()) {
if (!list.empty()) {
list += ",";
list << ", ";
} else {
list << value << " IN (";
}
list += FormatExpression(expr);
list << FormatExpression(expr);
}
return value + " IN (" + list + ")";

if (list.empty()) {
throw yexception() << "failed to format IN statement, no operands";
}

list << ")";
return list.Str();
}

TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {
Expand Down
10 changes: 9 additions & 1 deletion ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ namespace {
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
{
using EFlag = NPushdown::TSettings::EFeatureFlag;
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators);
Enable(
// Operator features
EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 |
EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator |
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators |

// Split features
EFlag::SplitOrOperator
);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
}
}
NPushdown::TPredicateNode predicateToPush;
predicateToPush.SetPredicates(pushable, ctx, pos);
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
return predicateToPush;
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def test_filters_non_optional_field(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filters_non_optional_field")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down Expand Up @@ -335,7 +335,7 @@ def test_filters_optional_field(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filters_optional_field")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand All @@ -349,12 +349,16 @@ def test_filters_optional_field(self, kikimr, client):
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
filter = 'flag'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`')
filter = 'time * (field2 - field1) != 0'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0')
filter = ' event IS NOT DISTINCT FROM "event2"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
filter = ' event IS DISTINCT FROM "event1"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
filter = ' field1 IS DISTINCT FROM field2'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`')
filter = 'time == 102 OR (field2 IS NOT DISTINCT FROM 1005 AND Random(field1) < 10.0)'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` = 102 OR `field2` IS NOT DISTINCT FROM 1005)')
filter = 'event IN ("event2")'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
Expand Down
Loading