Skip to content

Commit

Permalink
[runtimefilter](nerieds)support Non equal runtime filter for nested l…
Browse files Browse the repository at this point in the history
…oop join #25193
  • Loading branch information
englefly authored and pull[bot] committed Dec 1, 2023
1 parent 569b3b9 commit 1433116
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TRuntimeFilterType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -1392,7 +1393,7 @@ public PlanFragment visitPhysicalNestedLoopJoin(
.getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
filters.forEach(filter -> runtimeFilterTranslator
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context));
if (!filters.isEmpty()) {
if (filters.stream().anyMatch(filter -> filter.getType() == TRuntimeFilterType.BITMAP)) {
nestedLoopJoinNode.setOutputLeftSideOnly(true);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P
if (!hasInvalidTarget) {
org.apache.doris.planner.RuntimeFilter origFilter
= org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
filter.getId(), node, src, filter.getExprOrder(), targetExprList,
targetTupleIdMapList, filter.getType(), context.getLimits(), filter.getBuildSideNdv());
filter, node, src, targetExprList,
targetTupleIdMapList, context.getLimits());
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import org.apache.doris.nereids.stats.ExpressionEstimation;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThan;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
Expand Down Expand Up @@ -53,6 +58,7 @@
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
import org.apache.doris.thrift.TRuntimeFilterType;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -113,6 +119,10 @@ public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? ext
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
join.right().accept(this, context);
join.left().accept(this, context);
if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
join.right().getOutput().forEach(slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
}
collectPushDownCTEInfos(join, context);
if (!getPushDownCTECandidates(ctx).isEmpty()) {
pushDownRuntimeFilterIntoCTE(ctx);
Expand Down Expand Up @@ -142,29 +152,19 @@ public PhysicalCTEProducer visitPhysicalCTEProducer(PhysicalCTEProducer producer
return producer;
}

@Override
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
// TODO: we need to support all type join
join.right().accept(this, context);
join.left().accept(this, context);
private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx) {
if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) {
return join;
return;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();

if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) == 0) {
//only generate BITMAP filter for nested loop join
return join;
}
List<Slot> leftSlots = join.left().getOutput();
List<Slot> rightSlots = join.right().getOutput();
List<Expression> bitmapRuntimeFilterConditions = JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
rightSlots, join.getOtherJoinConjuncts());
if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots, join.getOtherJoinConjuncts())
.first.isEmpty()) {
return join;
return;
}
int bitmapRFCount = bitmapRuntimeFilterConditions.size();
for (int i = 0; i < bitmapRFCount; i++) {
Expand Down Expand Up @@ -193,6 +193,104 @@ public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
}

/**
* A join B on B.x < A.x
* transform B.x < A.x to A.x > B.x,
* otherwise return null
*/
private ComparisonPredicate normalizeNonEqual(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
Expression expr) {
if (!(expr instanceof ComparisonPredicate)) {
return null;
}
if (!(expr.child(0) instanceof SlotReference)) {
return null;
}
if (!(expr.child(1) instanceof SlotReference)) {
return null;
}
if (! join.left().getOutput().contains(expr.child(0))
|| ! join.right().getOutput().contains(expr.child(1))) {
if (join.left().getOutput().contains(expr.child(1))
&& join.right().getOutput().contains(expr.child(0))) {
return ((ComparisonPredicate) expr).commute();
}
} else {
return (ComparisonPredicate) expr;
}
return null;
}

private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate compare) {
if (compare instanceof LessThan || compare instanceof LessThanEqual) {
return TMinMaxRuntimeFilterType.MAX;
}
if (compare instanceof GreaterThan || compare instanceof GreaterThanEqual) {
return TMinMaxRuntimeFilterType.MIN;
}
return TMinMaxRuntimeFilterType.MIN_MAX;
}

/**
* A join B on A.x < B.y
* min-max filter (A.x < N, N=max(B.y)) could be applied to A.x
*/
private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
int hashCondionSize = join.getHashJoinConjuncts().size();
for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) {
int exprOrder = idx + hashCondionSize;
Expression expr = join.getOtherJoinConjuncts().get(exprOrder);
ComparisonPredicate compare = normalizeNonEqual(join, expr);
if (compare != null) {
Slot unwrappedSlot = checkTargetChild(compare.child(0));
if (unwrappedSlot == null) {
continue;
}
Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(unwrappedSlot);
if (pair == null) {
continue;
}
Slot olapScanSlot = pair.second;
PhysicalRelation scan = pair.first;
Preconditions.checkState(olapScanSlot != null && scan != null);
long buildSideNdv = getBuildSideNdv(join, compare);
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot),
TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv,
getMinMaxType(compare));
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
}
}
}

