Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -3072,8 +3073,10 @@ public AutoIncrementGenerator getAutoIncrementGenerator() {
* @param selectedIndexId the index want to scan
*/
public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
boolean useStoreRow = this.storeRowColumn()
&& CollectionUtils.isEmpty(getTableProperty().getCopiedRowStoreColumns());
TFetchOption fetchOption = new TFetchOption();
fetchOption.setFetchRowStore(this.storeRowColumn());
fetchOption.setFetchRowStore(useStoreRow);
fetchOption.setUseTwoPhaseFetch(true);

// get backend by tag
Expand All @@ -3094,7 +3097,7 @@ public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {

fetchOption.setNodesInfo(nodesInfo);

if (!this.storeRowColumn()) {
if (!useStoreRow) {
List<TColumn> columnsDesc = Lists.newArrayList();
getColumnDesc(selectedIndexId, columnsDesc, null, null);
fetchOption.setColumnDesc(columnsDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1916,7 +1916,13 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
List<Expr> allProjectionExprs = Lists.newArrayList();
List<Slot> slots = null;
// TODO FE/BE do not support multi-layer-project on MultiDataSink now.
if (project.hasMultiLayerProjection() && !(inputFragment instanceof MultiCastPlanFragment)) {
if (project.hasMultiLayerProjection()
&& !(inputFragment instanceof MultiCastPlanFragment)
// TODO support for two phase read with project, remove it after refactor
&& !(project.child() instanceof PhysicalDeferMaterializeTopN)
&& !(project.child() instanceof PhysicalDeferMaterializeOlapScan
|| (project.child() instanceof PhysicalFilter
&& ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
int layerCount = project.getMultiLayerProjects().size();
for (int i = 0; i < layerCount; i++) {
List<NamedExpression> layer = project.getMultiLayerProjects().get(i);
Expand Down Expand Up @@ -2024,37 +2030,28 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}

if (inputPlanNode instanceof ScanNode) {
TupleDescriptor projectionTuple = null;
// slotIdsByOrder is used to ensure the ScanNode's output order is same with current Project
// if we change the output order in translate project, the upper node will receive wrong order
// tuple, since they get the order from project.getOutput() not scan.getOutput()./
projectionTuple = generateTupleDesc(slots,
((ScanNode) inputPlanNode).getTupleDesc().getTable(), context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(projectionTuple);

// TODO: this is a temporary scheme to support two phase read when has project.
// we need to refactor all topn opt into rbo stage.
// TODO support for two phase read with project, remove this if after refactor
if (!(project.child() instanceof PhysicalDeferMaterializeOlapScan
|| (project.child() instanceof PhysicalFilter
&& ((PhysicalFilter<?>) project.child()).child() instanceof PhysicalDeferMaterializeOlapScan))) {
TupleDescriptor projectionTuple = generateTupleDesc(slots,
((ScanNode) inputPlanNode).getTupleDesc().getTable(), context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(projectionTuple);
}
if (inputPlanNode instanceof OlapScanNode) {
ArrayList<SlotDescriptor> olapScanSlots =
context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
SlotDescriptor lastSlot = olapScanSlots.get(olapScanSlots.size() - 1);
if (lastSlot.getColumn() != null
&& lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
injectRowIdColumnSlot(projectionTuple);
SlotRef slotRef = new SlotRef(lastSlot);
inputPlanNode.getProjectList().add(slotRef);
requiredByProjectSlotIdSet.add(lastSlot.getId());
requiredSlotIdSet.add(lastSlot.getId());
}
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, context);
} else {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
if (project.child() instanceof PhysicalDeferMaterializeTopN) {
inputFragment.setOutputExprs(projectionExprs);
} else {
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, null, context);
inputPlanNode.setProjectList(projectionExprs);
inputPlanNode.setOutputTupleDesc(tupleDescriptor);
}
}
return inputFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -44,6 +45,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* rewrite simple top n query to defer materialize slot not use for sort or predicate
Expand All @@ -54,51 +56,155 @@ public class DeferMaterializeTopNResult implements RewriteRuleFactory {
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalTopN(logicalOlapScan()))
.when(r -> r.child().getLimit() < getTopNOptLimitThreshold())
.whenNot(r -> r.child().getOrderKeys().isEmpty())
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child()))
logicalResultSink(
logicalTopN(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
).then(r -> deferMaterialize(r, r.child(),
Optional.empty(), Optional.empty(), r.child().child()))
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan())))
.when(r -> r.child().getLimit() < getTopNOptLimitThreshold())
.whenNot(r -> r.child().getOrderKeys().isEmpty())
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> {
LogicalFilter<LogicalOlapScan> filter = r.child().child();
return deferMaterialize(r, r.child(), Optional.of(filter), filter.child());
})
logicalResultSink(
logicalTopN(
logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> t.getOrderKeys().stream()
.map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
).then(r -> {
LogicalFilter<LogicalOlapScan> filter = r.child().child();
return deferMaterialize(r, r.child(), Optional.empty(),
Optional.of(filter), filter.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(
logicalTopN(
logicalProject(
logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> {
for (OrderKey orderKey : t.getOrderKeys()) {
if (!orderKey.getExpr().isColumnFromTable()) {
return false;
}
if (!(orderKey.getExpr() instanceof SlotReference)) {
return false;
}
SlotReference slotRef = (SlotReference) orderKey.getExpr();
// do not support alias in project now
if (!t.child().getProjects().contains(slotRef)) {
return false;
}
}
return true;
})
).then(r -> {
LogicalProject<LogicalFilter<LogicalOlapScan>> project = r.child().child();
LogicalFilter<LogicalOlapScan> filter = project.child();
return deferMaterialize(r, r.child(), Optional.of(project),
Optional.of(filter), filter.child());
})
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
logicalResultSink(logicalProject(
logicalTopN(
logicalProject(logicalFilter(
logicalOlapScan()
.when(s -> s.getTable().getEnableLightSchemaChange())
.when(s -> s.getTable().isDupKeysOrMergeOnWrite())
)
)
).when(t -> t.getLimit() < getTopNOptLimitThreshold())
.whenNot(t -> t.getOrderKeys().isEmpty())
.when(t -> {
for (OrderKey orderKey : t.getOrderKeys()) {
if (!orderKey.getExpr().isColumnFromTable()) {
return false;
}
if (!(orderKey.getExpr() instanceof SlotReference)) {
return false;
}
SlotReference slotRef = (SlotReference) orderKey.getExpr();
// do not support alias in project now
if (!t.child().getProjects().contains(slotRef)) {
return false;
}
}
return true;
})
)).then(r -> {
LogicalProject<?> upperProject = r.child();
LogicalProject<LogicalFilter<LogicalOlapScan>> bottomProject = r.child().child().child();
List<NamedExpression> projections = upperProject.mergeProjections(bottomProject);
LogicalProject<?> project = upperProject.withProjects(projections);
LogicalFilter<LogicalOlapScan> filter = bottomProject.child();
return deferMaterialize(r, r.child().child(), Optional.of(project),
Optional.of(filter), filter.child());
})
)
);
}

private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSink,
LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalFilter<? extends Plan>> logicalFilter,
LogicalOlapScan logicalOlapScan) {
LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalProject<? extends Plan>> logicalProject,
Optional<LogicalFilter<? extends Plan>> logicalFilter, LogicalOlapScan logicalOlapScan) {
Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
SlotReference columnId = SlotReference.fromColumn(
logicalOlapScan.getTable(), rowId, logicalOlapScan.getQualifier());
Set<Slot> orderKeys = Sets.newHashSet();
Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet());
logicalFilter.ifPresent(filter -> filter.getConjuncts()
.forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())));
logicalTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.peek(orderKeys::add)
.map(NamedExpression::getExprId)
.filter(Objects::nonNull)
.forEach(deferredMaterializedExprIds::remove);
if (logicalProject.isPresent()) {
deferredMaterializedExprIds.retainAll(logicalProject.get().getInputSlots().stream()
.map(NamedExpression::getExprId).collect(Collectors.toSet()));
}
LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan(
logicalOlapScan, deferredMaterializedExprIds, columnId);
Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan);
Set<Slot> inputSlots = Sets.newHashSet();
logicalFilter.ifPresent(filter -> inputSlots.addAll(filter.getInputSlots()));
if (logicalProject.isPresent()) {
ImmutableList.Builder<NamedExpression> requiredSlots = ImmutableList.builder();
inputSlots.addAll(logicalProject.get().getInputSlots());
for (Slot output : root.getOutput()) {
if (inputSlots.contains(output) || orderKeys.contains(output)) {
requiredSlots.add(output);
}
}
requiredSlots.add(columnId);
root = new LogicalProject<>(requiredSlots.build(), root);
}
root = new LogicalDeferMaterializeTopN<>((LogicalTopN<? extends Plan>) logicalTopN.withChildren(root),
deferredMaterializedExprIds, columnId);
if (logicalProject.isPresent()) {
root = logicalProject.get().withChildren(root);
}
root = logicalResultSink.withChildren(root);
return new LogicalDeferMaterializeResultSink<>((LogicalResultSink<? extends Plan>) root,
logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());
Expand Down
Loading
Loading