Skip to content

Commit

Permalink
[Fix](variant) filter with variant access may lead to partition/tab…
Browse files Browse the repository at this point in the history
…let prune fall through (apache#32560)

A query like `select * from ut_p partitions(p2) where cast(var['a'] as int)  > 0` will fall through partition/tablet pruning, since its plan looks like
```
mysql> explain analyzed plan select * from ut_p where id = 3 and cast(var['a'] as int) = 789;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String(Nereids Planner)                                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| LogicalResultSink[26] ( outputExprs=[id#0, var#1] )                                                                                                                        |
| +--LogicalProject[25] ( distinct=false, projects=[id#0, var#1], excepts=[] )                                                                                               |
|    +--LogicalFilter[24] ( predicates=((cast(var#4 as INT) = 789) AND (id#0 = 3)) )                                                                                         |
|       +--LogicalFilter[23] ( predicates=(0 = __DORIS_DELETE_SIGN__#2) )                                                                                                    |
|          +--LogicalProject[22] ( distinct=false, projects=[id#0, var#1, __DORIS_DELETE_SIGN__#2, __DORIS_VERSION_COL__#3, element_at(var#1, 'a') AS `var`#4], excepts=[] ) |
|             +--LogicalOlapScan ( qualified=regression_test_variant_p0.ut_p, indexName=<index_not_selected>, selectedIndexId=10145, preAgg=ON )                             |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
6 rows in set (0.01 sec)
```
with an extra LogicalProject on top of LogicalOlapScan, so we should handle this case in order to prune partitions/tablets
  • Loading branch information
eldenmoon committed Mar 22, 2024
1 parent 2c3369a commit f6403f9
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,14 @@ public enum RuleType {
MATERIALIZED_INDEX_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),

OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),

