From c5428ead975c48151cf135873478ea5e38b18fe9 Mon Sep 17 00:00:00 2001 From: seawinde Date: Thu, 4 Dec 2025 14:20:09 +0800 Subject: [PATCH] [fix](mtmv) Fix hudi materialized view union all rewritten plan execute fail because of invalid slot (#58643) Related PR: #57558 #58413 Problem Summary: This fix addresses the following three issues: 1. When invoking the method org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan#withRelationId, the output needs to be recalculated to meet expectations. 2. After compensating with a union all due to partial partition invalidation of a materialized view, during the next round of transparent rewriting, the rewriting for the child of the union allshould use the query partitioncorresponding to the specific relation id to prevent infinite loops. 3. Currently, in the `test_hudi_rewrite_mtmv` test, if the plan rewritten by the materialized view transparent rewriting is not selected by the CBO, it is difficult to troubleshoot because explain memo planis not used. Therefore, the corresponding test method is modified. --- .../mv/AbstractMaterializedViewRule.java | 2 +- .../exploration/mv/PartitionCompensator.java | 41 +++- .../trees/plans/logical/LogicalHudiScan.java | 2 +- .../mv/PartitionCompensatorTest.java | 199 ++++++++++++++++++ .../hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 8 +- 5 files changed, 240 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 69232d6e261b93..036ffcd225177e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -316,7 +316,7 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca continue; } Pair>, Map>> invalidPartitions; - if (PartitionCompensator.needUnionRewrite(materializationContext) + if (PartitionCompensator.needUnionRewrite(materializationContext, cascadesContext.getStatementContext()) && sessionVariable.isEnableMaterializedViewUnionRewrite()) { MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); Map, Set> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java index 22c3540b5cbbf2..3fe966864d04f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java @@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.mtmv.BaseColInfo; import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; @@ -223,14 +225,38 @@ public static boolean needUnionRewrite( /** * Check if need union compensate or not + * If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, should not do union compensate + * because it means query all partitions from base table and prune partition failed */ - public static boolean needUnionRewrite(MaterializationContext materializationContext) { + public static boolean needUnionRewrite(MaterializationContext materializationContext, + StatementContext statementContext) throws AnalysisException { if (!(materializationContext instanceof AsyncMaterializationContext)) { return false; } MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); PartitionType type = mtmv.getPartitionInfo().getType(); - List pctInfos = mtmv.getMvPartitionInfo().getPctInfos(); + MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo(); + List pctInfos = mvPartitionInfo.getPctInfos(); + Set pctTables = mvPartitionInfo.getPctTables(); + Multimap, Pair>> tableUsedPartitionNameMap = + statementContext.getTableUsedPartitionNameMap(); + for (MTMVRelatedTableIf pctTable : pctTables) { + if (pctTable instanceof ExternalTable && !((ExternalTable) pctTable).supportInternalPartitionPruned()) { + // if pct table is external table and not support internal partition pruned, + // we consider query all partitions from pct table, this would cause loop union compensate, + // so we skip union compensate in this case + return false; + } + Collection>> tableUsedPartitions + = tableUsedPartitionNameMap.get(pctTable.getFullQualifiers()); + if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions) + || tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) { + // If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, + // should not do union compensate, because it means query all partitions from base table + // and prune partition failed + return false; + } + } return !PartitionType.UNPARTITIONED.equals(type) && !pctInfos.isEmpty(); } @@ -238,11 +264,11 @@ public static boolean needUnionRewrite(MaterializationContext materializationCon * Get query used partitions * this is calculated from tableUsedPartitionNameMap and tables in statementContext * - * @param customRelationIdSet if union compensate occurs, the new query used partitions is changed, + * @param currentUsedRelationIdSet if union compensate occurs, the new query used partitions is changed, * so need to get used partitions by relation id set */ public static Map, Set> getQueryUsedPartitions(StatementContext statementContext, - BitSet customRelationIdSet) { + BitSet currentUsedRelationIdSet) { // get table used partitions // if table is not in statementContext().getTables() which means the table is partition prune as empty relation Multimap, Pair>> tableUsedPartitionNameMap = statementContext @@ -267,6 +293,13 @@ public static Map, Set> getQueryUsedPartitions(StatementCon queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null); continue tableLoop; } + // If currentUsedRelationIdSet is not empty, need check relation id to get concrete used partitions + BitSet usedPartitionRelation = new BitSet(); + usedPartitionRelation.set(tableUsedPartitionPair.key().asInt()); + if (!currentUsedRelationIdSet.isEmpty() + && !currentUsedRelationIdSet.intersects(usedPartitionRelation)) { + continue; + } usedPartitionSet.addAll(tableUsedPartitionPair.value()); } queryUsedRelatedTablePartitionsMap.put(queryUsedTable, usedPartitionSet); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java index 9a123a04d2bf4f..7f616d7045ffcb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java @@ -161,7 +161,7 @@ public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartiti public LogicalHudiScan withRelationId(RelationId relationId) { return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, selectedPartitions, tableSample, tableSnapshot, scanParams, incrementalRelation, - operativeSlots, virtualColumns, groupExpression, Optional.of(getLogicalProperties())); + operativeSlots, virtualColumns, groupExpression, Optional.empty()); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java index 672628e7d1e620..17d75f93fcffc7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java @@ -17,21 +17,39 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.mtmv.BaseColInfo; +import org.apache.doris.mtmv.BaseTableInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.util.BitSet; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class PartitionCompensatorTest extends TestWithFeService { @@ -191,4 +209,185 @@ public void testGetAllTableUsedPartitionList() { Assertions.assertEquals(orderTableUsedPartition, ImmutableSet.of("p1", "p2", "p3", "p4")); }); } + + @Test + public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws Exception { + MaterializationContext ctx1 = mockCtx( + PartitionType.UNPARTITIONED, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(), + false); + StatementContext sc1 = Mockito.mock(StatementContext.class); + Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1, sc1)); + + MaterializationContext ctx2 = mockCtx( + PartitionType.RANGE, + Collections.emptyList(), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc2 = Mockito.mock(StatementContext.class); + Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2, sc2)); + } + + @Test + public void testNeedUnionRewriteEmptyPctTables() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.RANGE, + ImmutableList.of(), + Collections.emptySet(), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNeedUnionRewriteExternalNoPrune() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + true); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNeedUnionRewritePositive() throws Exception { + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create()); + Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception { + BaseTableInfo tableInfo = newBaseTableInfo(); + MaterializationContext ctx = mockCtx( + PartitionType.LIST, + ImmutableList.of(new BaseColInfo("c", tableInfo)), + ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)), + false); + StatementContext sc = Mockito.mock(StatementContext.class); + + ArrayListMultimap, Pair>> t = ArrayListMultimap.create(); + t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS); + Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t); + Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc)); + } + + @Test + public void testGetQueryUsedPartitionsAllAndPartial() { + // Prepare qualifiers + List lineitemQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + List ordersQualifier = ImmutableList.of( + "internal", "partition_compensate_test", "orders_list_partition"); + + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + + tableUsedPartitionNameMap.put(lineitemQualifier, PartitionCompensator.ALL_PARTITIONS); + + RelationId ridA = new RelationId(1); + RelationId ridB = new RelationId(2); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, ImmutableSet.of("p1", "p2"))); + tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, ImmutableSet.of("p3"))); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(result.get(lineitemQualifier)); // all partitions + Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), result.get(ordersQualifier)); + + BitSet filterRidA = new BitSet(); + filterRidA.set(ridA.asInt()); + Map, Set> resultRidA = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidA); + Assertions.assertNull(resultRidA.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p1", "p2"), resultRidA.get(ordersQualifier)); + + BitSet filterRidB = new BitSet(); + filterRidB.set(ridB.asInt()); + Map, Set> resultRidB = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), filterRidB); + Assertions.assertNull(resultRidB.get(lineitemQualifier)); + Assertions.assertEquals(ImmutableSet.of("p3"), resultRidB.get(ordersQualifier)); + + tableUsedPartitionNameMap.put(ordersQualifier, PartitionCompensator.ALL_PARTITIONS); + Map, Set> resultAllOrders = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertNull(resultAllOrders.get(ordersQualifier)); + } + + @Test + public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() { + List qualifier = ImmutableList.of( + "internal", "partition_compensate_test", "lineitem_list_partition"); + Multimap, Pair>> tableUsedPartitionNameMap + = connectContext.getStatementContext().getTableUsedPartitionNameMap(); + tableUsedPartitionNameMap.clear(); + // Put an empty set via a distinct relation id to simulate no partitions used + RelationId rid = new RelationId(3); + tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, ImmutableSet.of())); + + Map, Set> result = PartitionCompensator.getQueryUsedPartitions( + connectContext.getStatementContext(), new BitSet()); + Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier)); + } + + private static MaterializationContext mockCtx( + PartitionType type, + List pctInfos, + Set pctTables, + boolean externalNoPrune) throws AnalysisException { + + MTMV mtmv = Mockito.mock(MTMV.class); + PartitionInfo pi = Mockito.mock(PartitionInfo.class); + Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi); + Mockito.when(pi.getType()).thenReturn(type); + + MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class); + Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi); + Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos); + Mockito.when(mpi.getPctTables()).thenReturn(pctTables); + + if (externalNoPrune) { + HMSExternalTable ext = Mockito.mock(HMSExternalTable.class); + Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false); + Set tbls = new HashSet<>(pctTables); + tbls.add(ext); + Mockito.when(mpi.getPctTables()).thenReturn( + tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet())); + } + + AsyncMaterializationContext ctx = Mockito.mock(AsyncMaterializationContext.class); + Mockito.when(ctx.getMtmv()).thenReturn(mtmv); + return ctx; + } + + private static BaseTableInfo newBaseTableInfo() { + CatalogIf catalog = Mockito.mock(CatalogIf.class); + Mockito.when(catalog.getId()).thenReturn(1L); + Mockito.when(catalog.getName()).thenReturn("internal"); + + DatabaseIf db = Mockito.mock(DatabaseIf.class); + Mockito.when(db.getId()).thenReturn(2L); + Mockito.when(db.getFullName()).thenReturn("partition_compensate_test"); + Mockito.when(db.getCatalog()).thenReturn(catalog); + + TableIf table = Mockito.mock(TableIf.class); + Mockito.when(table.getId()).thenReturn(3L); + Mockito.when(table.getName()).thenReturn("t"); + Mockito.when(table.getDatabase()).thenReturn(db); + + return new BaseTableInfo(table); + } } diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy index 70a11d3633acf4..680f7eaa93d48c 100644 --- a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy +++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy @@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - def explainOnePartition = sql """ explain ${mvSql} """ - logger.info("explainOnePartition: " + explainOnePartition.toString()) - assertTrue(explainOnePartition.toString().contains("VUNION")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_one_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}") @@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_auto "SELECT * FROM ${mvName} " - def explainAllPartition = sql """ explain ${mvSql}; """ - logger.info("explainAllPartition: " + explainAllPartition.toString()) - assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + mv_rewrite_success(mvSql, mvName) order_qt_refresh_all_partition_rewrite "${mvSql}" mv_rewrite_success("${mvSql}", "${mvName}")