diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 69232d6e261b93..036ffcd225177e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -316,7 +316,7 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca continue; } Pair>, Map>> invalidPartitions; - if (PartitionCompensator.needUnionRewrite(materializationContext) + if (PartitionCompensator.needUnionRewrite(materializationContext, cascadesContext.getStatementContext()) && sessionVariable.isEnableMaterializedViewUnionRewrite()) { MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); Map, Set> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java index 22c3540b5cbbf2..3fe966864d04f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java @@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.mtmv.BaseColInfo; import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; @@ -223,14 +225,38 @@ public static boolean needUnionRewrite( /** * Check if need union compensate or not + * If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, should not do union compensate + * because it means query all partitions from base table and prune partition failed */ - public static boolean needUnionRewrite(MaterializationContext materializationContext) { + public static boolean needUnionRewrite(MaterializationContext materializationContext, + StatementContext statementContext) throws AnalysisException { if (!(materializationContext instanceof AsyncMaterializationContext)) { return false; } MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); PartitionType type = mtmv.getPartitionInfo().getType(); - List pctInfos = mtmv.getMvPartitionInfo().getPctInfos(); + MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo(); + List pctInfos = mvPartitionInfo.getPctInfos(); + Set pctTables = mvPartitionInfo.getPctTables(); + Multimap, Pair>> tableUsedPartitionNameMap = + statementContext.getTableUsedPartitionNameMap(); + for (MTMVRelatedTableIf pctTable : pctTables) { + if (pctTable instanceof ExternalTable && !((ExternalTable) pctTable).supportInternalPartitionPruned()) { + // if pct table is external table and not support internal partition pruned, + // we consider query all partitions from pct table, this would cause loop union compensate, + // so we skip union compensate in this case + return false; + } + Collection>> tableUsedPartitions + = tableUsedPartitionNameMap.get(pctTable.getFullQualifiers()); + if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions) + || tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) { + // If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, + // should not do union compensate, because it means query all partitions from base table + // and prune partition failed + return false; + } + } return !PartitionType.UNPARTITIONED.equals(type) && !pctInfos.isEmpty(); } @@ -238,11 +264,11 @@ public static boolean needUnionRewrite(MaterializationContext materializationCon * Get query used partitions * this is calculated from tableUsedPartitionNameMap and tables in statementContext * - * @param customRelationIdSet if union compensate occurs, the new query used partitions is changed, + * @param currentUsedRelationIdSet if union compensate occurs, the new query used partitions is changed, * so need to get used partitions by relation id set */ public static Map, Set> getQueryUsedPartitions(StatementContext statementContext, - BitSet customRelationIdSet) { + BitSet currentUsedRelationIdSet) { // get table used partitions // if table is not in statementContext().getTables() which means the table is partition prune as empty relation Multimap, Pair>> tableUsedPartitionNameMap = statementContext @@ -267,6 +293,13 @@ public static Map, Set> getQueryUsedPartitions(StatementCon queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null); continue tableLoop; } + // If currentUsedRelationIdSet is not empty, need check relation id to get concrete used partitions + BitSet usedPartitionRelation = new BitSet(); + usedPartitionRelation.set(tableUsedPartitionPair.key().asInt()); + if (!currentUsedRelationIdSet.isEmpty() + && !currentUsedRelationIdSet.intersects(usedPartitionRelation)) { + continue; + } usedPartitionSet.addAll(tableUsedPartitionPair.value()); } queryUsedRelatedTablePartitionsMap.put(queryUsedTable, usedPartitionSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java index 9a123a04d2bf4f..7f616d7045ffcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java @@ -161,7 +161,7 @@ public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartiti public LogicalHudiScan withRelationId(RelationId relationId) { return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, selectedPartitions, tableSample, tableSnapshot, scanParams, incrementalRelation, - operativeSlots, virtualColumns, groupExpression, Optional.of(getLogicalProperties())); + operativeSlots, virtualColumns, groupExpression, Optional.empty()); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java index 672628e7d1e620..17d75f93fcffc7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java @@ -17,21 +17,39 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.mtmv.BaseColInfo; +import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.BitSet; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class PartitionCompensatorTest extends TestWithFeService { @@ -191,4 +209,185 @@ public void testGetAllTableUsedPartitionList() { Assertions.assertEquals(orderTableUsedPartition, ImmutableSet.of("p1", "p2", "p3", "p4")); }); } + + @Test + public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws Exception { + MaterializationContext ctx1 = mockCtx( + PartitionType.UNPARTITIONED, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(), + false); + StatementContext sc1 = Mockito.mock(StatementContext.class); + Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1, sc1)); + + MaterializationContext ctx2 = mockCtx( + PartitionType.RANGE, + Collections.emptyList(), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc2 = Mockito.mock(StatementContext.class); + Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2, sc2)); + } + + @Test + public void testNeedUnionRewriteEmptyPctTables() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.RANGE, + ImmutableList.of(), + Collections.emptySet(), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNeedUnionRewriteExternalNoPrune() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + true); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNeedUnionRewritePositive() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception { + BaseTableInfo tableInfo = newBaseTableInfo(); + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", tableInfo)), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + + ArrayListMultimap, Pair>> t = ArrayListMultimap.create(); + t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testGetQueryUsedPartitionsAllAndPartial() { + // Prepare qualifiers + List lineitemQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + List ordersQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "orders_list_partition"); + + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + + tableUsedPartitionNameMap.put(lineitemQualifier, PartitionCompensator.ALL_PARTITIONS); + + RelationId ridA = new RelationId(1); + RelationId ridB = new RelationId(2); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, ImmutableSet.of("p1", "p2"))); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, ImmutableSet.of("p3"))); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(result.get(lineitemQualifier)); // all partitions + Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), result.get(ordersQualifier)); + + BitSet filterRidA = new BitSet(); + filterRidA.set(ridA.asInt()); + Map, Set> resultRidA = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidA); + Assertions.assertNull(resultRidA.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p1", "p2"), resultRidA.get(ordersQualifier)); + + BitSet filterRidB = new BitSet(); + filterRidB.set(ridB.asInt()); + Map, Set> resultRidB = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidB); + Assertions.assertNull(resultRidB.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p3"), resultRidB.get(ordersQualifier)); + + tableUsedPartitionNameMap.put(ordersQualifier, PartitionCompensator.ALL_PARTITIONS); + Map, Set> resultAllOrders = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(resultAllOrders.get(ordersQualifier)); + } + + @Test + public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() { + List qualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + // Put an empty set via a distinct relation id to simulate no partitions used + RelationId rid = new RelationId(3); + tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, ImmutableSet.of())); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier)); + } + + private static MaterializationContext mockCtx( + PartitionType type, + List pctInfos, + Set pctTables, + boolean externalNoPrune) throws AnalysisException { + + MTMV mtmv = Mockito.mock(MTMV.class); + PartitionInfo pi = Mockito.mock(PartitionInfo.class); + Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi); + Mockito.when(pi.getType()).thenReturn(type); + + MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class); + Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi); + Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos); + Mockito.when(mpi.getPctTables()).thenReturn(pctTables); + + if (externalNoPrune) { + HMSExternalTable ext = Mockito.mock(HMSExternalTable.class); + Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false); + Set tbls = new HashSet<>(pctTables); + tbls.add(ext); + Mockito.when(mpi.getPctTables()).thenReturn( + tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet())); + } + + AsyncMaterializationContext ctx = Mockito.mock(AsyncMaterializationContext.class); + Mockito.when(ctx.getMtmv()).thenReturn(mtmv); + return ctx; + } + + private static BaseTableInfo newBaseTableInfo() { + CatalogIf catalog = Mockito.mock(CatalogIf.class); + Mockito.when(catalog.getId()).thenReturn(1L); + Mockito.when(catalog.getName()).thenReturn("internal"); + + DatabaseIf db = Mockito.mock(DatabaseIf.class); + Mockito.when(db.getId()).thenReturn(2L); + Mockito.when(db.getFullName()).thenReturn("partition_compensate_test"); + Mockito.when(db.getCatalog()).thenReturn(catalog); + + TableIf table = Mockito.mock(TableIf.class); + Mockito.when(table.getId()).thenReturn(3L); + Mockito.when(table.getName()).thenReturn("t"); + Mockito.when(table.getDatabase()).thenReturn(db); + + return new BaseTableInfo(table); + } } diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy index 70a11d3633acf4..680f7eaa93d48c 100644 --- a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy @@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - def explainOnePartition = sql """ explain ${mvSql} """ - logger.info("explainOnePartition: " + explainOnePartition.toString()) - assertTrue(explainOnePartition.toString().contains("VUNION")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_one_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}") @@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_auto "SELECT * FROM ${mvName} " - def explainAllPartition = sql """ explain ${mvSql}; """ - logger.info("explainAllPartition: " + explainAllPartition.toString()) - assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_all_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}")