Skip to content

Commit

Permalink
[opt](nereids) support partitionTopn for multi window exprs (#39687)
Browse files Browse the repository at this point in the history
## Proposed changes

pick from #38393

Co-authored-by: xiongzhongjian <xiongzhongjian@selectdb.com>
  • Loading branch information
xzj7019 and xiongzhongjian authored Aug 22, 2024
1 parent 021982f commit 8f580b5
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,16 @@

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/**
* Push down the 'partitionTopN' into the 'window'.
* It will convert the filter condition to the 'limit value' and push down below the 'window'.
Expand Down Expand Up @@ -89,82 +73,17 @@ public Rule build() {
return filter;
}

List<NamedExpression> windowExprs = window.getWindowExpressions();
if (windowExprs.size() != 1) {
return filter;
}
NamedExpression windowExpr = windowExprs.get(0);
if (windowExpr.children().size() != 1 || !(windowExpr.child(0) instanceof WindowExpression)) {
return filter;
}

// Check the filter conditions. Currently, only simple conditions of the form
// 'column < constant', 'column <= constant' or 'column = constant' are supported.
// We extract the related conjuncts and validate them.
Set<Expression> conjuncts = filter.getConjuncts();
Set<Expression> relatedConjuncts = extractRelatedConjuncts(conjuncts, windowExpr.getExprId());

boolean hasPartitionLimit = false;
long partitionLimit = Long.MAX_VALUE;

for (Expression conjunct : relatedConjuncts) {
Preconditions.checkArgument(conjunct instanceof BinaryOperator);
BinaryOperator op = (BinaryOperator) conjunct;
Expression leftChild = op.children().get(0);
Expression rightChild = op.children().get(1);

Preconditions.checkArgument(leftChild instanceof SlotReference
&& rightChild instanceof IntegerLikeLiteral);

long limitVal = ((IntegerLikeLiteral) rightChild).getLongValue();
// Adjust the value for 'limitVal' based on the comparison operators.
if (conjunct instanceof LessThan) {
limitVal--;
}
if (limitVal < 0) {
return new LogicalEmptyRelation(ctx.statementContext.getNextRelationId(), filter.getOutput());
}
if (hasPartitionLimit) {
partitionLimit = Math.min(partitionLimit, limitVal);
} else {
partitionLimit = limitVal;
hasPartitionLimit = true;
}
}

if (!hasPartitionLimit) {
return filter;
}

Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, false);
if (!newWindow.isPresent()) {
Pair<WindowExpression, Long> windowFuncPair = window.getPushDownWindowFuncAndLimit(filter, Long.MAX_VALUE);
if (windowFuncPair == null) {
return filter;
} else if (windowFuncPair.second == -1) {
// a limit of -1 indicates the empty relation case
return new LogicalEmptyRelation(ctx.statementContext.getNextRelationId(), filter.getOutput());
} else {
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncPair.first,
windowFuncPair.second, false);
return filter.withChildren(newWindow);
}
return filter.withChildren(newWindow.get());
}).toRule(RuleType.CREATE_PARTITION_TOPN_FOR_WINDOW);
}

/**
 * Selects the conjuncts that constrain the slot identified by {@code slotRefID}
 * with a simple integer comparison.
 *
 * <p>A conjunct is kept only when all of the following hold:
 * <ul>
 *   <li>it is a {@link LessThan}, {@link LessThanEqual} or {@link EqualTo} binary operator;</li>
 *   <li>its left child is a {@link SlotReference} whose expression id equals {@code slotRefID};</li>
 *   <li>its right child is an {@link IntegerLikeLiteral}.</li>
 * </ul>
 *
 * @param conjuncts the filter conjuncts to inspect
 * @param slotRefID the expression id of the slot the conjuncts must refer to
 * @return an immutable set containing the matching conjuncts; empty when none match
 */
private Set<Expression> extractRelatedConjuncts(Set<Expression> conjuncts, ExprId slotRefID) {
    Predicate<Expression> condition = conjunct -> {
        if (!(conjunct instanceof BinaryOperator)) {
            return false;
        }
        // Only '<', '<=' and '=' can be translated into a limit value.
        if (!(conjunct instanceof LessThan || conjunct instanceof LessThanEqual
                || conjunct instanceof EqualTo)) {
            return false;
        }

        BinaryOperator op = (BinaryOperator) conjunct;
        Expression leftChild = op.children().get(0);
        Expression rightChild = op.children().get(1);

        // TODO: Now, we only support the column on the left side.
        if (!(leftChild instanceof SlotReference) || !(rightChild instanceof IntegerLikeLiteral)) {
            return false;
        }
        // Compare ids by value: '==' would only match the very same ExprId instance
        // and silently skip a slot whose id is equal but is a distinct object.
        return ((SlotReference) leftChild).getExprId().equals(slotRefID);
    };

    return conjuncts.stream()
            .filter(condition)
            .collect(ImmutableSet.toImmutableSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
Expand All @@ -31,7 +33,6 @@
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

/**
* Rules to push {@link org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down.
Expand Down Expand Up @@ -72,11 +73,17 @@ public List<Rule> buildRules() {
.then(limit -> {
LogicalWindow<Plan> window = limit.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
if (partitionLimit <= 0) {
return limit;
}
return limit.withChildren(newWindow.get());
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return limit;
}
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return limit.withChildren(newWindow);
}).toRule(RuleType.PUSH_LIMIT_THROUGH_WINDOW),

// limit -> project -> window
Expand All @@ -85,11 +92,17 @@ public List<Rule> buildRules() {
LogicalProject<LogicalWindow<Plan>> project = limit.child();
LogicalWindow<Plan> window = project.child();
long partitionLimit = limit.getLimit() + limit.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
if (partitionLimit <= 0) {
return limit;
}
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return limit;
}
return limit.withChildren(project.withChildren(newWindow.get()));
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return limit.withChildren(project.withChildren(newWindow));
}).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_WINDOW),

// limit -> union
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
Expand All @@ -33,7 +34,6 @@
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

/**
* PushdownTopNThroughWindow push down the TopN through the Window and generate the PartitionTopN.
Expand All @@ -54,11 +54,14 @@ public List<Rule> buildRules() {
return topn;
}
long partitionLimit = topn.getLimit() + topn.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return topn;
}
return topn.withChildren(newWindow.get());
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return topn.withChildren(newWindow);
}).toRule(RuleType.PUSH_DOWN_TOP_N_THROUGH_WINDOW),

// topn -> projection -> window
Expand All @@ -74,11 +77,14 @@ public List<Rule> buildRules() {
return topn;
}
long partitionLimit = topn.getLimit() + topn.getOffset();
Optional<Plan> newWindow = window.pushPartitionLimitThroughWindow(partitionLimit, true);
if (!newWindow.isPresent()) {
Pair<WindowExpression, Long> windowFuncLongPair = window
.getPushDownWindowFuncAndLimit(null, partitionLimit);
if (windowFuncLongPair == null) {
return topn;
}
return topn.withChildren(project.withChildren(newWindow.get()));
Plan newWindow = window.pushPartitionLimitThroughWindow(windowFuncLongPair.first,
windowFuncLongPair.second, true);
return topn.withChildren(project.withChildren(newWindow));
}).toRule(RuleType.PUSH_DOWN_TOP_N_THROUGH_PROJECT_WINDOW)
);
}
Expand Down
Loading

0 comments on commit 8f580b5

Please sign in to comment.