Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector;
import org.apache.doris.nereids.rules.expression.QueryColumnCollector;
import org.apache.doris.nereids.rules.rewrite.AddDefaultLimit;
import org.apache.doris.nereids.rules.rewrite.AdjustConjunctsReturnType;
import org.apache.doris.nereids.rules.rewrite.AdjustNullable;
Expand Down Expand Up @@ -455,7 +455,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
new CollectProjectAboveConsumer()
)
),
topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, HighPriorityColumnCollector::new))
topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, QueryColumnCollector::new))
);

private static final List<RewriteJob> WHOLE_TREE_REWRITE_JOBS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector.CollectorContext;
import org.apache.doris.nereids.rules.expression.QueryColumnCollector.CollectorContext;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
Expand Down Expand Up @@ -51,9 +51,9 @@
import java.util.stream.Collectors;

/**
* Used to collect High priority column.
* Used to collect query column.
*/
public class HighPriorityColumnCollector extends DefaultPlanRewriter<CollectorContext> implements CustomRewriter {
public class QueryColumnCollector extends DefaultPlanRewriter<CollectorContext> implements CustomRewriter {

@Override
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
Expand All @@ -64,10 +64,10 @@ public Plan rewriteRoot(Plan plan, JobContext jobContext) {
CollectorContext context = new CollectorContext();
plan.accept(this, context);
if (StatisticsUtil.enableAutoAnalyze()) {
context.queried.removeAll(context.usedInPredicate);
context.midPriority.removeAll(context.highPriority);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.updateColumnUsedInPredicate(context.usedInPredicate);
analysisManager.updateQueriedColumn(context.queried);
analysisManager.updateHighPriorityColumn(context.highPriority);
analysisManager.updateMidPriorityColumn(context.midPriority);
}
return plan;
}
Expand All @@ -78,9 +78,9 @@ public Plan rewriteRoot(Plan plan, JobContext jobContext) {
public static class CollectorContext {
public Map<Slot/*project output column*/, NamedExpression/*Actual project expr*/> projects = new HashMap<>();

public Set<Slot> usedInPredicate = new HashSet<>();
public Set<Slot> highPriority = new HashSet<>();

public Set<Slot> queried = new HashSet<>();
public Set<Slot> midPriority = new HashSet<>();
}

@Override
Expand All @@ -103,7 +103,7 @@ public Plan visitLogicalProject(LogicalProject<? extends Plan> project, Collecto
List<Slot> outputOfScan = scan.getOutput();
for (Slot slot : outputOfScan) {
if (!allUsed.contains(slot)) {
context.queried.remove(slot);
context.midPriority.remove(slot);
}
}
}
Expand All @@ -114,7 +114,7 @@ public Plan visitLogicalProject(LogicalProject<? extends Plan> project, Collecto
public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, CollectorContext context) {
join.child(0).accept(this, context);
join.child(1).accept(this, context);
context.usedInPredicate.addAll(
context.highPriority.addAll(
(join.isMarkJoin() ? join.getLeftConditionSlot() : join.getConditionSlot())
.stream().flatMap(s -> backtrace(s, context).stream())
.collect(Collectors.toSet())
Expand All @@ -125,7 +125,7 @@ public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, C
@Override
public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate, CollectorContext context) {
aggregate.child(0).accept(this, context);
context.usedInPredicate.addAll(aggregate.getGroupByExpressions()
context.highPriority.addAll(aggregate.getGroupByExpressions()
.stream()
.flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
.flatMap(s -> backtrace(s, context).stream())
Expand All @@ -136,7 +136,7 @@ public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate, Co
@Override
public Plan visitLogicalHaving(LogicalHaving<? extends Plan> having, CollectorContext context) {
having.child(0).accept(this, context);
context.usedInPredicate.addAll(
context.highPriority.addAll(
having.getExpressions().stream()
.flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
.flatMap(s -> backtrace(s, context).stream())
Expand All @@ -147,21 +147,21 @@ public Plan visitLogicalHaving(LogicalHaving<? extends Plan> having, CollectorCo
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, CollectorContext context) {
List<Slot> slots = olapScan.getOutput();
context.queried.addAll(slots);
context.midPriority.addAll(slots);
return olapScan;
}

@Override
public Plan visitLogicalFileScan(LogicalFileScan fileScan, CollectorContext context) {
List<Slot> slots = fileScan.getOutput();
context.queried.addAll(slots);
context.midPriority.addAll(slots);
return fileScan;
}

@Override
public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, CollectorContext context) {
filter.child(0).accept(this, context);
context.usedInPredicate.addAll(filter
context.highPriority.addAll(filter
.getExpressions()
.stream()
.flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
Expand All @@ -173,7 +173,7 @@ public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, CollectorCo
@Override
public Plan visitLogicalWindow(LogicalWindow<? extends Plan> window, CollectorContext context) {
window.child(0).accept(this, context);
context.usedInPredicate.addAll(window
context.highPriority.addAll(window
.getWindowExpressions()
.stream()
.flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class AnalysisManager implements Writable {

private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);

private static final int COLUMN_QUEUE_SIZE = 1000;
public static final int COLUMN_QUEUE_SIZE = 1000;
public final Queue<QueryColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<QueryColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Map<TableName, Set<String>> highPriorityJobs = new LinkedHashMap<>();
Expand Down Expand Up @@ -1152,11 +1152,11 @@ public boolean canSample(TableIf table) {
}


public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
public void updateHighPriorityColumn(Set<Slot> slotReferences) {
updateColumn(slotReferences, highPriorityColumns);
}

public void updateQueriedColumn(Collection<Slot> slotReferences) {
public void updateMidPriorityColumn(Collection<Slot> slotReferences) {
updateColumn(slotReferences, midPriorityColumns);
}

Expand Down