OLAP_SCAN_WITH_PROJECT_PARTITION_PRUNE(RuleTypeClass.REWRITE),
FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),
OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE),
OLAP_SCAN_WITH_PROJECT_TABLET_PRUNE(RuleTypeClass.REWRITE),
PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE),
EXTRACT_SINGLE_TABLE_EXPRESSION_FROM_DISJUNCTION(RuleTypeClass.REWRITE),
HIDE_ONE_ROW_RELATION_UNDER_UNION(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner;
import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;

import java.util.ArrayList;
Expand All @@ -45,59 +49,75 @@
* Used to prune partition of olap scan, should execute after SwapProjectAndFilter, MergeConsecutiveFilters,
* MergeConsecutiveProjects and all predicate push down related rules.
*/
public class PruneOlapScanPartition extends OneRewriteRuleFactory {
public class PruneOlapScanPartition implements RewriteRuleFactory {
private <T extends Plan> Plan prunePartitions(CascadesContext ctx,
LogicalOlapScan scan, LogicalFilter<T> originalFilter) {
OlapTable table = scan.getTable();
Set<String> partitionColumnNameSet = Utils.execWithReturnVal(table::getPartitionColumnNames);
if (partitionColumnNameSet.isEmpty()) {
return originalFilter;
}

@Override
public Rule build() {
return logicalFilter(logicalOlapScan()).when(p -> !p.child().isPartitionPruned()).thenApply(ctx -> {
LogicalFilter<LogicalOlapScan> filter = ctx.root;
LogicalOlapScan scan = filter.child();
OlapTable table = scan.getTable();
Set<String> partitionColumnNameSet = Utils.execWithReturnVal(table::getPartitionColumnNames);
if (partitionColumnNameSet.isEmpty()) {
return filter;
}
List<Slot> output = scan.getOutput();
Map<String, Slot> scanOutput = Maps.newHashMapWithExpectedSize(output.size() * 2);
for (Slot slot : output) {
scanOutput.put(slot.getName().toLowerCase(), slot);
}

List<Slot> output = scan.getOutput();
Map<String, Slot> scanOutput = Maps.newHashMapWithExpectedSize(output.size() * 2);
for (Slot slot : output) {
scanOutput.put(slot.getName().toLowerCase(), slot);
PartitionInfo partitionInfo = table.getPartitionInfo();
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
List<Slot> partitionSlots = new ArrayList<>(partitionColumns.size());
for (Column column : partitionColumns) {
Slot slot = scanOutput.get(column.getName().toLowerCase());
if (slot == null) {
return originalFilter;
} else {
partitionSlots.add(slot);
}
}

PartitionInfo partitionInfo = table.getPartitionInfo();
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
List<Slot> partitionSlots = new ArrayList<>(partitionColumns.size());
for (Column column : partitionColumns) {
Slot slot = scanOutput.get(column.getName().toLowerCase());
if (slot == null) {
return filter;
} else {
partitionSlots.add(slot);
}
}
List<Long> manuallySpecifiedPartitions = scan.getManuallySpecifiedPartitions();

List<Long> manuallySpecifiedPartitions = scan.getManuallySpecifiedPartitions();
Map<Long, PartitionItem> idToPartitions;
if (manuallySpecifiedPartitions.isEmpty()) {
idToPartitions = partitionInfo.getIdToItem(false);
} else {
Map<Long, PartitionItem> allPartitions = partitionInfo.getAllPartitions();
idToPartitions = allPartitions.keySet().stream()
.filter(id -> manuallySpecifiedPartitions.contains(id))
.collect(Collectors.toMap(Function.identity(), id -> allPartitions.get(id)));
}
List<Long> prunedPartitions = PartitionPruner.prune(
partitionSlots, originalFilter.getPredicate(), idToPartitions, ctx,
PartitionTableType.OLAP);
if (prunedPartitions.isEmpty()) {
return new LogicalEmptyRelation(
ConnectContext.get().getStatementContext().getNextRelationId(),
originalFilter.getOutput());
}

Map<Long, PartitionItem> idToPartitions;
if (manuallySpecifiedPartitions.isEmpty()) {
idToPartitions = partitionInfo.getIdToItem(false);
} else {
Map<Long, PartitionItem> allPartitions = partitionInfo.getAllPartitions();
idToPartitions = allPartitions.keySet().stream()
.filter(id -> manuallySpecifiedPartitions.contains(id))
.collect(Collectors.toMap(Function.identity(), id -> allPartitions.get(id)));
}
List<Long> prunedPartitions = PartitionPruner.prune(
partitionSlots, filter.getPredicate(), idToPartitions, ctx.cascadesContext,
PartitionTableType.OLAP);
LogicalOlapScan rewrittenScan = scan.withSelectedPartitionIds(prunedPartitions);
if (originalFilter.child() instanceof LogicalProject) {
LogicalProject<LogicalOlapScan> rewrittenProject
= (LogicalProject<LogicalOlapScan>) originalFilter.child()
.withChildren(ImmutableList.of(rewrittenScan));
return new LogicalFilter<>(originalFilter.getConjuncts(), rewrittenProject);
}
return originalFilter.withChildren(ImmutableList.of(rewrittenScan));
}

if (prunedPartitions.isEmpty()) {
return new LogicalEmptyRelation(
ConnectContext.get().getStatementContext().getNextRelationId(),
filter.getOutput());
}
LogicalOlapScan rewrittenScan = scan.withSelectedPartitionIds(prunedPartitions);
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
}).toRule(RuleType.OLAP_SCAN_PARTITION_PRUNE);
/**
 * Builds the partition-pruning rewrite rules.
 *
 * <p>Two plan shapes are matched, both delegating to {@code prunePartitions}:
 * a filter directly above an olap scan, and a filter above a project above an olap scan.
 * Each rule only fires while the scan is not yet marked as partition-pruned; the second
 * one additionally requires the project to report pushed-down projection functions.
 */
@Override
public List<Rule> buildRules() {
    // filter -> olapScan
    Rule filterOverScan = logicalFilter(logicalOlapScan())
            .when(filter -> !filter.child().isPartitionPruned())
            .thenApply(ctx -> prunePartitions(ctx.cascadesContext, ctx.root.child(), ctx.root))
            .toRule(RuleType.OLAP_SCAN_PARTITION_PRUNE);

    // filter -> project -> olapScan
    Rule filterOverProjectedScan = logicalFilter(logicalProject(logicalOlapScan()))
            .when(filter -> !filter.child().child().isPartitionPruned())
            .when(filter -> filter.child().hasPushedDownToProjectionFunctions())
            .thenApply(ctx -> prunePartitions(ctx.cascadesContext, ctx.root.child().child(), ctx.root))
            .toRule(RuleType.OLAP_SCAN_WITH_PROJECT_PARTITION_PRUNE);

    return ImmutableList.of(filterOverScan, filterOverProjectedScan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionColumnFilterConverter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.planner.HashDistributionPruner;
import org.apache.doris.planner.PartitionColumnFilter;
Expand All @@ -46,34 +49,50 @@
/**
* prune bucket
*/
public class PruneOlapScanTablet extends OneRewriteRuleFactory {

public class PruneOlapScanTablet implements RewriteRuleFactory {
@Override
public Rule build() {
return logicalFilter(logicalOlapScan())
.then(filter -> {
LogicalOlapScan olapScan = filter.child();
OlapTable table = olapScan.getTable();
Builder<Long> selectedTabletIdsBuilder = ImmutableList.builder();
if (olapScan.getSelectedTabletIds().isEmpty()) {
for (Long id : olapScan.getSelectedPartitionIds()) {
Partition partition = table.getPartition(id);
MaterializedIndex index = partition.getIndex(olapScan.getSelectedIndexId());
selectedTabletIdsBuilder
.addAll(getSelectedTabletIds(filter.getConjuncts(), index,
olapScan.getSelectedIndexId() == olapScan.getTable()
.getBaseIndexId(),
partition.getDistributionInfo()));
}
} else {
selectedTabletIdsBuilder.addAll(olapScan.getSelectedTabletIds());
}
List<Long> selectedTabletIds = selectedTabletIdsBuilder.build();
if (new HashSet(selectedTabletIds).equals(new HashSet(olapScan.getSelectedTabletIds()))) {
return null;
}
return filter.withChildren(olapScan.withSelectedTabletIds(selectedTabletIds));
}).toRule(RuleType.OLAP_SCAN_TABLET_PRUNE);
public List<Rule> buildRules() {
    // Two plan shapes are handled, both delegating to pruneTablets:
    //   1. filter -> olapScan
    //   2. filter -> project -> olapScan, only when the project reports
    //      pushed-down projection functions.

    Rule filterOverScan = logicalFilter(logicalOlapScan())
            .then(filter -> pruneTablets(filter.child(), filter))
            .toRule(RuleType.OLAP_SCAN_TABLET_PRUNE);

    Rule filterOverProjectedScan = logicalFilter(logicalProject(logicalOlapScan()))
            .when(filter -> filter.child().hasPushedDownToProjectionFunctions())
            .then(filter -> pruneTablets(filter.child().child(), filter))
            .toRule(RuleType.OLAP_SCAN_WITH_PROJECT_TABLET_PRUNE);

    return ImmutableList.of(filterOverScan, filterOverProjectedScan);
}

/**
 * Restricts the given olap scan to the tablets selected by the filter's conjuncts.
 *
 * <p>When the scan has no selected tablet ids yet, the tablets are computed per selected
 * partition via {@code getSelectedTabletIds}; otherwise the already selected ids are kept.
 * If the resulting id set equals the scan's current one, {@code null} is returned
 * (NOTE(review): presumably the rule framework treats {@code null} as "no rewrite" - confirm).
 *
 * @param olapScan the scan to prune; either the direct child of {@code originalFilter}
 *                 or the child of the project sitting between them
 * @param originalFilter the matched filter whose conjuncts drive the pruning
 * @param <T> the type of the filter's child plan
 * @return the filter rebuilt over the pruned scan (keeping the intermediate project when
 *         there is one), or {@code null} when the selected tablet set did not change
 */
private <T extends Plan> Plan pruneTablets(LogicalOlapScan olapScan, LogicalFilter<T> originalFilter) {
    OlapTable table = olapScan.getTable();
    Builder<Long> selectedTabletIdsBuilder = ImmutableList.builder();
    if (olapScan.getSelectedTabletIds().isEmpty()) {
        // These values do not depend on the partition, so resolve them once outside the loop.
        long selectedIndexId = olapScan.getSelectedIndexId();
        boolean isBaseIndexSelected = selectedIndexId == table.getBaseIndexId();
        for (Long id : olapScan.getSelectedPartitionIds()) {
            Partition partition = table.getPartition(id);
            MaterializedIndex index = partition.getIndex(selectedIndexId);
            selectedTabletIdsBuilder.addAll(getSelectedTabletIds(originalFilter.getConjuncts(), index,
                    isBaseIndexSelected, partition.getDistributionInfo()));
        }
    } else {
        selectedTabletIdsBuilder.addAll(olapScan.getSelectedTabletIds());
    }
    List<Long> selectedTabletIds = selectedTabletIdsBuilder.build();
    // Compare as typed sets so that ordering and duplicates do not matter.
    if (new HashSet<>(selectedTabletIds).equals(new HashSet<>(olapScan.getSelectedTabletIds()))) {
        return null;
    }
    LogicalOlapScan rewrittenScan = olapScan.withSelectedTabletIds(selectedTabletIds);
    if (originalFilter.child() instanceof LogicalProject) {
        // Keep the project between the filter and the scan, swapping only its child.
        LogicalProject<LogicalOlapScan> rewrittenProject
                = (LogicalProject<LogicalOlapScan>) originalFilter.child()
                .withChildren(ImmutableList.of(rewrittenScan));
        return new LogicalFilter<>(originalFilter.getConjuncts(), rewrittenProject);
    }
    return originalFilter.withChildren(ImmutableList.of(rewrittenScan));
}

private Collection<Long> getSelectedTabletIds(Set<Expression> expressions,
Expand Down
29 changes: 29 additions & 0 deletions regression-test/data/variant_p0/select_partition.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 {"a":1}

-- !sql --
7 {"a":2}

-- !sql --
1 {"a":1}
7 {"a":2}

-- !sql --
7 {"a":2}

-- !sql --
16 {"a":3}

-- !sql --
16 {"a":3}

-- !sql --
16 {"a":3}

-- !sql --
1 {"a":1}

-- !sql --
6 {"x":"123"}

Loading

0 comments on commit f6403f9

Please sign in to comment.