diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 9361b7c63e3bc71..5e4097bbfec4722 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -180,6 +180,7 @@ import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -1392,7 +1393,7 @@ public PlanFragment visitPhysicalNestedLoopJoin( .getRuntimeFilterOfHashJoinNode(nestedLoopJoin); filters.forEach(filter -> runtimeFilterTranslator .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context)); - if (!filters.isEmpty()) { + if (filters.stream().anyMatch(filter -> filter.getType() == TRuntimeFilterType.BITMAP)) { nestedLoopJoinNode.setOutputLeftSideOnly(true); } }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 982c8677aa7b91c..8ffd307e328a515 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -152,8 +152,8 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P if (!hasInvalidTarget) { org.apache.doris.planner.RuntimeFilter origFilter = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter( - filter.getId(), node, src, filter.getExprOrder(), targetExprList, - targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv()); + filter, node, src, targetExprList, + targetTupleIdMapList, context.getLimits()); if (node instanceof HashJoinNode) { origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 2683754d77e69ca..b99b8904e5eb343 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -23,9 +23,14 @@ import org.apache.doris.nereids.stats.ExpressionEstimation; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.CTEId; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; @@ -53,6 +58,7 @@ import org.apache.doris.nereids.util.JoinUtils; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.thrift.TMinMaxRuntimeFilterType; import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Preconditions; @@ -113,6 +119,10 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin + context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)); + } collectPushDownCTEInfos(join, context); if (!getPushDownCTECandidates(ctx).isEmpty()) { pushDownRuntimeFilterIntoCTE(ctx); @@ -142,29 +152,19 @@ public PhysicalCTEProducer visitPhysicalCTEProducer(PhysicalCTEProducer producer return producer; } - @Override - public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin join, - CascadesContext context) { - // TODO: we need to support all type join - join.right().accept(this, context); - join.left().accept(this, context); + private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin join, + RuntimeFilterContext ctx) { if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) { - return join; + return; } - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); Map> aliasTransferMap = ctx.getAliasTransferMap(); - - if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) == 0) { - //only generate BITMAP filter for nested loop join - return join; - } List leftSlots = join.left().getOutput(); List rightSlots = join.right().getOutput(); List bitmapRuntimeFilterConditions = JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots, rightSlots, join.getOtherJoinConjuncts()); if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots, join.getOtherJoinConjuncts()) .first.isEmpty()) { - return join; + return; } int bitmapRFCount = bitmapRuntimeFilterConditions.size(); for (int i = 0; i < bitmapRFCount; i++) { @@ -193,6 +193,104 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin B.x, + * otherwise return null + */ + private ComparisonPredicate normalizeNonEqual(AbstractPhysicalJoin join, + Expression expr) { + if (!(expr instanceof ComparisonPredicate)) { + return null; + } + if (!(expr.child(0) instanceof SlotReference)) { + return null; + } + if (!(expr.child(1) instanceof SlotReference)) { + return null; + } + if (! join.left().getOutput().contains(expr.child(0)) + || ! join.right().getOutput().contains(expr.child(1))) { + if (join.left().getOutput().contains(expr.child(1)) + && join.right().getOutput().contains(expr.child(0))) { + return ((ComparisonPredicate) expr).commute(); + } + } else { + return (ComparisonPredicate) expr; + } + return null; + } + + private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate compare) { + if (compare instanceof LessThan || compare instanceof LessThanEqual) { + return TMinMaxRuntimeFilterType.MAX; + } + if (compare instanceof GreaterThan || compare instanceof GreaterThanEqual) { + return TMinMaxRuntimeFilterType.MIN; + } + return TMinMaxRuntimeFilterType.MIN_MAX; + } + + /** + * A join B on A.x < B.y + * min-max filter (A.x < N, N=max(B.y)) could be applied to A.x + */ + private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin join, + RuntimeFilterContext ctx) { + Map> aliasTransferMap = ctx.getAliasTransferMap(); + int hashCondionSize = join.getHashJoinConjuncts().size(); + for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) { + int exprOrder = idx + hashCondionSize; + Expression expr = join.getOtherJoinConjuncts().get(exprOrder); + ComparisonPredicate compare = normalizeNonEqual(join, expr); + if (compare != null) { + Slot unwrappedSlot = checkTargetChild(compare.child(0)); + if (unwrappedSlot == null) { + continue; + } + Pair pair = aliasTransferMap.get(unwrappedSlot); + if (pair == null) { + continue; + } + Slot olapScanSlot = pair.second; + PhysicalRelation scan = pair.first; + Preconditions.checkState(olapScanSlot != null && scan != null); + long buildSideNdv = getBuildSideNdv(join, compare); + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot), + TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv, + getMinMaxType(compare)); + ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); + ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot); + } + } + } + + @Override + public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin join, + CascadesContext context) { + // TODO: we need to support all type join + join.right().accept(this, context); + join.left().accept(this, context); + + if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) { + join.right().getOutput().forEach(slot -> + context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)); + return join; + } + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + + if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) != 0) { + generateBitMapRuntimeFilterForNLJ(join, ctx); + } + + if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.MIN_MAX.getValue()) != 0) { + generateMinMaxRuntimeFilter(join, ctx); + } + return join; } @@ -233,14 +331,16 @@ public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, Cascade return relation; } - private long getBuildSideNdv(PhysicalHashJoin join, EqualTo equalTo) { + // runtime filter build side ndv + private long getBuildSideNdv(AbstractPhysicalJoin join, + ComparisonPredicate compare) { AbstractPlan right = (AbstractPlan) join.right(); //make ut test friendly if (right.getStats() == null) { return -1L; } ExpressionEstimation estimator = new ExpressionEstimation(); - ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats()); + ColumnStatistic buildColStats = compare.right().accept(estimator, right.getStats()); return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java index eb9ed7cfc975d19..ad7e8ba8cc81e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Join; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.JoinUtils; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; @@ -41,6 +42,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; /** * Abstract class for all physical join node. @@ -214,4 +216,30 @@ public List computeOutput() { ? ImmutableList.of(markJoinSlotReference.get()) : ImmutableList.of()) .build(); } + + @Override + public String toString() { + List args = Lists.newArrayList("type", joinType, + "hashCondition", hashJoinConjuncts, + "otherCondition", otherJoinConjuncts, + "stats", statistics); + if (markJoinSlotReference.isPresent()) { + args.add("isMarkJoin"); + args.add("true"); + } + if (markJoinSlotReference.isPresent()) { + args.add("MarkJoinSlotReference"); + args.add(markJoinSlotReference.get()); + } + if (hint != JoinHint.NONE) { + args.add("hint"); + args.add(hint); + } + if (!runtimeFilters.isEmpty()) { + args.add("runtimeFilters"); + args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList())); + } + return Utils.toSqlString(this.getClass().getName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(), + args.toArray()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index 6d9583504eae1ce..b60afd673089ca5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -31,14 +31,12 @@ import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.MutableState; -import org.apache.doris.nereids.util.Utils; import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.Statistics; @@ -140,33 +138,6 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitPhysicalHashJoin(this, context); } - @Override - public String toString() { - List args = Lists.newArrayList("type", joinType, - "hashJoinCondition", hashJoinConjuncts, - "otherJoinCondition", otherJoinConjuncts, - "stats", statistics, - "fr", getMutableState(AbstractPlan.FRAGMENT_ID)); - if (markJoinSlotReference.isPresent()) { - args.add("isMarkJoin"); - args.add("true"); - } - if (markJoinSlotReference.isPresent()) { - args.add("MarkJoinSlotReference"); - args.add(markJoinSlotReference.get()); - } - if (hint != JoinHint.NONE) { - args.add("hint"); - args.add(hint); - } - if (!runtimeFilters.isEmpty()) { - args.add("runtimeFilters"); - args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList())); - } - return Utils.toSqlString("PhysicalHashJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(), - args.toArray()); - } - @Override public PhysicalHashJoin withChildren(List children) { Preconditions.checkArgument(children.size() == 2); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java index d6b08d02c5491ac..d2dca0d2e73e197 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java @@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.MutableState; -import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; @@ -113,17 +112,17 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitPhysicalNestedLoopJoin(this, context); } - @Override - public String toString() { - // TODO: Maybe we could pull up this to the abstract class in the future. - return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(), - "type", joinType, - "otherJoinCondition", otherJoinConjuncts, - "isMarkJoin", markJoinSlotReference.isPresent(), - "markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty", - "stats", statistics - ); - } + // @Override + // public String toString() { + // // TODO: Maybe we could pull up this to the abstract class in the future. + // return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(), + // "type", joinType, + // "otherJoinCondition", otherJoinConjuncts, + // "isMarkJoin", markJoinSlotReference.isPresent(), + // "markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty", + // "stats", statistics + // ); + // } @Override public PhysicalNestedLoopJoin withChildren(List children) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index 74a100dc2372d39..f213a8996ce91fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.planner.RuntimeFilterId; +import org.apache.doris.thrift.TMinMaxRuntimeFilterType; import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; @@ -45,21 +46,31 @@ public class RuntimeFilter { private final boolean bitmapFilterNotIn; private final long buildSideNdv; + // use for min-max filter only. specify if the min or max side is valid + private final TMinMaxRuntimeFilterType tMinMaxType; /** * constructor */ public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) { - this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, builderNode, false, buildSideNdv); + this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, + builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); + } + + public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, List targetExpressions, + TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, + boolean bitmapFilterNotIn, long buildSideNdv) { + this(id, src, targets, targetExpressions, type, exprOrder, + builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); } /** * constructor */ public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, List targetExpressions, - TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn, - long buildSideNdv) { + TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, + boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType) { this.id = id; this.srcSlot = src; this.targetSlots = Lists.newArrayList(targets); @@ -69,9 +80,14 @@ public RuntimeFilter(RuntimeFilterId id, Expression src, List targets, Lis this.builderNode = builderNode; this.bitmapFilterNotIn = bitmapFilterNotIn; this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv; + this.tMinMaxType = tMinMaxType; builderNode.addRuntimeFilter(this); } + public TMinMaxRuntimeFilterType gettMinMaxType() { + return tMinMaxType; + } + public Expression getSrcExpr() { return srcSlot; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 8f5ce78304bb45b..4d4a2c641a98c31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -150,24 +150,7 @@ protected String getRuntimeFilterExplainString(boolean isBuildNode, boolean isBr } List filtersStr = new ArrayList<>(); for (RuntimeFilter filter : runtimeFilters) { - StringBuilder filterStr = new StringBuilder(); - filterStr.append(filter.getFilterId()); - if (!isBrief) { - filterStr.append("["); - filterStr.append(filter.getType().toString().toLowerCase()); - filterStr.append("]"); - if (isBuildNode) { - filterStr.append(" <- "); - filterStr.append(filter.getSrcExpr().toSql()); - filterStr.append("(").append(filter.getEstimateNdv()).append("/") - .append(filter.getExpectFilterSizeBytes()).append("/") - .append(filter.getFilterSizeBytes()).append(")"); - } else { - filterStr.append(" -> "); - filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql()); - } - } - filtersStr.add(filterStr.toString()); + filtersStr.add(filter.getExplainString(isBuildNode, isBrief, getExchNodeId())); } return Joiner.on(", ").join(filtersStr) + "\n"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 40cfe876b89a3dd..754b9fcfcb25d57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1116,24 +1116,7 @@ protected String getRuntimeFilterExplainString(boolean isBuildNode, boolean isBr } List filtersStr = new ArrayList<>(); for (RuntimeFilter filter : runtimeFilters) { - StringBuilder filterStr = new StringBuilder(); - filterStr.append(filter.getFilterId()); - if (!isBrief) { - filterStr.append("["); - filterStr.append(filter.getType().toString().toLowerCase()); - filterStr.append("]"); - if (isBuildNode) { - filterStr.append(" <- "); - filterStr.append(filter.getSrcExpr().toSql()); - filterStr.append("(").append(filter.getEstimateNdv()).append("/") - .append(filter.getExpectFilterSizeBytes()).append("/") - .append(filter.getFilterSizeBytes()).append(")"); - } else { - filterStr.append(" -> "); - filterStr.append(filter.getTargetExpr(getId()).toSql()); - } - } - filtersStr.add(filterStr.toString()); + filtersStr.add(filter.getExplainString(isBuildNode, isBrief, getId())); } return Joiner.on(", ").join(filtersStr) + "\n"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index c6781e7ed4cd682..d21f390c04dab19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -35,6 +35,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TMinMaxRuntimeFilterType; import org.apache.doris.thrift.TRuntimeFilterDesc; import org.apache.doris.thrift.TRuntimeFilterType; @@ -109,6 +110,8 @@ public final class RuntimeFilter { private boolean useRemoteRfOpt = true; + private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType; + /** * Internal representation of a runtime filter target. */ @@ -142,8 +145,10 @@ public String toString() { } private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder, - List origTargetExprs, List>> targetSlots, TRuntimeFilterType type, - RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) { + List origTargetExprs, List>> targetSlots, + TRuntimeFilterType type, + RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv, + TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType) { this.id = filterId; this.builderNode = filterSrcNode; this.srcExpr = srcExpr; @@ -152,16 +157,27 @@ private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr src this.targetSlotsByTid = ImmutableList.copyOf(targetSlots); this.runtimeFilterType = type; this.ndvEstimate = buildSizeNdv; + this.tMinMaxRuntimeFilterType = tMinMaxRuntimeFilterType; computeNdvEstimate(); calculateFilterSize(filterSizeLimits); } + private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, Expr srcExpr, int exprOrder, + List origTargetExprs, List>> targetSlots, TRuntimeFilterType type, + RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) { + this(filterId, filterSrcNode, srcExpr, exprOrder, origTargetExprs, + targetSlots, type, filterSizeLimits, buildSizeNdv, TMinMaxRuntimeFilterType.MIN_MAX); + } + // only for nereids planner - public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr, - int exprOrder, List origTargetExprs, List>> targetSlots, - TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long buildSizeNdv) { - return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs, - targetSlots, type, filterSizeLimits, buildSizeNdv); + public static RuntimeFilter fromNereidsRuntimeFilter( + org.apache.doris.nereids.trees.plans.physical.RuntimeFilter nereidsFilter, + JoinNodeBase node, Expr srcExpr, List origTargetExprs, + List>> targetSlots, + RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) { + return new RuntimeFilter(nereidsFilter.getId(), node, srcExpr, nereidsFilter.getExprOrder(), origTargetExprs, + targetSlots, nereidsFilter.getType(), filterSizeLimits, nereidsFilter.getBuildSideNdv(), + nereidsFilter.gettMinMaxType()); } @Override @@ -224,6 +240,9 @@ public TRuntimeFilterDesc toThrift() { tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift()); tFilter.setBitmapFilterNotIn(bitmapFilterNotIn); } + if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) { + tFilter.setMinMaxType(tMinMaxRuntimeFilterType); + } tFilter.setOptRemoteRf(optRemoteRf); return tFilter; } @@ -256,6 +275,18 @@ public TRuntimeFilterType getType() { return runtimeFilterType; } + public String getTypeDesc() { + String desc = runtimeFilterType.toString().toLowerCase(); + if (runtimeFilterType == TRuntimeFilterType.MIN_MAX) { + if (tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MIN) { + desc = "min"; + } else if (tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MAX) { + desc = "max"; + } + } + return desc; + } + public void setType(TRuntimeFilterType type) { runtimeFilterType = type; } @@ -685,4 +716,25 @@ public String debugString() { public long getExpectFilterSizeBytes() { return expectFilterSizeBytes; } + + public String getExplainString(boolean isBuildNode, boolean isBrief, PlanNodeId targetNodeId) { + StringBuilder filterStr = new StringBuilder(); + filterStr.append(getFilterId()); + if (!isBrief) { + filterStr.append("["); + filterStr.append(getTypeDesc()); + filterStr.append("]"); + if (isBuildNode) { + filterStr.append(" <- "); + filterStr.append(getSrcExpr().toSql()); + filterStr.append("(").append(getEstimateNdv()).append("/") + .append(getExpectFilterSizeBytes()).append("/") + .append(getFilterSizeBytes()).append(")"); + } else { + filterStr.append(" -> "); + filterStr.append(getTargetExpr(targetNodeId).toSql()); + } + } + return filterStr.toString(); + } } diff --git a/regression-test/suites/correctness_p0/test_runtime_filter.groovy b/regression-test/suites/correctness_p0/test_runtime_filter.groovy new file mode 100644 index 000000000000000..f691bd160606c61 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_runtime_filter.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate +// and modified by Doris. + +suite("test_runtime_filter") { + + sql """ DROP TABLE IF EXISTS rf_tblA """ + sql """ + CREATE TABLE IF NOT EXISTS rf_tblA ( + a int + ) + DUPLICATE KEY(a) + DISTRIBUTED BY HASH(a) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql """ DROP TABLE IF EXISTS rf_tblB """ + sql """ + CREATE TABLE IF NOT EXISTS rf_tblB ( + b int + ) + DUPLICATE KEY(b) + DISTRIBUTED BY HASH(b) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + sql """ + CREATE TABLE IF NOT EXISTS rf_tblC ( + c int + ) + DUPLICATE KEY(c) + DISTRIBUTED BY HASH(c) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + sql "set enable_pipeline_engine=true;" + sql "set runtime_filter_type=4" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set disable_join_reorder=true" + + explain{ + sql ("""select * from rf_tblA join rf_tblB on a < b""") + contains "runtime filters: RF000[max] -> a" + contains "runtime filters: RF000[max] <- b" + } + + explain{ + sql ("""select * from rf_tblA join rf_tblB on a > b""") + contains "runtime filters: RF000[min] -> a" + contains "runtime filters: RF000[min] <- b" + } + + explain{ + sql ("""select * from rf_tblA join rf_tblB on b < a""") + contains "runtime filters: RF000[min] -> a" + contains "runtime filters: RF000[min] <- b" + } + + explain{ + sql ("""select * from rf_tblA right outer join rf_tblB on a < b""") + contains "runtime filters: RF000[max] <- b" + contains "runtime filters: RF000[max] -> a" + } + + explain{ + sql ("""select * from rf_tblA left join rf_tblB on a < b; """) + notContains "runtime filters" + } + + explain{ + sql ("""select * from rf_tblA full outer join rf_tblB on a = b; """) + notContains "runtime filters" + } + + explain{ + sql (""" + with x as (select * from rf_tblA join rf_tblB on a=b) + select * from x join rf_tblC on x.b <= rf_tblC.c + union + select * from x join rf_tblC on x.b <= rf_tblC.c + """) + contains "runtime filters: RF001[max] -> b" + contains "runtime filters: RF002[max] -> b" + contains "runtime filters: RF001[max] <- c" + contains "runtime filters: RF002[max] <- c" + + } +}