From 8096753367fa52d9e853e7754dd0ce8669d4bfd8 Mon Sep 17 00:00:00 2001 From: seawinde <149132972+seawinde@users.noreply.github.com> Date: Sun, 21 Apr 2024 13:19:07 +0800 Subject: [PATCH] [improvement](mtmv) Support union rewrite when the materialized view is not enough to provide all the data for the query (#33800) When the materialized view is not enough to provide all the data for the query, and the materialized view is incrementally updated by partition, we can union the materialized view with the original query to answer the query. This depends on https://github.com/apache/doris/pull/33362 For example, given the following materialized view definition: > CREATE MATERIALIZED VIEW mv_10086 > BUILD IMMEDIATE REFRESH AUTO ON MANUAL > partition by(l_shipdate) > DISTRIBUTED BY RANDOM BUCKETS 2 > PROPERTIES ('replication_num' = '1') > AS > select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total > from lineitem > left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate > group by > l_shipdate, > o_orderdate, > l_partkey, > l_suppkey; the materialized view data is as follows: +------------+-------------+-----------+-----------+-----------+ | l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total | +------------+-------------+-----------+-----------+-----------+ | 2023-10-18 | 2023-10-18 | 2 | 3 | 109.20 | | 2023-10-17 | 2023-10-17 | 2 | 3 | 99.50 | | 2023-10-19 | 2023-10-19 | 2 | 3 | 99.50 | +------------+-------------+-----------+-----------+-----------+ After we insert data into partition `2023-10-17`, if we run the following query ``` select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total from lineitem left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate group by l_shipdate, o_orderdate, l_partkey, l_suppkey; ``` query rewrite by materialized view will fail with the message `Check partition query used validation fail` if we turn on the switch `SET 
enable_materialized_view_union_rewrite = true;` default true we run the query above again, it will success and will use union all materialized view and origin query to response the query correctly. the plan is as following: ``` | Explain String(Nereids Planner) | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS: | | l_shipdate[#52] | | o_orderdate[#53] | | l_partkey[#54] | | l_suppkey[#55] | | sum_total[#56] | | PARTITION: UNPARTITIONED | | | | HAS_COLO_PLAN_NODE: false | | | | VRESULT SINK | | MYSQL_PROTOCAL | | | | 11:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 1 | | | | PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 11 | | UNPARTITIONED | | | | 10:VUNION(756) | | | | | |----9:VAGGREGATE (merge finalize)(753) | | | | output: sum(partial_sum(o_totalprice)[#46])[#51] | | | | group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | cardinality=2 | | | | distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | | | | 8:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_shipdate[#42] | | | | | 1:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 2 | | | | PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 08 | | HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | 7:VAGGREGATE (update serialize)(747) | | | STREAMING | | | output: partial_sum(o_totalprice[#41])[#46] | | | group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40] | | | cardinality=2 | | | distribute expr lists: l_shipdate[#37] | | | | | 
6:VHASH JOIN(741) | | | join op: RIGHT OUTER JOIN(PARTITIONED)[] | | | equal join conjunct: (o_orderkey[#21] = l_orderkey[#5]) | | | equal join conjunct: (o_orderdate[#25] = l_shipdate[#15]) | | | runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) | | | cardinality=2 | | | vec output tuple id: 4 | | | output tuple id: 4 | | | vIntermediate tuple ids: 3 | | | hash output slot ids: 6 7 24 25 15 | | | final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31] | | | final project output tuple id: 4 | | | distribute expr lists: o_orderkey[#21], o_orderdate[#25] | | | distribute expr lists: l_orderkey[#5], l_shipdate[#15] | | | | | |----3:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_orderkey[#5] | | | | | 5:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 3 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 05 | | HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | 4:VOlapScanNode(722) | | TABLE: union_db.orders(orders), PREAGGREGATION: ON | | runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25] | | partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ... 
| | cardinality=3, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 4 | | | | PARTITION: HASH_PARTITIONED: l_orderkey[#5] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15] | | | | 2:VOlapScanNode(729) | | TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON | | PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18') | | partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227 | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 5 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | RANDOM | | | | 0:VOlapScanNode(718) | | TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON | | partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ... | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | MaterializedView | | MaterializedViewRewriteSuccessAndChose: | | Names: mv_10086 | | MaterializedViewRewriteSuccessButNotChose: | | | | MaterializedViewRewriteFail: | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ``` --- .../java/org/apache/doris/mtmv/MTMVCache.java | 48 +- .../post/CommonSubExpressionOpt.java | 5 +- .../mv/AbstractMaterializedViewRule.java | 218 +++-- .../exploration/mv/MaterializedViewUtils.java | 64 +- .../rules/exploration/mv/Predicates.java | 52 ++ .../rules/exploration/mv/StructInfo.java | 70 ++ .../nereids/rules/rewrite/EliminateSort.java | 4 +- .../commands/UpdateMvByPartitionCommand.java | 32 +- .../visitor/ExpressionLineageReplacer.java | 6 +- .../org/apache/doris/qe/SessionVariable.java | 14 + .../mv/partition_mv_rewrite.out | 29 + .../mv/dimension/dimension_self_conn.groovy | 564 ++++++++++++ 
.../mv/nested_mtmv/nested_mtmv.groovy | 859 ++++++++++++++++++ .../mv/partition_mv_rewrite.groovy | 17 + .../partition_curd_union_rewrite.groovy | 238 +++++ .../usercase_union_rewrite.groovy | 174 ++++ 16 files changed, 2282 insertions(+), 112 deletions(-) create mode 100644 regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out create mode 100644 regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy create mode 100644 regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy create mode 100644 regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy create mode 100644 regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java index 9116817834db24..154bd4ec7f17c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java @@ -17,27 +17,25 @@ package org.apache.doris.mtmv; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.Partition; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; -import org.apache.doris.nereids.analyzer.UnboundResultSink; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils; +import org.apache.doris.nereids.rules.rewrite.EliminateSort; import org.apache.doris.nereids.trees.plans.Plan; import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; -import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; -import com.google.common.collect.Lists; - -import java.util.stream.Collectors; +import com.google.common.collect.ImmutableList; /** * The cache for materialized view cache @@ -70,27 +68,25 @@ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) { if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) { mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext); } - unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor() { - // convert to table sink to eliminate sort under table sink, because sort under result sink can not be - // eliminated - @Override - public LogicalPlan visitUnboundResultSink(UnboundResultSink unboundResultSink, - Void context) { - return new UnboundTableSink<>(mtmv.getFullQualifiers(), - mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()), - Lists.newArrayList(), - mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()), - unboundResultSink.child()); - } - }, null); + // Can not convert to table sink, because use the same column from different table when self join + // the out slot is wrong Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN); - // eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view - Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter() { + // Eliminate result sink 
because sink operator is useless in query rewrite by materialized view + // and the top sort can also be removed + Plan mvPlan = originPlan.accept(new DefaultPlanRewriter() { @Override - public Plan visitLogicalTableSink(LogicalTableSink logicalTableSink, Object context) { - return logicalTableSink.child().accept(this, context); + public Plan visitLogicalResultSink(LogicalResultSink logicalResultSink, Object context) { + return logicalResultSink.child().accept(this, context); } }, null); + // Optimize by rules to remove top sort + CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan, + PhysicalProperties.ANY); + mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> { + Rewriter.getCteChildrenRewriter(childContext, + ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute(); + return childContext.getRewritePlan(); + }, mvPlan, originPlan); return new MTMVCache(mvPlan, originPlan); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java index 44e65dee1c9873..8f558aaba0239b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java @@ -83,8 +83,9 @@ private List> computeMultiLayerProjections( // 'case slot whenClause2 END' // This is illegal. Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap); - Alias alias = new Alias(rewritten); - aliasMap.put(expr, alias); + // if rewritten is already alias, use it directly, because in materialized view rewriting + // Should keep out slot immutably after rewritten successfully + aliasMap.put(expr, rewritten instanceof Alias ? 
(Alias) rewritten : new Alias(rewritten)); } }); layer.addAll(aliasMap.values()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index b596408851f04d..6bf47f00c359c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -17,27 +17,33 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Pair; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRewriteUtil; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; +import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory; import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate; +import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover; +import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector; import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.ExprId; 
import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable; import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable; @@ -46,20 +52,26 @@ import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeUtils; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -129,20 +141,22 @@ public List rewrite(Plan queryPlan, CascadesContext cascadesContext) { */ protected List getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext, BitSet materializedViewTableSet) { - return MaterializedViewUtils.extractStructInfo(queryPlan, 
cascadesContext, materializedViewTableSet) - .stream() - .filter(queryStructInfo -> { - boolean valid = checkPattern(queryStructInfo); - if (!valid) { - cascadesContext.getMaterializationContexts().forEach(ctx -> - ctx.recordFailReason(queryStructInfo, "Query struct info is invalid", - () -> String.format("query table bitmap is %s, plan is %s", - queryStructInfo.getTableBitSet(), queryPlan.treeString()) - )); - } - return valid; - }) - .collect(Collectors.toList()); + List validStructInfos = new ArrayList<>(); + List uncheckedStructInfos = MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext, + materializedViewTableSet); + uncheckedStructInfos.forEach(queryStructInfo -> { + boolean valid = checkPattern(queryStructInfo); + if (!valid) { + cascadesContext.getMaterializationContexts().forEach(ctx -> + ctx.recordFailReason(queryStructInfo, "Query struct info is invalid", + () -> String.format("query table bitmap is %s, plan is %s", + queryStructInfo.getTableBitSet(), queryPlan.treeString()) + )); + } else { + validStructInfos.add(queryStructInfo); + } + }); + return validStructInfos; } /** @@ -200,13 +214,13 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca } Plan rewrittenPlan; Plan mvScan = materializationContext.getMvScanPlan(); - Plan topPlan = queryStructInfo.getTopPlan(); + Plan queryPlan = queryStructInfo.getTopPlan(); if (compensatePredicates.isAlwaysTrue()) { rewrittenPlan = mvScan; } else { // Try to rewrite compensate predicates by using mv scan List rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(), - topPlan, materializationContext.getMvExprToMvScanExprMapping(), + queryPlan, materializationContext.getMvExprToMvScanExprMapping(), viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet()); if (rewriteCompensatePredicates.isEmpty()) { materializationContext.recordFailReason(queryStructInfo, @@ -225,65 +239,125 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext 
casca if (rewrittenPlan == null) { continue; } - final Plan finalRewrittenPlan = rewriteByRules(cascadesContext, rewrittenPlan, topPlan); - if (!isOutputValid(topPlan, finalRewrittenPlan)) { - materializationContext.recordFailReason(queryStructInfo, - "RewrittenPlan output logical properties is different with target group", - () -> String.format("planOutput logical" - + " properties = %s,\n groupOutput logical properties = %s", - finalRewrittenPlan.getLogicalProperties(), topPlan.getLogicalProperties())); - continue; - } + rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext, + childContext -> { + Rewriter.getWholeTreeRewriter(childContext).execute(); + return childContext.getRewritePlan(); + }, rewrittenPlan, queryPlan); // check the partitions used by rewritten plan is valid or not - Set invalidPartitionsQueryUsed = - calcInvalidPartitions(finalRewrittenPlan, materializationContext, cascadesContext); - if (!invalidPartitionsQueryUsed.isEmpty()) { + Multimap, Partition> invalidPartitionsQueryUsed = + calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext); + // All partition used by query is valid + if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable() + .isEnableMaterializedViewUnionRewrite()) { materializationContext.recordFailReason(queryStructInfo, "Check partition query used validation fail", () -> String.format("the partition used by query is invalid by materialized view," + "invalid partition info query used is %s", - materializationContext.getMTMV().getPartitions().stream() - .filter(partition -> - invalidPartitionsQueryUsed.contains(partition.getId())) + invalidPartitionsQueryUsed.values().stream() + .map(Partition::getName) .collect(Collectors.toSet()))); continue; } + boolean partitionValid = invalidPartitionsQueryUsed.isEmpty(); + if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) { + // construct filter on originalPlan + Map> 
filterOnOriginPlan; + try { + filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed, + queryToViewSlotMapping); + if (filterOnOriginPlan.isEmpty()) { + materializationContext.recordFailReason(queryStructInfo, + "construct invalid partition filter on query fail", + () -> String.format("the invalid partitions used by query is %s, query plan is %s", + invalidPartitionsQueryUsed.values().stream().map(Partition::getName) + .collect(Collectors.toSet()), + queryStructInfo.getOriginalPlan().treeString())); + continue; + } + } catch (org.apache.doris.common.AnalysisException e) { + materializationContext.recordFailReason(queryStructInfo, + "construct invalid partition filter on query analysis fail", + () -> String.format("the invalid partitions used by query is %s, query plan is %s", + invalidPartitionsQueryUsed.values().stream().map(Partition::getName) + .collect(Collectors.toSet()), + queryStructInfo.getOriginalPlan().treeString())); + continue; + } + // For rewrittenPlan which contains materialized view should remove invalid partition ids + List children = Lists.newArrayList( + rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(materializationContext.getMTMV(), + invalidPartitionsQueryUsed.values().stream() + .map(Partition::getId).collect(Collectors.toSet()))), + StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext)); + // Union query materialized view and source table + rewrittenPlan = new LogicalUnion(Qualifier.ALL, + queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()), + children.stream() + .map(plan -> plan.getOutput().stream() + .map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList())) + .collect(Collectors.toList()), + ImmutableList.of(), + false, + children); + partitionValid = true; + } + if (!partitionValid) { + materializationContext.recordFailReason(queryStructInfo, + "materialized view partition is invalid union fail", + () -> 
String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s", + invalidPartitionsQueryUsed, queryPlan.treeString())); + continue; + } + rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan); + if (!isOutputValid(queryPlan, rewrittenPlan)) { + LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties(); + materializationContext.recordFailReason(queryStructInfo, + "RewrittenPlan output logical properties is different with target group", + () -> String.format("planOutput logical" + + " properties = %s,\n groupOutput logical properties = %s", + logicalProperties, queryPlan.getLogicalProperties())); + continue; + } recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext); - rewriteResults.add(finalRewrittenPlan); + rewriteResults.add(rewrittenPlan); } return rewriteResults; } - /** - * Rewrite by rules and try to make output is the same after optimize by rules - */ - protected Plan rewriteByRules(CascadesContext cascadesContext, Plan rewrittenPlan, Plan originPlan) { - List originOutputs = originPlan.getOutput(); - if (originOutputs.size() != rewrittenPlan.getOutput().size()) { - return null; - } - Map originSlotToRewrittenExprId = Maps.newLinkedHashMap(); - for (int i = 0; i < originOutputs.size(); i++) { - originSlotToRewrittenExprId.put(originOutputs.get(i), rewrittenPlan.getOutput().get(i).getExprId()); + private boolean checkCanUnionRewrite(Multimap, Partition> + invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) { + if (invalidPartitionsQueryUsed.isEmpty() + || !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) { + return false; } - // run rbo job on mv rewritten plan - CascadesContext rewrittenPlanContext = CascadesContext.initContext( - cascadesContext.getStatementContext(), rewrittenPlan, - cascadesContext.getCurrentJobContext().getRequiredProperties()); - Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute(); - rewrittenPlan = 
rewrittenPlanContext.getRewritePlan(); - - // for get right nullable after rewritten, we need this map - Map exprIdToNewRewrittenSlot = Maps.newLinkedHashMap(); - for (Slot slot : rewrittenPlan.getOutput()) { - exprIdToNewRewrittenSlot.put(slot.getExprId(), slot); + // if mv can not offer valid partition data for query, bail out union rewrite + Map> mvRelatedTablePartitionMap = new LinkedHashMap<>(); + invalidPartitionsQueryUsed.keySet().forEach(invalidPartition -> + mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(), + new HashSet<>())); + queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap); + Set partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream() + .flatMap(Collection::stream) + .map(PartitionItem::toPartitionKeyDesc) + .collect(Collectors.toSet()); + Set mvInvalidPartitionKeyDescSet = new HashSet<>(); + for (Map.Entry, Collection> entry : + invalidPartitionsQueryUsed.asMap().entrySet()) { + entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add( + entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc())); } + return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed); + } + // Normalize expression such as nullable property and output slot id + protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) { // normalize nullable - ImmutableList convertNullable = originOutputs.stream() - .map(s -> normalizeExpression(s, exprIdToNewRewrittenSlot.get(originSlotToRewrittenExprId.get(s)))) - .collect(ImmutableList.toImmutableList()); - return new LogicalProject<>(convertNullable, rewrittenPlan); + List normalizeProjects = new ArrayList<>(); + for (int i = 0; i < originPlan.getOutput().size(); i++) { + normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i))); + } + return new LogicalProject<>(normalizeProjects, rewrittenPlan); } /** @@ -303,35 +377,47 @@ 
protected boolean isOutputValid(Plan sourcePlan, Plan rewrittenPlan) { * catalog relation. * Maybe only just some partitions is valid in materialized view, so we should check if the mv can * offer the partitions which query used or not. + * + * @return the invalid partition name set */ - protected Set calcInvalidPartitions(Plan rewrittenPlan, MaterializationContext materializationContext, + protected Multimap, Partition> calcUsedInvalidMvPartitions( + Plan rewrittenPlan, + MaterializationContext materializationContext, CascadesContext cascadesContext) { // check partition is valid or not MTMV mtmv = materializationContext.getMTMV(); PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo(); if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) { // if not partition, if rewrite success, it means mv is available - return ImmutableSet.of(); + return ImmutableMultimap.of(); } // check mv related table partition is valid or not MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo(); BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo(); if (relatedPartitionTable == null) { - return ImmutableSet.of(); + return ImmutableMultimap.of(); } // get mv valid partitions Set mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, cascadesContext.getConnectContext(), System.currentTimeMillis()).stream() .map(Partition::getId) .collect(Collectors.toSet()); - Set queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan + // get partitions query used + Set mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName())) .stream() .map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds()) .flatMap(Collection::stream) .collect(Collectors.toSet()); - queryUsedPartitionIdSet.removeAll(mvDataValidPartitionIdSet); - return queryUsedPartitionIdSet; + // get invalid 
partition ids + Set invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed); + invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet); + ImmutableMultimap.Builder, Partition> invalidPartitionMapBuilder = + ImmutableMultimap.builder(); + Pair partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo); + invalidMvPartitionIdSet.forEach(invalidPartitionId -> + invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId))); + return invalidPartitionMapBuilder.build(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index ac4b9ccad38174..46d0adde06e978 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.StructInfoMap; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -49,15 +50,17 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import java.util.ArrayList; import java.util.BitSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -147,13 +150,19 @@ public static List extractStructInfo(Plan plan, CascadesContext casc StructInfoMap structInfoMap = 
ownerGroup.getstructInfoMap(); structInfoMap.refresh(ownerGroup); Set queryTableSets = structInfoMap.getTableMaps(); + ImmutableList.Builder structInfosBuilder = ImmutableList.builder(); if (!queryTableSets.isEmpty()) { - return queryTableSets.stream() - // Just construct the struct info which mv table set contains all the query table set - .filter(queryTableSet -> materializedViewTableSet.isEmpty() - || StructInfo.containsAll(materializedViewTableSet, queryTableSet)) - .map(tableMap -> structInfoMap.getStructInfo(tableMap, tableMap, ownerGroup, plan)) - .collect(Collectors.toList()); + for (BitSet queryTableSet : queryTableSets) { + if (!materializedViewTableSet.isEmpty() + && !StructInfo.containsAll(materializedViewTableSet, queryTableSet)) { + continue; + } + StructInfo structInfo = structInfoMap.getStructInfo(queryTableSet, queryTableSet, ownerGroup, plan); + if (structInfo != null) { + structInfosBuilder.add(structInfo); + } + } + return structInfosBuilder.build(); } } // if plan doesn't belong to any group, construct it directly @@ -172,8 +181,8 @@ public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cas materializedView, ImmutableList.of(materializedView.getQualifiedDbName()), // this must be empty, or it will be used to sample - Lists.newArrayList(), - Lists.newArrayList(), + ImmutableList.of(), + ImmutableList.of(), Optional.empty()); mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId()); List mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast) @@ -181,6 +190,43 @@ public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cas return new LogicalProject(mvProjects, mvScan); } + /** + * Optimize by rules, this support optimize by custom rules by define different rewriter according to different + * rules + */ + public static Plan rewriteByRules( + CascadesContext cascadesContext, + Function planRewriter, + Plan rewrittenPlan, Plan originPlan) { + List 
originOutputs = originPlan.getOutput(); + if (originOutputs.size() != rewrittenPlan.getOutput().size()) { + return null; + } + // After RBO, slot order may change, so need originSlotToRewrittenExprId which record + // origin plan slot order + List originalRewrittenPlanExprIds = + rewrittenPlan.getOutput().stream().map(Slot::getExprId).collect(Collectors.toList()); + // run rbo job on mv rewritten plan + CascadesContext rewrittenPlanContext = CascadesContext.initContext( + cascadesContext.getStatementContext(), rewrittenPlan, + cascadesContext.getCurrentJobContext().getRequiredProperties()); + rewrittenPlan = planRewriter.apply(rewrittenPlanContext); + Map exprIdToNewRewrittenSlot = Maps.newLinkedHashMap(); + for (Slot slot : rewrittenPlan.getOutput()) { + exprIdToNewRewrittenSlot.put(slot.getExprId(), slot); + } + List rewrittenPlanExprIds = rewrittenPlan.getOutput().stream() + .map(Slot::getExprId).collect(Collectors.toList()); + // If project order doesn't change, return rewrittenPlan directly + if (originalRewrittenPlanExprIds.equals(rewrittenPlanExprIds)) { + return rewrittenPlan; + } + // If project order change, return rewrittenPlan with reordered projects + return new LogicalProject<>(originalRewrittenPlanExprIds.stream() + .map(exprId -> (NamedExpression) exprIdToNewRewrittenSlot.get(exprId)).collect(Collectors.toList()), + rewrittenPlan); + } + private static final class TableQueryOperatorChecker extends DefaultPlanVisitor { public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java index 139230be5d4b97..c801e683d65bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java @@ -17,8 +17,15 @@ package 
org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping; +import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; import org.apache.doris.nereids.rules.expression.ExpressionNormalization; import org.apache.doris.nereids.rules.expression.ExpressionOptimization; @@ -27,13 +34,17 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.Utils; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -220,6 +231,47 @@ public String toString() { return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates); } + /** Construct filter by partition + * @param partitions this is the partition which filter should be constructed from + * @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping + * */ + public static Map> constructFilterByPartitions( + Multimap, Partition> partitions, + SlotMapping queryToViewSlotMapping) throws AnalysisException { + Map> 
constructedFilterMap = new HashMap<>(); + for (Map.Entry, Collection> entry : + partitions.asMap().entrySet()) { + // Get the base table partition column mv related + String relatedCol = entry.getKey().key().getRelatedCol(); + TableIf relatedTableInfo = entry.getKey().key().getRelatedTable(); + // Find the query slot which mv partition col mapped to + Optional partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap() + .keySet() + .stream() + .filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable() + && mappedSlot.getSlot().getName().equals(relatedCol) + && mappedSlot.getBelongedRelation() != null + && mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId()) + .findFirst(); + if (!partitionSlotQueryUsed.isPresent()) { + return ImmutableMap.of(); + } + // Constructed filter which should add on the query base table, + // after supported data roll up this method should keep logic consistency to partition mapping + Set partitionExpressions = UpdateMvByPartitionCommand.constructPredicates( + // get mv partition items + entry.getValue().stream() + .map(partition -> entry.getKey().value().getItem(partition.getId())) + .collect(Collectors.toSet()), + partitionSlotQueryUsed.get().getSlot()); + // Put partition expressions on query base table + constructedFilterMap.computeIfPresent(relatedTableInfo, + (key, existExpressions) -> Sets.union(existExpressions, partitionExpressions)); + constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions); + } + return constructedFilterMap; + } + /** * The split different representation for predicate expression, such as equal, range and residual predicate. 
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java index 604e7853d48894..4979f11c28e5db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java @@ -17,13 +17,21 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph; import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.JoinEdge; import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate; +import org.apache.doris.nereids.trees.copier.DeepCopierContext; +import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; @@ -38,11 +46,16 @@ import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Join; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import 
org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalSort; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer; import org.apache.doris.nereids.util.ExpressionUtils; @@ -584,4 +597,61 @@ private Boolean doVisit(Plan plan, PlanCheckContext checkContext) { return true; } } + + /** + * Add predicates on base table when materialized view scan contains invalid partitions + */ + public static class InvalidPartitionRemover extends DefaultPlanRewriter>> { + // materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair> context) { + if (olapScan.getTable().getName().equals(context.key().getName())) { + List selectedPartitionIds = olapScan.getSelectedPartitionIds(); + return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream() + .filter(partitionId -> !context.value().contains(partitionId)) + .collect(Collectors.toList())); + } + return olapScan; + } + } + + /**Collect partitions which scan used according to given table */ + public static class QueryScanPartitionsCollector extends DefaultPlanVisitor>> { + @Override + public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, + Map> context) { + TableIf table = catalogRelation.getTable(); + if (!context.containsKey(table.getId())) { + return catalogRelation; + } + // Only support check olap partition currently + if (catalogRelation instanceof LogicalOlapScan) { + LogicalOlapScan logicalOlapScan = 
(LogicalOlapScan) catalogRelation; + PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo(); + logicalOlapScan.getSelectedPartitionIds().stream() + .map(partitionInfo::getItem) + .forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> { + oldValue.add(partitionItem); + return oldValue; + })); + } + return catalogRelation; + } + } + + /**Add filter on table scan according to table filter map */ + public static Plan addFilterOnTableScan(Plan queryPlan, Map> filterOnOriginPlan, + CascadesContext parentCascadesContext) { + // Firstly, construct filter form invalid partition, this filter should be added on origin plan + Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan); + // Deep copy the plan to avoid the plan output is the same with the later union output, this may cause + // exec by mistake + queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy( + (LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext()); + // rbo rewrite after adding filter on origin plan + return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> { + Rewriter.getWholeTreeRewriter(context).execute(); + return context.getRewritePlan(); + }, queryPlanWithUnionFilter, queryPlan); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java index 9dd807717856ba..152d4c5d92ebed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java @@ -37,7 +37,7 @@ public class EliminateSort extends DefaultPlanRewriter implements CustomRewriter { @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { - Boolean eliminateSort = false; + Boolean eliminateSort = true; return plan.accept(this, eliminateSort); } @@ -76,7 +76,7 @@ 
public Plan visitLogicalSink(LogicalSink logicalSink, Boolean el // eliminate sort return visit(logicalSink, true); } - return skipEliminateSort(logicalSink, eliminateSort); + return skipEliminateSort(logicalSink, false); } private Plan skipEliminateSort(Plan plan, Boolean eliminateSort) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 67864187bb2a48..d63aee6ea709e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -120,18 +121,27 @@ private static Map> constructTableWithPredicates(MTMV m */ @VisibleForTesting public static Set constructPredicates(Set partitions, String colName) { - Set predicates = new HashSet<>(); UnboundSlot slot = new UnboundSlot(colName); + return constructPredicates(partitions, slot); + } + + /** + * construct predicates for partition items, the min key is the min key of range items. + * For list partition or less than partition items, the min key is null. 
+ */ + @VisibleForTesting + public static Set constructPredicates(Set partitions, Slot colSlot) { + Set predicates = new HashSet<>(); if (partitions.isEmpty()) { return Sets.newHashSet(BooleanLiteral.TRUE); } if (partitions.iterator().next() instanceof ListPartitionItem) { for (PartitionItem item : partitions) { - predicates.add(convertListPartitionToIn(item, slot)); + predicates.add(convertListPartitionToIn(item, colSlot)); } } else { for (PartitionItem item : partitions) { - predicates.add(convertRangePartitionToCompare(item, slot)); + predicates.add(convertRangePartitionToCompare(item, colSlot)); } } return predicates; @@ -186,7 +196,10 @@ private static Expression convertRangePartitionToCompare(PartitionItem item, Slo return predicate; } - static class PredicateAdder extends DefaultPlanRewriter>> { + /** + * Add predicates on base table when mv can partition update + */ + public static class PredicateAdder extends DefaultPlanRewriter>> { @Override public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map> predicates) { List tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(), @@ -198,5 +211,16 @@ public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map> predicates) { + TableIf table = catalogRelation.getTable(); + if (predicates.containsKey(table)) { + return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))), + catalogRelation); + } + return catalogRelation; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java index 2060718ec1359b..1252f3b4bbf899 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java @@ -45,12 +45,12 @@ * Get from rewrite plan and can also get 
from plan struct info, if from plan struct info it depends on * the nodes from graph. */ -public class ExpressionLineageReplacer extends DefaultPlanVisitor { +public class ExpressionLineageReplacer extends DefaultPlanVisitor { public static final ExpressionLineageReplacer INSTANCE = new ExpressionLineageReplacer(); @Override - public Void visit(Plan plan, ExpressionReplaceContext context) { + public Expression visit(Plan plan, ExpressionReplaceContext context) { List expressions = plan.getExpressions(); Map targetExpressionMap = context.getExprIdExpressionMap(); // Filter the namedExpression used by target and collect the namedExpression @@ -62,7 +62,7 @@ public Void visit(Plan plan, ExpressionReplaceContext context) { } @Override - public Void visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) { + public Expression visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) { Group group = groupPlan.getGroup(); if (group == null) { return visit(groupPlan, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ddb530cf4d2f46..d89ff2ea8df535 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -514,6 +514,9 @@ public class SessionVariable implements Serializable, Writable { public static final String MATERIALIZED_VIEW_REWRITE_SUCCESS_CANDIDATE_NUM = "materialized_view_rewrite_success_candidate_num"; + public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE + = "enable_materialized_view_union_rewrite"; + public static final String CREATE_TABLE_PARTITION_MAX_NUM = "create_table_partition_max_num"; @@ -1619,6 +1622,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { "The max candidate num which participate in CBO when using asynchronous materialized views"}) public int materializedViewRewriteSuccessCandidateNum 
= 3; + @VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_UNION_REWRITE, needForward = true, + description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询", + "When the materialized view is not enough to provide all the data for the query, " + + "whether to allow the union of the base table and the materialized view to " + + "respond to the query"}) + public boolean enableMaterializedViewUnionRewrite = false; + @VariableMgr.VarAttr(name = CREATE_TABLE_PARTITION_MAX_NUM, needForward = true, description = {"建表时创建分区的最大数量", "The maximum number of partitions created during table creation"}) @@ -3628,6 +3638,10 @@ public int getMaterializedViewRewriteSuccessCandidateNum() { return materializedViewRewriteSuccessCandidateNum; } + public boolean isEnableMaterializedViewUnionRewrite() { + return enableMaterializedViewUnionRewrite; + } + public int getCreateTablePartitionMaxNum() { return createTablePartitionMaxNum; } diff --git a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out new file mode 100644 index 00000000000000..8559da6305b03b --- /dev/null +++ b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !query_all_direct_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_direct_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_partition_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_partition_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + diff --git a/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy new file mode 100644 index 00000000000000..9a2e893f7e4e8c --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy @@ -0,0 +1,564 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +/* +This suite test self connection case + */ +suite("partition_mv_rewrite_dimension_self_conn") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_self_conn + """ + + sql """CREATE TABLE `orders_self_conn` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_self_conn + """ + + sql """CREATE TABLE `lineitem_self_conn` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_self_conn values + (null, 1, 'k', 99.5, 'a', 'b', 1, 
'yy', '2023-10-17'), + (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'), + (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'), + (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'), + (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'), + (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'), + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_self_conn values + (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql """analyze table orders_self_conn with sync;""" + sql """analyze table lineitem_self_conn with sync;""" + + def create_mv = { mv_name, mv_sql -> + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_sql} + """ + } + + def compare_res = { def stmt -> + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET 
enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == origin_res[row][col]) + } + } + } + + + // join direction + def mv_name_1 = "mv_self_conn" + + def join_direction_mv_1 = """ + select t1.l_Shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + """ + + create_mv(mv_name_1, join_direction_mv_1) + def job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(job_name_1) + + def join_direction_sql_1 = """ + select t1.L_SHIPDATE + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + """ + explain { + sql("${join_direction_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(join_direction_sql_1 + " order by 1") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + + // join filter position + def join_filter_stmt_1 = """select t1.L_SHIPDATE, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_2 = """select t1.l_shipdate, t2.L_partkey, t1.l_suppkey + from (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_3 = """select t1.l_shipdate, t2.l_Partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_4 = """select t1.l_shipdate, t2.l_parTkey, t1.l_suppkey + from 
lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate = '2023-10-17'""" + def join_filter_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_suppkey=1""" + + def mv_list = [ + join_filter_stmt_1, join_filter_stmt_2, join_filter_stmt_3, join_filter_stmt_4, join_filter_stmt_5] + def join_self_conn_order = " order by 1, 2, 3" + for (int i =0; i < mv_list.size(); i++) { + logger.info("i:" + i) + def join_self_conn_mv = """join_self_conn_mv_${i}""" + create_mv(join_self_conn_mv, mv_list[i]) + def job_name = getJobName(db, join_self_conn_mv) + waitingMTMVTaskFinished(job_name) + if (i == 0) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 2) { + continue + } + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } + } else if (i == 1) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 1 || j == 3) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } else if (i == 2) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 2) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + + } + } else if (i == 3) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 1 || j == 3) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + 
compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } else if (i == 4) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 4) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } + sql """DROP MATERIALIZED VIEW IF EXISTS ${join_self_conn_mv};""" + } + + // join type + def join_type_stmt_1 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_2 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_3 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + right join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + full join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_6 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left semi join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_7 = """select t2.l_shipdate, t2.l_partkey, t2.l_suppkey + from lineitem_self_conn as t1 + right semi join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_8 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left anti join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_9 = """select t2.l_shipdate, t2.l_partkey, 
t2.l_suppkey + from lineitem_self_conn as t1 + right anti join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_list = [join_type_stmt_1, join_type_stmt_2, join_type_stmt_3, + join_type_stmt_5, join_type_stmt_6, join_type_stmt_7, join_type_stmt_8, join_type_stmt_9] + for (int i = 0; i < join_type_stmt_list.size(); i++) { + logger.info("i:" + i) + String join_type_self_conn_mv = """join_type_self_conn_mv_${i}""" + create_mv(join_type_self_conn_mv, join_type_stmt_list[i]) + def job_name = getJobName(db, join_type_self_conn_mv) + waitingMTMVTaskFinished(job_name) + if (i in [4, 5]) { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j: " + j) + if (j in [4, 5]) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } else if (i in [6, 7]) { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j: " + j) + if (j in [6, 7]) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } else { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j:" + j) + if (i == j) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } + sql """DROP MATERIALIZED VIEW IF EXISTS ${join_type_self_conn_mv};""" + } + + // agg + 
// agg + without group by + with agg function + agg_mv_stmt = """ + select t2.o_orderkey, + sum(t1.O_TOTALPRICE) as sum_total, + max(t1.o_totalprice) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t1.o_shippriority > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + + create_mv(mv_name_1, agg_mv_stmt) + job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(job_name_1) + + def agg_sql_1 = """select t2.o_orderkey, + count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, + count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, + sum(t1.O_totalprice), + max(t1.o_totalprice), + min(t1.o_totalprice), + count(*) + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + explain { + sql("${agg_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_1 + " order by 1,2,3,4,5,6,7") + + agg_sql_1 = """select t2.o_orderkey, + count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, + count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, + sum(t1.O_totalprice), + max(t1.o_totalprice), + min(t1.o_totalprice), + count(*) + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + explain { + sql("${agg_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_1 + " order by 1,2,3,4,5,6") + sql """DROP MATERIALIZED 
VIEW IF EXISTS ${mv_name_1};""" + + // agg + with group by + without agg function + + def agg_mv_stmt_2 = """ + select t1.o_orderdatE, t2.O_SHIPPRIORITY, t1.o_comment + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t1.o_orderdate, + t2.o_shippriority, + t1.o_comment + """ + create_mv(mv_name_1, agg_mv_stmt_2) + def agg_job_name_2 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(agg_job_name_2) + sql """analyze table ${mv_name_1} with sync;""" + + def agg_sql_2 = """ + select t2.O_SHIPPRIORITY, t1.o_comment + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t2.o_shippriority, + t1.o_comment + """ + explain { + sql("${agg_sql_2}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_2 + " order by 1,2") + + // agg + with group by + with agg function + def agg_mv_stmt_3 = """ + select t1.o_orderdatE, t2.o_shippriority, t1.o_comment, + sum(t1.o_totalprice) as sum_total, + max(t2.o_totalpricE) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t1.o_orderdatE, + t2.o_shippriority, + t1.o_comment + """ + create_mv(mv_name_1, agg_mv_stmt_3) + def agg_job_name_3 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(agg_job_name_3) + sql """analyze table ${mv_name_1} with sync;""" + + def agg_sql_3 = """ + select t2.o_shippriority, t1.o_comment, + sum(t1.o_totalprice) as sum_total, + max(t2.o_totalpricE) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and 
t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t2.o_shippriority, + t1.o_comment + """ + explain { + sql("${agg_sql_3}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_3 + " order by 1,2,3,4,5,6") + + + // view partital rewriting + def view_partition_mv_stmt_1 = """ + select t1.l_shipdatE, t2.l_partkey, t1.l_orderkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY + group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY""" + create_mv(mv_name_1, view_partition_mv_stmt_1) + def view_partition_job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(view_partition_job_name_1) + + def view_partition_sql_1 = """select t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey + from ( + select t1.l_shipdatE as l_shipdatE, t2.l_partkey as l_partkey, t1.l_orderkey as l_orderkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY + group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY + ) t + inner join lineitem_self_conn + on t.l_partkey = lineitem_self_conn.l_partkey + group by t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey + """ + explain { + sql("${view_partition_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(view_partition_sql_1 + " order by 1,2,3") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + + + // predicate compensate + def predicate_mv_stmt_1 = """ + select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate >= "2023-10-17" + """ + create_mv(mv_name_1, predicate_mv_stmt_1) + def predicate_job_name_1 = getJobName(db, 
mv_name_1) + waitingMTMVTaskFinished(predicate_job_name_1) + + def predicate_sql_1 = """ + select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate >= "2023-10-17" and t1.l_partkey = 1 + """ + explain { + sql("${predicate_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(predicate_sql_1 + " order by 1,2,3") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + +} diff --git a/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy new file mode 100644 index 00000000000000..4a0f513f0fae88 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy @@ -0,0 +1,859 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("nested_mtmv") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_1 + """ + + sql """CREATE TABLE `orders_1` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_1 + """ + + sql """CREATE TABLE `lineitem_1` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists partsupp_1 + """ + + sql """CREATE TABLE `partsupp_1` ( + `ps_partkey` INT NULL, + `ps_suppkey` INT NULL, + `ps_availqty` INT NULL, + `ps_supplycost` 
DECIMAL(15, 2) NULL, + `ps_comment` VARCHAR(199) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`ps_partkey`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`ps_partkey`) BUCKETS 24 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_1 values + (null, 1, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'), + (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'), + (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'), + (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'), + (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'), + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_1 values + (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (1, 1, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (3, 3, 3, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 3, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 1, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 3, 1, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 1, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 2, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 3, 3, 3, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 1, 1, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql""" + insert into 
partsupp_1 values
+    (1, 1, 1, 99.5, 'yy'),
+    (2, 2, 2, 109.2, 'mm'),
+    (3, 3, 1, 99.5, 'yy'),
+    (3, null, 1, 99.5, 'yy');
+    """
+
+    sql """analyze table orders_1 with sync;"""
+    sql """analyze table lineitem_1 with sync;"""
+    sql """analyze table partsupp_1 with sync;"""
+
+    // Helper: (re)creates the materialized view `mv_name` from the query `mv_sql`.
+    // A materialized view or a table already using that name is dropped first,
+    // so the helper can be called repeatedly with the same name.
+    def create_mv = { mv_name, mv_sql ->
+        def drop_mv_ddl = """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};"""
+        def drop_table_ddl = """DROP TABLE IF EXISTS ${mv_name}"""
+        def create_mv_ddl = """
+        CREATE MATERIALIZED VIEW ${mv_name}
+        BUILD IMMEDIATE REFRESH AUTO ON MANUAL
+        DISTRIBUTED BY RANDOM BUCKETS 2
+        PROPERTIES ('replication_num' = '1')
+        AS
+        ${mv_sql}
+        """
+        sql(drop_mv_ddl)
+        sql(drop_table_ddl)
+        sql(create_mv_ddl)
+    }
+
+    // Helper: runs `stmt` once with materialized-view rewrite switched off and once
+    // with it switched on, then asserts both result sets have the same number of
+    // rows, the same row widths and equal cells at every position.
+    // The session is left with enable_materialized_view_rewrite=true.
+    def compare_res = { def stmt ->
+        sql("SET enable_materialized_view_rewrite=false")
+        def plain_rows = sql(stmt)
+        logger.info("origin_res: " + plain_rows)
+        sql("SET enable_materialized_view_rewrite=true")
+        def mv_rows = sql(stmt)
+        logger.info("mv_origin_res: " + mv_rows)
+        boolean both_empty = mv_rows == [] && plain_rows == []
+        assertTrue(both_empty || (mv_rows.size() == plain_rows.size()))
+        // Positional comparison: callers append an ORDER BY so row order is stable.
+        for (row_idx in 0..<mv_rows.size()) {
+            def mv_row = mv_rows[row_idx]
+            def plain_row = plain_rows[row_idx]
+            assertTrue(mv_row.size() == plain_row.size())
+            for (col_idx in 0..<mv_row.size()) {
+                assertTrue(mv_row[col_idx] == plain_row[col_idx])
+            }
+        }
+    }
+
+    // sr
+    def mv_stmt_1 = """SELECT l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey
+        FROM lineitem_1 INNER JOIN orders_1
+        ON l_orderkey = o_orderkey"""
+    def mv_name_1 = "join_mv1"
+    def mv_stmt_2 = """SELECT
+        l_orderkey,
+        l_linenumber,
+        o_orderkey,
+        sum(l_partkey) AS total_revenue,
+        max(o_custkey) AS max_discount
+        FROM ${mv_name_1}
+        GROUP BY l_orderkey, l_linenumber, o_orderkey;"""
+    def mv_name_2 = "agg_mv2"
+    def mv_stmt_3 = """SELECT
+        l_orderkey,
+        sum(total_revenue) AS total_revenue,
+        max(max_discount) AS max_discount
+        FROM ${mv_name_2}
+        GROUP BY l_orderkey;"""
+    def mv_name_3 = "join_agg_mv3"
+    create_mv(mv_name_1, mv_stmt_1)
+    def job_name_1 = getJobName(db, mv_name_1)
+
waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_name_2, mv_stmt_2) + job_name_1 = getJobName(db, mv_name_2) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_name_3, mv_stmt_3) + job_name_1 = getJobName(db, mv_name_3) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt_1 = """SELECT + l_orderkey, + sum(l_partkey) AS total_revenue, + max(o_custkey) AS max_discount + FROM lineitem_1 INNER JOIN orders_1 + ON l_orderkey = o_orderkey + GROUP BY l_orderkey""" + explain { + sql("${query_stmt_1}") + contains "${mv_name_3}(${mv_name_3})" + } + compare_res(query_stmt_1 + " order by 1,2,3") + + // user + def mv_stmt_4 = """ + select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + """ + def mv_level1_name = "mv_level1_name" + def mv_stmt_5 = """ + select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from ${mv_level1_name} + """ + def mv_level2_name = "mv_level2_name" + def mv_stmt_6 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from ${mv_level1_name} as t1 + left join ${mv_level1_name} as t2 + on t1.l_orderkey = t2.l_orderkey + """ + def mv_level3_name = "mv_level3_name" + def mv_stmt_7 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from ${mv_level2_name} as t1 + left join ${mv_level2_name} as t2 + on t1.l_orderkey = t2.l_orderkey + """ + def mv_level4_name = "mv_level4_name" + + create_mv(mv_level1_name, mv_stmt_4) + job_name_1 = getJobName(db, 
mv_level1_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level2_name, mv_stmt_5) + job_name_1 = getJobName(db, mv_level2_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level3_name, mv_stmt_6) + job_name_1 = getJobName(db, mv_level3_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level4_name, mv_stmt_7) + job_name_1 = getJobName(db, mv_level4_name) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt_2 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + ) as t1 + ) as t1 + left join (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + ) as t1 + ) as t2 on t1.l_orderkey = t2.l_orderkey + """ + explain { + sql("${query_stmt_2}") + contains "${mv_level3_name}(${mv_level3_name})" + } + compare_res(query_stmt_2 + " order by 1,2,3,4,5,6,7") + + // 
five level + def mv_1 = "mv1" + def mv_2 = "mv2" + def mv_3 = "mv3" + def mv_4 = "mv4" + def mv_5 = "mv5" + + def join_mv_1 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + """ + def join_mv_2 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ${mv_1} as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + """ + def join_mv_3 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_2} as t1 + left join ${mv_2} as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def join_mv_4 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, 
t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_3} as t1 + left join ${mv_3} as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def join_mv_5 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_4} as t1 + left join ${mv_4} as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + + create_mv(mv_1, join_mv_1) + job_name_1 = getJobName(db, mv_1) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_2, join_mv_2) + job_name_1 = getJobName(db, mv_2) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_3, join_mv_3) + job_name_1 = getJobName(db, mv_3) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_4, join_mv_4) + job_name_1 = getJobName(db, mv_4) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_5, join_mv_5) + job_name_1 = getJobName(db, mv_5) + waitingMTMVTaskFinished(job_name_1) + + + def sql_2 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then 
o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + """ + def sql_3 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, 
l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def sql_4 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + 
from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + 
bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, 
l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, 
t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def sql_5 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as 
agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * 
IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey 
= partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, 
o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, 
t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) 
cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + + explain { + sql("${sql_2}") + contains "${mv_2}(${mv_2})" + } + compare_res(sql_2 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_3}") + contains "${mv_3}(${mv_3})" + } + compare_res(sql_3 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_4}") + contains "${mv_4}(${mv_4})" + } + compare_res(sql_4 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_5}") + contains "${mv_5}(${mv_5})" + } + compare_res(sql_5 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + + + +} diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy index e799c01fff96b9..1d34e9617da0ac 100644 --- 
a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy @@ -166,12 +166,29 @@ suite("partition_mv_rewrite") { // wait partition is invalid sleep(5000) // only can use valid partition + sql "SET enable_materialized_view_union_rewrite=false" + // Test query all partition when disable enable_materialized_view_union_rewrite + order_qt_query_all_direct_before "${all_partition_sql}" explain { sql("${all_partition_sql}") notContains("${mv_name}(${mv_name})") } + order_qt_query_all_direct_after "${all_partition_sql}" + + // Test query part partition when disable enable_materialized_view_union_rewrite + order_qt_query_partition_before "${partition_sql}" explain { sql("${partition_sql}") contains("${mv_name}(${mv_name})") } + order_qt_query_partition_after "${partition_sql}" + + // Test query all partition when enable enable_materialized_view_union_rewrite + sql "SET enable_materialized_view_union_rewrite=true" + order_qt_query_all_before "${all_partition_sql}" + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + order_qt_query_all_after "${all_partition_sql}" } diff --git a/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy new file mode 100644 index 00000000000000..cfdd327c0d2199 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("partition_curd_union_rewrite") { // checks that queries on lineitem/orders are rewritten to the partitioned MV mv_10086 and return the same rows with and without rewrite + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "set runtime_filter_mode=OFF" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders + """ + + sql """ + CREATE TABLE IF NOT EXISTS orders ( + o_orderkey integer not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimalv3(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null + ) + DUPLICATE KEY(o_orderkey, o_custkey) + PARTITION BY RANGE(o_orderdate)( + FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + drop table if exists lineitem + """ + + sql""" + CREATE TABLE IF NOT EXISTS lineitem ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimalv3(15,2) not null, + l_extendedprice decimalv3(15,2) not null, + l_discount decimalv3(15,2) not null, + l_tax decimalv3(15,2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not 
null, + l_comment varchar(44) not null + ) + DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber) + PARTITION BY RANGE(l_shipdate) + (FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY) + DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql""" + insert into orders values + (1, 1, 'ok', 99.5, '2023-10-17', 'a', 'b', 1, 'yy'), + (2, 2, 'ok', 109.2, '2023-10-18', 'c','d',2, 'mm'), + (3, 3, 'ok', 99.5, '2023-10-19', 'a', 'b', 1, 'yy'); + """ + + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (2, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (3, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'); + """ + + + def mv_def_sql = """ + select l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + def all_partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + + def partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + where (l_shipdate>= '2023-10-18' and l_shipdate <= '2023-10-19') + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + sql """DROP MATERIALIZED VIEW IF EXISTS mv_10086""" + sql """DROP TABLE IF EXISTS mv_10086""" + sql""" + CREATE MATERIALIZED VIEW mv_10086 + BUILD IMMEDIATE 
REFRESH AUTO ON MANUAL + partition by(l_shipdate) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_def_sql} + """ + + + def compare_res = { def stmt -> // runs stmt with MV rewrite disabled, then enabled, and asserts both result sets match cell by cell + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == origin_res[row][col]) + } + } + } + + def mv_name = "mv_10086" + def order_by_stmt = " order by 1,2,3,4,5" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + + // All partitions are valid: both queries should be rewritten to use the materialized view + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + /* + // Disabled for now: insert into partition 2023-10-17 of lineitem after the materialized view was built; the checks below still expect the materialized view in the plan + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'); + """ + // Wait for the partition to be seen as invalid + sleep(5000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + // Test when the base table gets data in a partition (2023-10-21) that had no rows before + sql """ + insert into lineitem values + (1, 2, 
3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-21', '2023-10-21', '2023-10-21', 'a', 'b', 'yyyyyyyyy'); + """ + // Wait for the partition to be seen as invalid + sleep(5000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + // Test when a base table partition is dropped + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ ALTER TABLE lineitem DROP PARTITION IF EXISTS p_20231021 FORCE; + """ + // Wait for the partition to be seen as invalid + sleep(3000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + */ +} diff --git a/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy new file mode 100644 index 00000000000000..1e474abc8ffb48 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("usercase_union_rewrite") { // checks MV rewrite on the auto-partitioned orders_user table, before and after a base-table insert with union rewrite enabled + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_user + """ + + sql """CREATE TABLE `orders_user` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_user + """ + + sql """CREATE TABLE `lineitem_user` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE 
KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_user values + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_user values + (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql """analyze table orders_user with sync;""" + sql """analyze table lineitem_user with sync;""" + + def create_mv_orders = { mv_name, mv_sql -> // drops any existing MV or table named mv_name, then creates the MV from mv_sql, partitioned by o_orderdate + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + partition by(o_orderdate) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_sql} + """ + } + + def compare_res = { def stmt -> // runs stmt with MV rewrite disabled, then enabled, and asserts both result sets match cell by cell + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == 
origin_res[row][col]) + } + } + } + + def mv_name = "mv_usercase" + def mv_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, + sum(o_totalprice) as sum_total, + max(o_totalpricE) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from (select * from orders_user) as t1 + group by + o_orderdatE, + o_shippriority, + o_comment, + o_orderdate + """ + create_mv_orders(mv_name, mv_stmt) + def job_name_1 = getJobName(db, mv_name) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, + sum(o_totalprice) as sum_total, + max(o_totalpricE) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from (select * from orders_user) as t1 + group by + o_orderdatE, + o_shippriority, + o_comment, + o_orderdate + """ + explain { + sql("${query_stmt}") + contains "${mv_name}(${mv_name})" + } + compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8") + + sql """insert into orders_user values (5, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19');""" // new base-table row dated 2023-10-19, inserted after the MV build task finished + sql "SET enable_materialized_view_union_rewrite=true" + sleep(10 * 1000) // NOTE(review): presumably gives the partition change time to become visible to the planner - confirm + explain { + sql("${query_stmt}") + contains "${mv_name}(${mv_name})" + } + compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8") +}