Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,22 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound;
import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Range;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.hadoop.util.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -123,35 +115,10 @@ private SortedPartitionRanges<?> loadCache(
}

Map<?, PartitionItem> unsortedMap = table.getOriginPartitions(scan);
List<Entry<?, PartitionItem>> unsortedList = Lists.newArrayList(unsortedMap.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(unsortedMap.size());
List<PartitionItemAndId<?>> defaultPartitions = Lists.newArrayList();
for (Entry<?, PartitionItem> entry : unsortedList) {
PartitionItem partitionItem = entry.getValue();
Object id = entry.getKey();
if (!partitionItem.isDefaultPartition()) {
List<Range<MultiColumnBound>> ranges = PartitionItemToRange.toRanges(partitionItem);
for (Range<MultiColumnBound> range : ranges) {
sortedRanges.add(new PartitionItemAndRange<>(id, partitionItem, range));
}
} else {
defaultPartitions.add(new PartitionItemAndId<>(id, partitionItem));
}
SortedPartitionRanges<?> sortedPartitionRanges = SortedPartitionRanges.build(unsortedMap);
if (sortedPartitionRanges == null) {
return null;
}

sortedRanges.sort((o1, o2) -> {
Range<MultiColumnBound> span1 = o1.range;
Range<MultiColumnBound> span2 = o2.range;
int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint());
if (result != 0) {
return result;
}
result = span1.upperEndpoint().compareTo(span2.upperEndpoint());
return result;
});
SortedPartitionRanges<?> sortedPartitionRanges = new SortedPartitionRanges(
sortedRanges, defaultPartitions
);
PartitionCacheContext context = new PartitionCacheContext(
table.getId(), table.getPartitionMetaVersion(scan), sortedPartitionRanges);
partitionCaches.put(key, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -453,6 +455,18 @@ public boolean supportInternalPartitionPruned() {
return false;
}

/**
 * Returns sorted partition ranges used for binary-search based partition pruning.
 * This base implementation does not support sorted ranges and always returns
 * {@link Optional#empty()}; subclasses that can supply them (e.g. the Hive
 * external table) override this method.
 *
 * @param scan the catalog relation whose partitions are being pruned
 * @return sorted partition ranges, or {@code Optional.empty()} if not supported
 */
public Optional<SortedPartitionRanges<String>> getSortedPartitionRanges(CatalogRelation scan) {
return Optional.empty();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
Expand Down Expand Up @@ -360,6 +362,19 @@ public boolean supportInternalPartitionPruned() {
return getDlaType() == DLAType.HIVE || getDlaType() == DLAType.HUDI;
}

@Override
public Optional<SortedPartitionRanges<String>> getSortedPartitionRanges(CatalogRelation scan) {
    // Sorted-range pruning is only available for partitioned HIVE tables.
    // The type check comes first in the short-circuit so non-Hive tables
    // never touch the partition-column metadata.
    if (getDlaType() == DLAType.HIVE && !CollectionUtils.isEmpty(this.getPartitionColumns())) {
        HiveMetaStoreCache.HivePartitionValues partitionValues =
                getHivePartitionValues(MvccUtil.getSnapshotFromContext(this));
        return partitionValues.getSortedPartitionRanges();
    }
    return Optional.empty();
}

public SelectedPartitions initHudiSelectedPartitions(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() != DLAType.HUDI) {
return SelectedPartitions.NOT_PRUNED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
Expand All @@ -66,10 +65,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Streams;
import com.google.common.collect.TreeRangeMap;
import lombok.Data;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -90,6 +86,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -258,7 +255,6 @@ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
}
Map<Long, PartitionItem> idToPartitionItem = Maps.newHashMapWithExpectedSize(partitionNames.size());
BiMap<String, Long> partitionNameToIdMap = HashBiMap.create(partitionNames.size());
Map<Long, List<UniqueId>> idToUniqueIdsMap = Maps.newHashMapWithExpectedSize(partitionNames.size());
for (String partitionName : partitionNames) {
long partitionId = Util.genIdByName(catalog.getName(), nameMapping.getLocalDbName(),
nameMapping.getLocalTblName(), partitionName);
Expand All @@ -267,23 +263,8 @@ private HivePartitionValues loadPartitionValues(PartitionValueCacheKey key) {
partitionNameToIdMap.put(partitionName, partitionId);
}

Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = null;
Map<Range<PartitionKey>, UniqueId> rangeToId = null;
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = null;
if (key.types.size() > 1) {
// uidToPartitionRange and rangeToId are only used for multi-column partition
uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
} else {
Preconditions.checkState(key.types.size() == 1, key.types);
// singleColumnRangeMap is only used for single-column partition
singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
}
Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
return new HivePartitionValues(idToPartitionItem, uidToPartitionRange, rangeToId, singleColumnRangeMap,
partitionNameToIdMap, idToUniqueIdsMap, singleUidToColumnRangeMap, partitionValuesMap);
return new HivePartitionValues(idToPartitionItem, partitionNameToIdMap, partitionValuesMap);
}

@VisibleForTesting
Expand Down Expand Up @@ -614,7 +595,6 @@ public void addPartitionsCache(NameMapping nameMapping, List<String> partitionNa
HivePartitionValues copy = partitionValues.copy();
Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
Map<Long, List<UniqueId>> idToUniqueIdsMap = copy.getIdToUniqueIdsMap();
Map<Long, PartitionItem> idToPartitionItem = new HashMap<>();
String localDbName = nameMapping.getLocalDbName();
String localTblName = nameMapping.getLocalTblName();
Expand All @@ -632,28 +612,8 @@ public void addPartitionsCache(NameMapping nameMapping, List<String> partitionNa
Map<Long, List<String>> partitionValuesMapBefore = copy.getPartitionValuesMap();
Map<Long, List<String>> partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
partitionValuesMapBefore.putAll(partitionValuesMap);
if (key.types.size() > 1) {
Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange();
// uidToPartitionRange and rangeToId are only used for multi-column partition
Map<UniqueId, Range<PartitionKey>> uidToPartitionRange = ListPartitionPrunerV2
.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap);
uidToPartitionRangeBefore.putAll(uidToPartitionRange);
Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId();
Map<Range<PartitionKey>, UniqueId> rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
rangeToIdBefore.putAll(rangeToId);
} else {
Preconditions.checkState(key.types.size() == 1, key.types);
// singleColumnRangeMap is only used for single-column partition
RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap();
RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = ListPartitionPrunerV2
.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap);
singleColumnRangeMapBefore.putAll(singleColumnRangeMap);
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy
.getSingleUidToColumnRangeMap();
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap = ListPartitionPrunerV2
.genSingleUidToColumnRange(singleColumnRangeMap);
singleUidToColumnRangeMapBefore.putAll(singleUidToColumnRangeMap);
}
// Rebuild sorted partition ranges after adding partitions
copy.rebuildSortedPartitionRanges();
HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
if (partitionValuesCur == partitionValues) {
partitionValuesCache.put(key, copy);
Expand All @@ -671,11 +631,6 @@ public void dropPartitionsCache(ExternalTable dorisTable, List<String> partition
HivePartitionValues copy = partitionValues.copy();
Map<String, Long> partitionNameToIdMapBefore = copy.getPartitionNameToIdMap();
Map<Long, PartitionItem> idToPartitionItemBefore = copy.getIdToPartitionItem();
Map<Long, List<UniqueId>> idToUniqueIdsMapBefore = copy.getIdToUniqueIdsMap();
Map<UniqueId, Range<PartitionKey>> uidToPartitionRangeBefore = copy.getUidToPartitionRange();
Map<Range<PartitionKey>, UniqueId> rangeToIdBefore = copy.getRangeToId();
RangeMap<ColumnBound, UniqueId> singleColumnRangeMapBefore = copy.getSingleColumnRangeMap();
Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMapBefore = copy.getSingleUidToColumnRangeMap();
Map<Long, List<String>> partitionValuesMap = copy.getPartitionValuesMap();
for (String partitionName : partitionNames) {
if (!partitionNameToIdMapBefore.containsKey(partitionName)) {
Expand All @@ -686,27 +641,13 @@ public void dropPartitionsCache(ExternalTable dorisTable, List<String> partition
Long partitionId = partitionNameToIdMapBefore.remove(partitionName);
idToPartitionItemBefore.remove(partitionId);
partitionValuesMap.remove(partitionId);
List<UniqueId> uniqueIds = idToUniqueIdsMapBefore.remove(partitionId);
for (UniqueId uniqueId : uniqueIds) {
if (uidToPartitionRangeBefore != null) {
Range<PartitionKey> range = uidToPartitionRangeBefore.remove(uniqueId);
if (range != null) {
rangeToIdBefore.remove(range);
}
}

if (singleUidToColumnRangeMapBefore != null) {
Range<ColumnBound> range = singleUidToColumnRangeMapBefore.remove(uniqueId);
if (range != null) {
singleColumnRangeMapBefore.remove(range);
}
}
}

if (invalidPartitionCache) {
invalidatePartitionCache(dorisTable, partitionName);
}
}
// Rebuild sorted partition ranges after dropping partitions
copy.rebuildSortedPartitionRanges();
HivePartitionValues partitionValuesCur = partitionValuesCache.getIfPresent(key);
if (partitionValuesCur == partitionValues) {
partitionValuesCache.put(key, copy);
Expand Down Expand Up @@ -981,7 +922,7 @@ public boolean equals(Object obj) {
return dummyKey == ((FileCacheKey) obj).dummyKey;
}
return location.equals(((FileCacheKey) obj).location)
&& Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
&& Objects.equals(partitionValues, ((FileCacheKey) obj).partitionValues);
}

boolean isSameTable(long id) {
Expand Down Expand Up @@ -1079,54 +1020,69 @@ public static class HiveFileStatus {
@Data
public static class HivePartitionValues {
    // Bidirectional mapping between a partition name and its generated id.
    private BiMap<String, Long> partitionNameToIdMap;
    // Partition id -> partition item (list/range definition of the partition).
    private Map<Long, PartitionItem> idToPartitionItem;
    // Partition id -> literal partition values (one entry per partition column).
    private Map<Long, List<String>> partitionValuesMap;

    // Sorted partition ranges for binary search filtering.
    // Built at construction time, shares the same lifecycle with HivePartitionValues.
    private SortedPartitionRanges<String> sortedPartitionRanges;

    public HivePartitionValues() {
    }

    /**
     * Builds a fully-populated snapshot of the table's partitions.
     * The sorted partition ranges are derived eagerly from the supplied maps.
     *
     * @param idToPartitionItem    partition id -> partition item
     * @param partitionNameToIdMap partition name <-> partition id
     * @param partitionValuesMap   partition id -> partition column values
     */
    public HivePartitionValues(Map<Long, PartitionItem> idToPartitionItem,
            BiMap<String, Long> partitionNameToIdMap,
            Map<Long, List<String>> partitionValuesMap) {
        this.idToPartitionItem = idToPartitionItem;
        this.partitionNameToIdMap = partitionNameToIdMap;
        this.partitionValuesMap = partitionValuesMap;
        this.sortedPartitionRanges = buildSortedPartitionRanges();
    }

    /**
     * Create a copy for incremental updates (add/drop partitions).
     * The sortedPartitionRanges is intentionally NOT copied here: the caller
     * mutates the copied maps and must invoke {@link #rebuildSortedPartitionRanges()}
     * afterwards so the ranges stay consistent with the partition data.
     */
    public HivePartitionValues copy() {
        HivePartitionValues copy = new HivePartitionValues();
        copy.setPartitionNameToIdMap(partitionNameToIdMap == null ? null : HashBiMap.create(partitionNameToIdMap));
        copy.setIdToPartitionItem(idToPartitionItem == null ? null : Maps.newHashMap(idToPartitionItem));
        copy.setPartitionValuesMap(partitionValuesMap == null ? null : Maps.newHashMap(partitionValuesMap));
        return copy;
    }

    /**
     * Rebuild sorted partition ranges after incremental updates.
     * Should be called after add/drop partitions.
     */
    public void rebuildSortedPartitionRanges() {
        this.sortedPartitionRanges = buildSortedPartitionRanges();
    }

    /**
     * Get sorted partition ranges for binary search filtering.
     *
     * @return the cached ranges, or empty when they could not be built
     */
    public Optional<SortedPartitionRanges<String>> getSortedPartitionRanges() {
        return Optional.ofNullable(sortedPartitionRanges);
    }

    private SortedPartitionRanges<String> buildSortedPartitionRanges() {
        // Guard both maps: idToPartitionItem is dereferenced below, so checking
        // only partitionNameToIdMap would risk an NPE on a partially-built copy.
        if (partitionNameToIdMap == null || partitionNameToIdMap.isEmpty() || idToPartitionItem == null) {
            return null;
        }

        // Build name -> partition item map for SortedPartitionRanges.build
        BiMap<Long, String> idToName = partitionNameToIdMap.inverse();
        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
        for (Map.Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
            String partitionName = idToName.get(entry.getKey());
            nameToPartitionItem.put(partitionName, entry.getValue());
        }

        return SortedPartitionRanges.build(nameToPartitionItem);
    }
}

/**
Expand Down
Loading
Loading