Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ protected void doInitialize() throws UserException {
table.getName()));
}
}
computeColumnsFilter();
initBackendPolicy();
initSchemaParams();
}
Expand Down Expand Up @@ -213,13 +212,23 @@ protected void doFinalize() throws UserException {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setFinalizeScanNodeStartTime();
}
convertPredicate();
createScanRangeLocations();
updateRequiredSlots();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setFinalizeScanNodeFinishTime();
}
}

/**
* Used as a predicate to convert conjuncts into corresponding data sources.
* All predicate conversions from different data sources should override this method.
* and this method must be called in finalize,
* because in nereids planner, conjuncts are only generated in the finalize stage.
*/
protected void convertPredicate() {
}

private void setColumnPositionMapping()
throws UserException {
TableIf tbl = getTargetTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ public void init() throws UserException {

@Override
public void finalize(Analyzer analyzer) throws UserException {
buildQuery();
doFinalize();
}

@Override
public void finalizeForNereids() throws UserException {
buildQuery();
doFinalize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
Expand Down Expand Up @@ -288,6 +289,21 @@ public List<Column> getPartitionColumns() {
.orElse(Collections.emptyList());
}

public SelectedPartitions getAllPartitions() {
if (CollectionUtils.isEmpty(this.getPartitionColumns())) {
return SelectedPartitions.NOT_PRUNED;
}

HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) this.getCatalog());
List<Type> partitionColumnTypes = this.getPartitionColumnTypes();
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
this.getDbName(), this.getName(), partitionColumnTypes);
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();

return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false);
}