@Override
public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
// TODO: we need to support all type join
join.right().accept(this, context);
join.left().accept(this, context);

if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
join.right().getOutput().forEach(slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
return join;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();

if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) != 0) {
generateBitMapRuntimeFilterForNLJ(join, ctx);
}

if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.MIN_MAX.getValue()) != 0) {
generateMinMaxRuntimeFilter(join, ctx);
}

return join;
}

Expand Down Expand Up @@ -233,14 +331,16 @@ public PhysicalRelation visitPhysicalRelation(PhysicalRelation relation, Cascade
return relation;
}

private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends Plan> join, EqualTo equalTo) {
// runtime filter build side ndv
private long getBuildSideNdv(AbstractPhysicalJoin<? extends Plan, ? extends Plan> join,
ComparisonPredicate compare) {
AbstractPlan right = (AbstractPlan) join.right();
//make ut test friendly
if (right.getStats() == null) {
return -1L;
}
ExpressionEstimation estimator = new ExpressionEstimation();
ColumnStatistic buildColStats = equalTo.right().accept(estimator, right.getStats());
ColumnStatistic buildColStats = compare.right().accept(estimator, right.getStats());
return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;

import com.google.common.collect.ImmutableList;
Expand All @@ -41,6 +42,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Abstract class for all physical join node.
Expand Down Expand Up @@ -214,4 +216,30 @@ public List<Slot> computeOutput() {
? ImmutableList.of(markJoinSlotReference.get()) : ImmutableList.of())
.build();
}

@Override
public String toString() {
List<Object> args = Lists.newArrayList("type", joinType,
"hashCondition", hashJoinConjuncts,
"otherCondition", otherJoinConjuncts,
"stats", statistics);
if (markJoinSlotReference.isPresent()) {
args.add("isMarkJoin");
args.add("true");
}
if (markJoinSlotReference.isPresent()) {
args.add("MarkJoinSlotReference");
args.add(markJoinSlotReference.get());
}
if (hint != JoinHint.NONE) {
args.add("hint");
args.add(hint);
}
if (!runtimeFilters.isEmpty()) {
args.add("runtimeFilters");
args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList()));
}
return Utils.toSqlString(this.getClass().getName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(),
args.toArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -140,33 +138,6 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalHashJoin(this, context);
}

@Override
public String toString() {
List<Object> args = Lists.newArrayList("type", joinType,
"hashJoinCondition", hashJoinConjuncts,
"otherJoinCondition", otherJoinConjuncts,
"stats", statistics,
"fr", getMutableState(AbstractPlan.FRAGMENT_ID));
if (markJoinSlotReference.isPresent()) {
args.add("isMarkJoin");
args.add("true");
}
if (markJoinSlotReference.isPresent()) {
args.add("MarkJoinSlotReference");
args.add(markJoinSlotReference.get());
}
if (hint != JoinHint.NONE) {
args.add("hint");
args.add(hint);
}
if (!runtimeFilters.isEmpty()) {
args.add("runtimeFilters");
args.add(runtimeFilters.stream().map(rf -> rf.toString() + " ").collect(Collectors.toList()));
}
return Utils.toSqlString("PhysicalHashJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
args.toArray());
}

@Override
public PhysicalHashJoin<Plan, Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.MutableState;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -113,17 +112,17 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalNestedLoopJoin(this, context);
}

@Override
public String toString() {
// TODO: Maybe we could pull up this to the abstract class in the future.
return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"type", joinType,
"otherJoinCondition", otherJoinConjuncts,
"isMarkJoin", markJoinSlotReference.isPresent(),
"markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty",
"stats", statistics
);
}
// @Override
// public String toString() {
// // TODO: Maybe we could pull up this to the abstract class in the future.
// return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" + getGroupIdWithPrefix(),
// "type", joinType,
// "otherJoinCondition", otherJoinConjuncts,
// "isMarkJoin", markJoinSlotReference.isPresent(),
// "markJoinSlotReference", markJoinSlotReference.isPresent() ? markJoinSlotReference.get() : "empty",
// "stats", statistics
// );
// }

@Override
public PhysicalNestedLoopJoin<Plan, Plan> withChildren(List<Plan> children) {
Expand Down
Loading

0 comments on commit 1433116

Please sign in to comment.