Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo, Set<String>>> invalidPartitions;
if (PartitionCompensator.needUnionRewrite(materializationContext)
if (PartitionCompensator.needUnionRewrite(materializationContext, cascadesContext.getStatementContext())
&& sessionVariable.isEnableMaterializedViewUnionRewrite()) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -223,26 +225,50 @@ public static boolean needUnionRewrite(

/**
* Check if need union compensate or not
* If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST, should not do union compensate
* because it means query all partitions from base table and prune partition failed
*/
public static boolean needUnionRewrite(MaterializationContext materializationContext) {
public static boolean needUnionRewrite(MaterializationContext materializationContext,
StatementContext statementContext) throws AnalysisException {
if (!(materializationContext instanceof AsyncMaterializationContext)) {
return false;
}
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
PartitionType type = mtmv.getPartitionInfo().getType();
List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
Set<MTMVRelatedTableIf> pctTables = mvPartitionInfo.getPctTables();
Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap =
statementContext.getTableUsedPartitionNameMap();
for (MTMVRelatedTableIf pctTable : pctTables) {
if (pctTable instanceof ExternalTable && !((ExternalTable) pctTable).supportInternalPartitionPruned()) {
// if pct table is external table and not support internal partition pruned,
// we consider query all partitions from pct table, this would cause loop union compensate,
// so we skip union compensate in this case
return false;
}
Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
= tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
|| tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) {
// If query base table all partitions with ALL_PARTITIONS or ALL_PARTITIONS_LIST,
// should not do union compensate, because it means query all partitions from base table
// and prune partition failed
return false;
}
}
return !PartitionType.UNPARTITIONED.equals(type) && !pctInfos.isEmpty();
}

/**
* Get query used partitions
* this is calculated from tableUsedPartitionNameMap and tables in statementContext
*
* @param customRelationIdSet if union compensate occurs, the new query used partitions is changed,
* @param currentUsedRelationIdSet if union compensate occurs, the new query used partitions is changed,
* so need to get used partitions by relation id set
*/
public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementContext statementContext,
BitSet customRelationIdSet) {
BitSet currentUsedRelationIdSet) {
// get table used partitions
// if table is not in statementContext().getTables() which means the table is partition prune as empty relation
Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap = statementContext
Expand All @@ -267,6 +293,13 @@ public static Map<List<String>, Set<String>> getQueryUsedPartitions(StatementCon
queryUsedRelatedTablePartitionsMap.put(queryUsedTable, null);
continue tableLoop;
}
// If currentUsedRelationIdSet is not empty, need check relation id to get concrete used partitions
BitSet usedPartitionRelation = new BitSet();
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
if (!currentUsedRelationIdSet.isEmpty()
&& !currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
continue;
}
usedPartitionSet.addAll(tableUsedPartitionPair.value());
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTable, usedPartitionSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartiti
public LogicalHudiScan withRelationId(RelationId relationId) {
return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier,
selectedPartitions, tableSample, tableSnapshot, scanParams, incrementalRelation,
operativeSlots, virtualColumns, groupExpression, Optional.of(getLogicalProperties()));
operativeSlots, virtualColumns, groupExpression, Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,39 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionCompensatorTest extends TestWithFeService {

Expand Down Expand Up @@ -191,4 +209,185 @@ public void testGetAllTableUsedPartitionList() {
Assertions.assertEquals(orderTableUsedPartition, ImmutableSet.of("p1", "p2", "p3", "p4"));
});
}

@Test
public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws Exception {
MaterializationContext ctx1 = mockCtx(
PartitionType.UNPARTITIONED,
ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
ImmutableSet.of(),
false);
StatementContext sc1 = Mockito.mock(StatementContext.class);
Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1, sc1));

MaterializationContext ctx2 = mockCtx(
PartitionType.RANGE,
Collections.emptyList(),
ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
false);
StatementContext sc2 = Mockito.mock(StatementContext.class);
Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2, sc2));
}

@Test
public void testNeedUnionRewriteEmptyPctTables() throws Exception {
MaterializationContext ctx = mockCtx(
PartitionType.RANGE,
ImmutableList.of(),
Collections.emptySet(),
false);
StatementContext sc = Mockito.mock(StatementContext.class);
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
}

@Test
public void testNeedUnionRewriteExternalNoPrune() throws Exception {
MaterializationContext ctx = mockCtx(
PartitionType.LIST,
ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
true);
StatementContext sc = Mockito.mock(StatementContext.class);
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
}

@Test
public void testNeedUnionRewritePositive() throws Exception {
MaterializationContext ctx = mockCtx(
PartitionType.LIST,
ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
false);
StatementContext sc = Mockito.mock(StatementContext.class);
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
}

@Test
public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
BaseTableInfo tableInfo = newBaseTableInfo();
MaterializationContext ctx = mockCtx(
PartitionType.LIST,
ImmutableList.of(new BaseColInfo("c", tableInfo)),
ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
false);
StatementContext sc = Mockito.mock(StatementContext.class);

ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t = ArrayListMultimap.create();
t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
}