public boolean isHiveTransactionalTable() {
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable)
&& isSupportedTransactionalFileFormat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
Expand Down Expand Up @@ -142,46 +141,16 @@ protected void doInitialize() throws UserException {

protected List<HivePartition> getPartitions() throws AnalysisException {
List<HivePartition> resPartitions = Lists.newArrayList();
long start = System.currentTimeMillis();
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
// partitioned table
boolean isPartitionPruned = selectedPartitions != null && selectedPartitions.isPruned;
Collection<PartitionItem> partitionItems;
if (!isPartitionPruned) {
// partitionItems is null means that the partition is not pruned by Nereids,
// so need to prune partitions here by legacy ListPartitionPrunerV2.
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
hmsTable.getDbName(), hmsTable.getName(), partitionColumnTypes);
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
if (!conjuncts.isEmpty()) {
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
hivePartitionValues.getUidToPartitionRange(),
hivePartitionValues.getRangeToId(),
hivePartitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
if (LOG.isDebugEnabled()) {
LOG.debug("hive partition fetch and prune for table {}.{} cost: {} ms",
hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
}
partitionItems = Lists.newArrayListWithCapacity(filteredPartitionIds.size());
for (Long id : filteredPartitionIds) {
partitionItems.add(idToPartitionItem.get(id));
}
} else {
partitionItems = idToPartitionItem.values();
}
} else {
// partitions has benn pruned by Nereids, in PruneFileScanPartition,
// so just use the selected partitions.
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
partitionItems = selectedPartitions.selectedPartitions.values();
}
// partitions has benn pruned by Nereids, in PruneFileScanPartition,
// so just use the selected partitions.
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
partitionItems = selectedPartitions.selectedPartitions.values();
Preconditions.checkNotNull(partitionItems);
this.selectedPartitionNum = partitionItems.size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {

private final MaxComputeExternalTable table;
TableBatchReadSession tableBatchReadSession;
private Predicate filterPredicate;

public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv);
Expand Down Expand Up @@ -115,8 +116,6 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS
}

void createTableBatchReadSession() throws UserException {
Predicate filterPredicate = convertPredicate();

List<String> requiredPartitionColumns = new ArrayList<>();
List<String> orderedRequiredDataColumns = new ArrayList<>();

Expand Down Expand Up @@ -164,9 +163,10 @@ void createTableBatchReadSession() throws UserException {

}

protected Predicate convertPredicate() {
@Override
protected void convertPredicate() {
if (conjuncts.isEmpty()) {
return Predicate.NO_PREDICATE;
this.filterPredicate = Predicate.NO_PREDICATE;
}

List<Predicate> odpsPredicates = new ArrayList<>();
Expand All @@ -180,9 +180,9 @@ protected Predicate convertPredicate() {
}

if (odpsPredicates.isEmpty()) {
return Predicate.NO_PREDICATE;
this.filterPredicate = Predicate.NO_PREDICATE;
} else if (odpsPredicates.size() == 1) {
return odpsPredicates.get(0);
this.filterPredicate = odpsPredicates.get(0);
} else {
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
Expand All @@ -191,7 +191,7 @@ protected Predicate convertPredicate() {
for (Predicate odpsPredicate : odpsPredicates) {
filterPredicate.addPredicate(odpsPredicate);
}
return filterPredicate;
this.filterPredicate = filterPredicate;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ protected void doInitialize() throws UserException {
super.doInitialize();
source = new PaimonSource(desc);
Preconditions.checkNotNull(source);
}

@Override
protected void convertPredicate() {
PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter(
source.getPaimonTable().rowType());
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needC
protected void doInitialize() throws UserException {
super.doInitialize();
source = new TrinoConnectorSource(desc);
convertPredicate();
}

protected void convertPredicate() throws UserException {
@Override
protected void convertPredicate() {
if (conjuncts.isEmpty()) {
constraint = Constraint.alwaysTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,6 @@ public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorCon
EsScanNode esScanNode = new EsScanNode(context.nextPlanNodeId(), tupleDescriptor,
table instanceof EsExternalTable);
esScanNode.setNereidsId(esScan.getId());
esScanNode.addConjuncts(translateToLegacyConjuncts(esScan.getConjuncts()));
Utils.execWithUncheckedException(esScanNode::init);
context.addScanNode(esScanNode, esScan);
context.getRuntimeTranslator().ifPresent(
Expand Down Expand Up @@ -666,7 +665,6 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
ScanNode scanNode,
ExternalTable table, TupleDescriptor tupleDescriptor) {
scanNode.setNereidsId(fileScan.getId());
scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts()));
scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));

TableName tableName = new TableName(null, "", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderApply;
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderLimit;
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderTopN;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoEsScan;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoJdbcScan;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoOdbcScan;
import org.apache.doris.nereids.rules.rewrite.PushCountIntoUnionAll;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOnPkFk;
Expand Down Expand Up @@ -410,10 +407,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(
new PruneOlapScanPartition(),
new PruneEmptyPartition(),
new PruneFileScanPartition(),
new PushConjunctsIntoJdbcScan(),
new PushConjunctsIntoOdbcScan(),
new PushConjunctsIntoEsScan()
new PruneFileScanPartition()
)
),
topic("MV optimization",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
return hudiScan;
} else {
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
qualifierWithoutTableName, unboundRelation.getTableSample(),
qualifierWithoutTableName,
((HMSExternalTable) table).getAllPartitions(),
unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
}
case ICEBERG_EXTERNAL_TABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public Rule build() {
esScan.getQualifier(),
DistributionSpecAny.INSTANCE,
Optional.empty(),
esScan.getLogicalProperties(),
esScan.getConjuncts())
esScan.getLogicalProperties())
).toRule(RuleType.LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public Rule build() {
DistributionSpecAny.INSTANCE,
Optional.empty(),
fileScan.getLogicalProperties(),
fileScan.getConjuncts(),
fileScan.getSelectedPartitions(),
fileScan.getTableSample(),
fileScan.getTableSnapshot())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public Rule build() {
DistributionSpecAny.INSTANCE,
Optional.empty(),
fileScan.getLogicalProperties(),
fileScan.getConjuncts(),
fileScan.getSelectedPartitions(),
fileScan.getTableSample(),
fileScan.getTableSnapshot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public Rule build() {
jdbcScan.getTable(),
jdbcScan.getQualifier(),
Optional.empty(),
jdbcScan.getLogicalProperties(),
jdbcScan.getConjuncts())
jdbcScan.getLogicalProperties())
).toRule(RuleType.LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public Rule build() {
odbcScan.getTable(),
odbcScan.getQualifier(),
Optional.empty(),
odbcScan.getLogicalProperties(),
odbcScan.getConjuncts())
odbcScan.getLogicalProperties())
).toRule(RuleType.LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalExternalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
Expand Down Expand Up @@ -277,17 +276,6 @@ public Plan visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, Map<ExprId,
return cteConsumer.withTwoMaps(consumerToProducerOutputMap, producerToConsumerOutputMap);
}

@Override
public Plan visitLogicalExternalRelation(LogicalExternalRelation relation, Map<ExprId, Slot> replaceMap) {
if (!relation.getConjuncts().isEmpty()) {
relation.getOutputSet().forEach(s -> replaceMap.put(s.getExprId(), s));
Set<Expression> conjuncts = updateExpressions(relation.getConjuncts(), replaceMap);
return relation.withConjuncts(conjuncts).recomputeLogicalProperties();
} else {
return relation;
}
}

private <T extends Expression> T updateExpression(T input, Map<ExprId, Slot> replaceMap) {
return (T) input.rewriteDownShortCircuit(e -> e.accept(SlotReferenceReplacer.INSTANCE, replaceMap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
Expand Down Expand Up @@ -72,8 +69,7 @@ public Rule build() {
selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true);
}

LogicalFileScan rewrittenScan = scan.withConjuncts(filter.getConjuncts())
.withSelectedPartitions(selectedPartitions);
LogicalFileScan rewrittenScan = scan.withSelectedPartitions(selectedPartitions);
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
}
Expand All @@ -95,11 +91,7 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
.map(column -> scanOutput.get(column.getName().toLowerCase()))
.collect(Collectors.toList());

HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hiveTbl.getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
hiveTbl.getDbName(), hiveTbl.getName(), hiveTbl.getPartitionColumnTypes());
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
Map<Long, PartitionItem> idToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
List<Long> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE));

Expand Down
Loading