Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleSet;
Expand Down Expand Up @@ -107,6 +108,7 @@ public class CascadesContext implements ScheduleContext {
// subqueryExprIsAnalyzed: whether the subquery has been analyzed.
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private final TopnFilterContext topnFilterContext = new TopnFilterContext();
private Optional<Scope> outerScope = Optional.empty();
private Map<Long, TableIf> tables = null;

Expand Down Expand Up @@ -280,6 +282,10 @@ public RuntimeFilterContext getRuntimeFilterContext() {
return runtimeFilterContext;
}

/**
 * Returns the topn-filter context held by this cascades context.
 *
 * @return the {@link TopnFilterContext} instance stored in the {@code topnFilterContext} field
 */
public TopnFilterContext getTopnFilterContext() {
    return this.topnFilterContext;
}

/**
 * Replaces the job context recorded on this cascades context.
 *
 * @param currentJobContext the job context to store; it is assigned as-is without validation
 */
public void setCurrentJobContext(JobContext currentJobContext) {
    this.currentJobContext = currentJobContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -741,6 +742,10 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
)
);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
if (context.getTopnFilterContext().isTopnFilterTarget(olapScan)) {
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(olapScan, olapScanNode);
}
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
Expand All @@ -764,6 +769,10 @@ public PlanFragment visitPhysicalDeferMaterializeOlapScan(
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) {
PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
if (context.getTopnFilterContext().isTopnFilterTarget(deferMaterializeOlapScan)) {
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(deferMaterializeOlapScan, olapScanNode);
}
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
if (deferMaterializeOlapScan.getDeferMaterializeSlotIds()
Expand Down Expand Up @@ -2026,20 +2035,23 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) {
PlanFragment inputFragment = topN.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(topN.child(0));

// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add TopNNode
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.isEnableRuntimeFilter()) {
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
"topN opt expect OlapScanNode, but we get " + child);
OlapScanNode scanNode = ((OlapScanNode) child);
scanNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
olapScan -> {
Optional<OlapScanNode> legacyScan =
context.getTopnFilterContext().getLegacyScanNode(olapScan);
Preconditions.checkState(legacyScan.isPresent(),
"cannot find OlapScanNode for topn filter");
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
}
// push sort to scan opt
if (sortNode.getChild(0) instanceof OlapScanNode) {
Expand Down Expand Up @@ -2084,12 +2096,23 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
@Override
public PlanFragment visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
PlanTranslatorContext context) {

PlanFragment planFragment = visitPhysicalTopN(topN.getPhysicalTopN(), context);
if (planFragment.getPlanRoot() instanceof SortNode) {
SortNode sortNode = (SortNode) planFragment.getPlanRoot();
sortNode.setUseTwoPhaseReadOpt(true);
sortNode.getSortInfo().setUseTwoPhaseRead();
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
olapScan -> {
Optional<OlapScanNode> legacyScan =
context.getTopnFilterContext().getLegacyScanNode(olapScan);
Preconditions.checkState(legacyScan.isPresent(),
"cannot find OlapScanNode for topn filter");
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
}
TupleDescriptor tupleDescriptor = sortNode.getSortInfo().getSortTupleDescriptor();
for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
if (topN.getDeferMaterializeSlotIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
Expand Down Expand Up @@ -70,7 +71,7 @@ public class PlanTranslatorContext {
private final DescriptorTable descTable = new DescriptorTable();

private final RuntimeFilterTranslator translator;

private final TopnFilterContext topnFilterContext;
/**
* index from Nereids' slot to legacy slot.
*/
Expand Down Expand Up @@ -115,12 +116,14 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx) {
this.connectContext = ctx.getConnectContext();
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
}

/**
 * Test-only constructor: no connect context and no runtime filter translator are available
 * (both are {@code null}), and a fresh, standalone {@link TopnFilterContext} is created.
 */
@VisibleForTesting
public PlanTranslatorContext() {
    this.connectContext = null;
    this.translator = null;
    this.topnFilterContext = new TopnFilterContext();
}

/**
Expand Down Expand Up @@ -187,6 +190,10 @@ public Optional<RuntimeFilterTranslator> getRuntimeTranslator() {
return Optional.ofNullable(translator);
}

/**
 * Returns the topn-filter context assigned to this translator context at construction time.
 *
 * @return the {@link TopnFilterContext} stored in the {@code topnFilterContext} field
 */
public TopnFilterContext getTopnFilterContext() {
    return this.topnFilterContext;
}

/**
 * Obtains the next plan fragment id from {@code fragmentIdGenerator}.
 *
 * @return the id returned by the generator's {@code getNextId()} call
 */
public PlanFragmentId nextFragmentId() {
    PlanFragmentId nextId = fragmentIdGenerator.getNextId();
    return nextId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,103 +19,57 @@

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.qe.ConnectContext;

import java.util.Optional;
/**
* topN opt
* refer to:
* <a href="https://github.com/apache/doris/pull/15558">...</a>
* <a href="https://github.com/apache/doris/pull/15663">...</a>
*
* // only support simple case: select ... from tbl [where ...] order by ... limit ...
* // [deprecated] only support simple case: select ... from tbl [where ...] order by ... limit ...
*/

public class TopNScanOpt extends PlanPostProcessor {

@Override
public Plan visit(Plan plan, CascadesContext context) {
return plan;
}

@Override
public Plan visitPhysicalSink(PhysicalSink<? extends Plan> physicalSink, CascadesContext context) {
if (physicalSink.child() instanceof TopN) {
return super.visit(physicalSink, context);
} else if (physicalSink.child() instanceof Project && physicalSink.child().child(0) instanceof TopN) {
PhysicalTopN<?> oldTopN = (PhysicalTopN<?>) physicalSink.child().child(0);
PhysicalTopN<?> newTopN = (PhysicalTopN<?>) oldTopN.accept(this, context);
if (newTopN == oldTopN) {
return physicalSink;
} else {
return physicalSink.withChildren(physicalSink.child().withChildren(newTopN));
}
}
return physicalSink;
}

@Override
public Plan visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute, CascadesContext context) {
if (distribute.child() instanceof TopN && distribute.child() instanceof AbstractPhysicalSort
&& ((AbstractPhysicalSort<?>) distribute.child()).getSortPhase() == SortPhase.LOCAL_SORT) {
return super.visit(distribute, context);
}
return distribute;
}

@Override
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
if (topN.getSortPhase() == SortPhase.LOCAL_SORT) {
Plan child = topN.child();
topN = rewriteTopN(topN);
if (child != topN.child()) {
topN = ((PhysicalTopN<? extends Plan>) topN.withChildren(child)).copyStatsAndGroupIdFrom(topN);
}
return topN;
} else if (topN.getSortPhase() == SortPhase.MERGE_SORT) {
return (PhysicalTopN<? extends Plan>) super.visit(topN, ctx);
}
Optional<OlapScan> scanOpt = findScanForTopnFilter(topN);
scanOpt.ifPresent(scan -> ctx.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, ctx);
return topN;
}

@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
CascadesContext context) {
if (topN.getSortPhase() == SortPhase.LOCAL_SORT) {
PhysicalTopN<? extends Plan> rewrittenTopN = rewriteTopN(topN.getPhysicalTopN());
if (topN.getPhysicalTopN() != rewrittenTopN) {
topN = topN.withPhysicalTopN(rewrittenTopN).copyStatsAndGroupIdFrom(topN);
}
return topN;
} else if (topN.getSortPhase() == SortPhase.MERGE_SORT) {
return super.visit(topN, context);
}
Optional<OlapScan> scanOpt = findScanForTopnFilter(topN.getPhysicalTopN());
scanOpt.ifPresent(scan -> context.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, context);
return topN;
}

private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends Plan> topN) {
Plan child = topN.child();
private Optional<OlapScan> findScanForTopnFilter(PhysicalTopN<? extends Plan> topN) {
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
return Optional.empty();
}
if (topN.getOrderKeys().isEmpty()) {
return topN;
return Optional.empty();
}

// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
return Optional.empty();
}
// if firstKey's column is not present, it means the firstKey is not an original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
Expand All @@ -125,27 +79,44 @@ private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends Plan> to
// see Alias::toSlot() method to get how column info is passed around by alias of slotReference
Expression firstKey = topN.getOrderKeys().get(0).getExpr();
if (!firstKey.isColumnFromTable()) {
return topN;
return Optional.empty();
}
if (firstKey.getDataType().isFloatType()
|| firstKey.getDataType().isDoubleType()) {
return topN;
return Optional.empty();
}

OlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
if (! (firstKey instanceof SlotReference)) {
return Optional.empty();
}
if (!(child instanceof OlapScan)) {
return topN;
OlapScan olapScan = findScanNodeBySlotReference(topN, (SlotReference) firstKey);
if (olapScan != null
&& olapScan.getTable().isDupKeysOrMergeOnWrite()
&& olapScan instanceof PhysicalCatalogRelation) {
return Optional.of(olapScan);
}
olapScan = (OlapScan) child;

if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
return topN.withEnableRuntimeFilter(true).copyStatsAndGroupIdFrom(topN);
}
return Optional.empty();
}

return topN;
/**
 * Searches for the OLAP scan whose output contains {@code slot}, starting at {@code root}
 * and descending only through the first child of each plan node.
 *
 * <p>The descent stops, and {@code null} is returned, as soon as the current node has no
 * children, its first child is a {@link PhysicalWindow}, or its first child's output set
 * does not contain {@code slot}.
 *
 * @param root the plan node to start the search from
 * @param slot the slot that must appear in the scan's output set
 * @return the first {@link OlapScan} on the first-child chain that outputs {@code slot},
 *         or {@code null} when no such scan is reachable
 */
private OlapScan findScanNodeBySlotReference(Plan root, SlotReference slot) {
    Plan current = root;
    while (true) {
        if (current instanceof OlapScan && current.getOutputSet().contains(slot)) {
            return (OlapScan) current;
        }
        if (current.children().isEmpty()) {
            return null;
        }
        // for join and intersect, push topn-filter to their left child.
        // TODO for union, topn-filter can be pushed down to all of its children.
        Plan firstChild = current.child(0);
        if (firstChild instanceof PhysicalWindow || !firstChild.getOutputSet().contains(slot)) {
            return null;
        }
        current = firstChild;
    }
}

private long getTopNOptLimitThreshold() {
Expand Down
Loading