[fix](mtmv) Fix partition mv rewrite result wrong (#35236)
This is brought by #33800.
If the mv is a partitioned materialized view, the query result will be wrong
when the rewrite hits the materialized view after partitions in the related
base partition table have been deleted, created, and so on.
This commit fixes the problem.

If **SET enable_materialized_view_union_rewrite=true;**, the rewrite will
still use the materialized view and make sure the data is correct.
If **SET enable_materialized_view_union_rewrite=false;**, the query will read
the base table directly to make sure the data is right.
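
For illustration, a minimal sketch of the two settings; the base table `sales`,
the partitioned MTMV `mv_sales`, and the query are hypothetical and not part of
this commit:

```sql
-- Hypothetical setup: mv_sales is a partitioned materialized view built on the
-- base table sales, and a partition of sales has been dropped and re-created
-- since mv_sales was last refreshed.

-- Union rewrite enabled: the rewrite may still hit mv_sales and compensate for
-- the changed partitions, so the result stays correct.
SET enable_materialized_view_union_rewrite=true;
SELECT sale_date, SUM(amount) FROM sales GROUP BY sale_date;

-- Union rewrite disabled: the query reads the base table directly instead of
-- the stale materialized view, so the result is also correct.
SET enable_materialized_view_union_rewrite=false;
SELECT sale_date, SUM(amount) FROM sales GROUP BY sale_date;
```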
seawinde authored May 29, 2024
1 parent fae2b4b commit 0b5dd11
Showing 8 changed files with 714 additions and 251 deletions.

Large diffs are not rendered by default.

@@ -17,15 +17,8 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping;
import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot;
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
@@ -34,17 +27,13 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.Utils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -231,47 +220,6 @@ public String toString() {
return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates);
}

/** Construct filter by partition
* @param partitions this is the partition which filter should be constructed from
* @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping
* */
public static Map<TableIf, Set<Expression>> constructFilterByPartitions(
Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> partitions,
SlotMapping queryToViewSlotMapping) throws AnalysisException {
Map<TableIf, Set<Expression>> constructedFilterMap = new HashMap<>();
for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
partitions.asMap().entrySet()) {
// Get the base table partition column mv related
String relatedCol = entry.getKey().key().getRelatedCol();
TableIf relatedTableInfo = entry.getKey().key().getRelatedTable();
// Find the query slot which mv partition col mapped to
Optional<MappedSlot> partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap()
.keySet()
.stream()
.filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable()
&& mappedSlot.getSlot().getName().equals(relatedCol)
&& mappedSlot.getBelongedRelation() != null
&& mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId())
.findFirst();
if (!partitionSlotQueryUsed.isPresent()) {
return ImmutableMap.of();
}
// Constructed filter which should add on the query base table,
// after supported data roll up this method should keep logic consistency to partition mapping
Set<Expression> partitionExpressions = UpdateMvByPartitionCommand.constructPredicates(
// get mv partition items
entry.getValue().stream()
.map(partition -> entry.getKey().value().getItem(partition.getId()))
.collect(Collectors.toSet()),
partitionSlotQueryUsed.get().getSlot());
// Put partition expressions on query base table
constructedFilterMap.computeIfPresent(relatedTableInfo,
(key, existExpressions) -> Sets.union(existExpressions, partitionExpressions));
constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions);
}
return constructedFilterMap;
}

/**
* The split different representation for predicate expression, such as equal, range and residual predicate.
*/
@@ -17,10 +17,10 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
@@ -45,6 +45,7 @@
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Join;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAddContext;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
@@ -65,6 +66,7 @@

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -668,43 +670,49 @@ private Boolean doVisit(Plan plan, PlanCheckContext checkContext) {
}

/**
* Add predicates on base table when materialized view scan contains invalid partitions
* Add or remove partition on base table and mv when materialized view scan contains invalid partitions
*/
public static class InvalidPartitionRemover extends DefaultPlanRewriter<Pair<List<String>, Set<Long>>> {
// materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan
public static class PartitionRemover extends DefaultPlanRewriter<Map<BaseTableInfo, Set<String>>> {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair<List<String>, Set<Long>> context) {
if (olapScan.getTable().getFullQualifiers().equals(context.key())) {
List<Long> selectedPartitionIds = olapScan.getSelectedPartitionIds();
return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream()
.filter(partitionId -> !context.value().contains(partitionId))
.collect(Collectors.toList()));
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan,
Map<BaseTableInfo, Set<String>> context) {
// todo Support other partition table
BaseTableInfo tableInfo = new BaseTableInfo(olapScan.getTable());
if (!context.containsKey(tableInfo)) {
return olapScan;
}
return olapScan;
Set<String> targetPartitionNameSet = context.get(tableInfo);
List<Long> selectedPartitionIds = new ArrayList<>(olapScan.getSelectedPartitionIds());
// need remove partition
selectedPartitionIds = selectedPartitionIds.stream()
.filter(partitionId -> !targetPartitionNameSet.contains(
olapScan.getTable().getPartition(partitionId).getName()))
.collect(Collectors.toList());
return olapScan.withSelectedPartitionIds(selectedPartitionIds);
}
}

/**
* Collect partitions which scan used according to given table
* Collect partitions on base table
*/
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan, Map<Long, Set<PartitionItem>>> {
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan,
Map<BaseTableInfo, Set<Partition>>> {
@Override
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
Map<Long, Set<PartitionItem>> context) {
Map<BaseTableInfo, Set<Partition>> targetTablePartitionMap) {
TableIf table = catalogRelation.getTable();
if (!context.containsKey(table.getId())) {
BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
return catalogRelation;
}
// Only support check olap partition currently
// todo Support other type partition table
if (catalogRelation instanceof LogicalOlapScan) {
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo();
logicalOlapScan.getSelectedPartitionIds().stream()
.map(partitionInfo::getItem)
.forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> {
oldValue.add(partitionItem);
return oldValue;
}));
for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) {
Set<Partition> partitions = targetTablePartitionMap.computeIfAbsent(relatedPartitionTable,
key -> new HashSet<>());
partitions.add(logicalOlapScan.getTable().getPartition(partitionId));
}
}
return catalogRelation;
}
@@ -713,10 +721,16 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
/**
* Add filter on table scan according to table filter map
*/
public static Plan addFilterOnTableScan(Plan queryPlan, Map<TableIf, Set<Expression>> filterOnOriginPlan,
public static Plan addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo, Set<String>> partitionOnOriginPlan,
String partitionColumn,
CascadesContext parentCascadesContext) {
// Firstly, construct filter form invalid partition, this filter should be added on origin plan
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan);
PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn);
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(),
predicateAddContext);
if (!predicateAddContext.isAddSuccess()) {
return null;
}
// Deep copy the plan to avoid the plan output is the same with the later union output, this may cause
// exec by mistake
queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(