@Test
public void testGetQueryUsedPartitionsAllAndPartial() {
// Prepare qualifiers
List<String> lineitemQualifier = ImmutableList.of(
"internal", "partition_compensate_test", "lineitem_list_partition");
List<String> ordersQualifier = ImmutableList.of(
"internal", "partition_compensate_test", "orders_list_partition");

Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap
= connectContext.getStatementContext().getTableUsedPartitionNameMap();
tableUsedPartitionNameMap.clear();

tableUsedPartitionNameMap.put(lineitemQualifier, PartitionCompensator.ALL_PARTITIONS);

RelationId ridA = new RelationId(1);
RelationId ridB = new RelationId(2);
tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, ImmutableSet.of("p1", "p2")));
tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, ImmutableSet.of("p3")));

Map<List<String>, Set<String>> result = PartitionCompensator.getQueryUsedPartitions(
connectContext.getStatementContext(), new BitSet());
Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), result.get(ordersQualifier));

BitSet filterRidA = new BitSet();
filterRidA.set(ridA.asInt());
Map<List<String>, Set<String>> resultRidA = PartitionCompensator.getQueryUsedPartitions(
connectContext.getStatementContext(), filterRidA);
Assertions.assertNull(resultRidA.get(lineitemQualifier));
Assertions.assertEquals(ImmutableSet.of("p1", "p2"), resultRidA.get(ordersQualifier));

BitSet filterRidB = new BitSet();
filterRidB.set(ridB.asInt());
Map<List<String>, Set<String>> resultRidB = PartitionCompensator.getQueryUsedPartitions(
connectContext.getStatementContext(), filterRidB);
Assertions.assertNull(resultRidB.get(lineitemQualifier));
Assertions.assertEquals(ImmutableSet.of("p3"), resultRidB.get(ordersQualifier));

tableUsedPartitionNameMap.put(ordersQualifier, PartitionCompensator.ALL_PARTITIONS);
Map<List<String>, Set<String>> resultAllOrders = PartitionCompensator.getQueryUsedPartitions(
connectContext.getStatementContext(), new BitSet());
Assertions.assertNull(resultAllOrders.get(ordersQualifier));
}

@Test
public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
List<String> qualifier = ImmutableList.of(
"internal", "partition_compensate_test", "lineitem_list_partition");
Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap
= connectContext.getStatementContext().getTableUsedPartitionNameMap();
tableUsedPartitionNameMap.clear();
// Put an empty set via a distinct relation id to simulate no partitions used
RelationId rid = new RelationId(3);
tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, ImmutableSet.of()));

Map<List<String>, Set<String>> result = PartitionCompensator.getQueryUsedPartitions(
connectContext.getStatementContext(), new BitSet());
Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
}

private static MaterializationContext mockCtx(
PartitionType type,
List<BaseColInfo> pctInfos,
Set<MTMVRelatedTableIf> pctTables,
boolean externalNoPrune) throws AnalysisException {

MTMV mtmv = Mockito.mock(MTMV.class);
PartitionInfo pi = Mockito.mock(PartitionInfo.class);
Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
Mockito.when(pi.getType()).thenReturn(type);

MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos);
Mockito.when(mpi.getPctTables()).thenReturn(pctTables);

if (externalNoPrune) {
HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
Set<TableIf> tbls = new HashSet<>(pctTables);
tbls.add(ext);
Mockito.when(mpi.getPctTables()).thenReturn(
tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet()));
}

AsyncMaterializationContext ctx = Mockito.mock(AsyncMaterializationContext.class);
Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
return ctx;
}

private static BaseTableInfo newBaseTableInfo() {
CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
Mockito.when(catalog.getId()).thenReturn(1L);
Mockito.when(catalog.getName()).thenReturn("internal");

DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
Mockito.when(db.getId()).thenReturn(2L);
Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
Mockito.when(db.getCatalog()).thenReturn(catalog);

TableIf table = Mockito.mock(TableIf.class);
Mockito.when(table.getId()).thenReturn(3L);
Mockito.when(table.getName()).thenReturn("t");
Mockito.when(table.getDatabase()).thenReturn(db);

return new BaseTableInfo(table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName} "

def explainOnePartition = sql """ explain ${mvSql} """
logger.info("explainOnePartition: " + explainOnePartition.toString())
assertTrue(explainOnePartition.toString().contains("VUNION"))
mv_rewrite_success(mvSql, mvName)
order_qt_refresh_one_partition_rewrite "${mvSql}"

mv_rewrite_success("${mvSql}", "${mvName}")
Expand All @@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_auto "SELECT * FROM ${mvName} "

def explainAllPartition = sql """ explain ${mvSql}; """
logger.info("explainAllPartition: " + explainAllPartition.toString())
assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
mv_rewrite_success(mvSql, mvName)
order_qt_refresh_all_partition_rewrite "${mvSql}"

mv_rewrite_success("${mvSql}", "${mvName}")
Expand Down
Loading