From d768a4008bb10261ca8f5ffeeb9780b5d5ce4031 Mon Sep 17 00:00:00 2001 From: gaoxin Date: Wed, 26 Jun 2024 11:05:17 +0800 Subject: [PATCH 1/2] [opt](split) add max wait time of getting splits --- .../doris/datasource/FileQueryScanNode.java | 3 ++- .../apache/doris/datasource/SplitSource.java | 18 ++++++++++-------- .../org/apache/doris/qe/SessionVariable.java | 17 +++++++++++++++++ 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index ff61c5ee13de20..463cf629b48aaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -337,10 +337,11 @@ public void createScanRangeLocations() throws UserException { locationType = getLocationType(fileSplit.getPath().toString()); } totalFileSize = fileSplit.getLength() * inputSplitsNum; + long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime(); // Not accurate, only used to estimate concurrency. int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends(); for (Backend backend : backendPolicy.getBackends()) { - SplitSource splitSource = new SplitSource(backend, splitAssignment); + SplitSource splitSource = new SplitSource(backend, splitAssignment, maxWaitTime); splitSources.add(splitSource); Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource); TScanRangeLocations curLocations = newLocations(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java index dce135292ec8a9..8515e686f36bd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitSource.java @@ -44,17 +44,18 @@ public class SplitSource { private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0); private static final long WAIT_TIME_OUT = 100; // 100ms - private static final long MAX_WAIT_TIME_OUT = 500; // 500ms private final long uniqueId; private final Backend backend; private final SplitAssignment splitAssignment; private final AtomicBoolean isLastBatch; + private final long maxWaitTime; - public SplitSource(Backend backend, SplitAssignment splitAssignment) { + public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) { this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement(); this.backend = backend; this.splitAssignment = splitAssignment; + this.maxWaitTime = maxWaitTime; this.isLastBatch = new AtomicBoolean(false); splitAssignment.registerSource(uniqueId); } @@ -71,7 +72,7 @@ public List getNextBatch(int maxBatchSize) throws UserExcep return Collections.emptyList(); } List scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize); - long maxTimeOut = 0; + long startTime = System.currentTimeMillis(); while (scanRanges.size() < maxBatchSize) { BlockingQueue> splits = splitAssignment.getAssignedSplits(backend); if (splits == null) { @@ -81,18 +82,19 @@ public List getNextBatch(int maxBatchSize) throws UserExcep while (scanRanges.size() < maxBatchSize) { try { Collection splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS); + if (splitCollection != null) { + scanRanges.addAll(splitCollection); + } + if (!scanRanges.isEmpty() && System.currentTimeMillis() - startTime > maxWaitTime) { + return scanRanges; + } if (splitCollection == null) { - maxTimeOut += WAIT_TIME_OUT; break; } - scanRanges.addAll(splitCollection); } catch (InterruptedException e) { throw new UserException("Failed to get next batch of splits", e); } } - if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) { - break; - } } return scanRanges; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 534b4f3386a50d..94568f01cd5cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -417,6 +417,8 @@ public class SessionVariable implements Serializable, Writable { public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode"; + public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time"; + /** * use insert stmt as the unified backend for all loads */ @@ -1461,6 +1463,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { needForward = true) public int numPartitionsInBatchMode = 1024; + @VariableMgr.VarAttr( + name = FETCH_SPLITS_MAX_WAIT_TIME, + description = {"batch方式中BE获取splits的最大等待时间", + "The max wait time of getting splits in batch mode."}, + needForward = true) + public long fetchSplitsMaxWaitTime = 4000; + @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2713,6 +2722,14 @@ public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) { this.numPartitionsInBatchMode = numPartitionsInBatchMode; } + public long getFetchSplitsMaxWaitTime() { + return fetchSplitsMaxWaitTime; + } + + public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) { + this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime; + } + public boolean isEnableParquetLazyMat() { return enableParquetLazyMat; } From d60593c30be15358540fcbe85cca650b72e7532a Mon Sep 17 00:00:00 2001 From: gaoxin Date: Wed, 26 Jun 2024 12:54:42 +0800 Subject: [PATCH 2/2] [opt](split) add session variable to control the list partition style --- .../doris/datasource/TablePartitionValues.java | 15 +++++++++++++++ .../datasource/hive/source/HiveScanNode.java | 17 +++++++++++++++-- .../datasource/hudi/source/HudiScanNode.java | 8 ++++---- .../org/apache/doris/qe/SessionVariable.java | 17 +++++++++++++++++ 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java index 60765d705d554d..f3c742ab3e88b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java @@ -49,6 +49,21 @@ @Data public class TablePartitionValues { + public enum PartitionOrdering { + NATURAL, + REVERSE, + SHUFFLE; + + public static PartitionOrdering parse(String ordering) { + for (PartitionOrdering order : PartitionOrdering.values()) { + if (order.name().equalsIgnoreCase(ordering)) { + return order; + } + } + return null; + } + } + public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; private final ReadWriteLock readWriteLock; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index fae3e3e84477da..a8ff69aecdb76b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.TablePartitionValues.PartitionOrdering; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -57,6 +58,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import lombok.Setter; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -137,6 +139,17 @@ protected void doInitialize() throws UserException { } } + protected List orderingPartitions(List partitions) { + PartitionOrdering ordering = PartitionOrdering.parse( + ConnectContext.get().getSessionVariable().getPartitionOrdering()); + if (ordering == PartitionOrdering.REVERSE) { + return Ordering.natural().onResultOf(HivePartition::getPath).reverse().sortedCopy(partitions); + } else if (ordering == PartitionOrdering.SHUFFLE) { + return Ordering.arbitrary().onResultOf(HivePartition::getPath).sortedCopy(partitions); + } + return partitions; + } + protected List getPartitions() throws AnalysisException { List resPartitions = Lists.newArrayList(); long start = System.currentTimeMillis(); @@ -211,7 +224,7 @@ public List getSplits() throws UserException { long start = System.currentTimeMillis(); try { if (!partitionInit) { - prunedPartitions = getPartitions(); + prunedPartitions = orderingPartitions(getPartitions()); partitionInit = true; } HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() @@ -289,7 +302,7 @@ public void startSplit() { public boolean isBatchMode() { if (!partitionInit) { try { - prunedPartitions = getPartitions(); + prunedPartitions = orderingPartitions(getPartitions()); } catch (Exception e) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 0fc6b35511a198..9be10ad220592a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -386,9 +386,9 @@ public List getSplits() throws UserException { return getIncrementalSplits(); } if (!partitionInit) { - prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs( + prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs( HiveMetaStoreClientHelper.getConfiguration(hmsTable), - () -> getPrunedPartitions(hudiClient, snapshotTimestamp)); + () -> getPrunedPartitions(hudiClient, snapshotTimestamp))); partitionInit = true; } List splits = Collections.synchronizedList(new ArrayList<>()); @@ -448,9 +448,9 @@ public boolean isBatchMode() { } if (!partitionInit) { // Non partition table will get one dummy partition - prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs( + prunedPartitions = orderingPartitions(HiveMetaStoreClientHelper.ugiDoAs( HiveMetaStoreClientHelper.getConfiguration(hmsTable), - () -> getPrunedPartitions(hudiClient, snapshotTimestamp)); + () -> getPrunedPartitions(hudiClient, snapshotTimestamp))); partitionInit = true; } int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 94568f01cd5cff..ac4c1cbdeb9ec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -419,6 +419,8 @@ public class SessionVariable implements Serializable, Writable { public static final String FETCH_SPLITS_MAX_WAIT_TIME = "fetch_splits_max_wait_time"; + public static final String PARTITION_ORDERING = "partition_ordering"; + /** * use insert stmt as the unified backend for all loads */ @@ -1470,6 +1472,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { needForward = true) public long fetchSplitsMaxWaitTime = 4000; + @VariableMgr.VarAttr( + name = PARTITION_ORDERING, + description = {"list partition的排序方式", + "Ordering style of list partition."}, + needForward = true) + public String partitionOrdering = "natural"; + @VariableMgr.VarAttr( name = ENABLE_PARQUET_LAZY_MAT, description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", @@ -2730,6 +2739,14 @@ public void setFetchSplitsMaxWaitTime(long fetchSplitsMaxWaitTime) { this.fetchSplitsMaxWaitTime = fetchSplitsMaxWaitTime; } + public String getPartitionOrdering() { + return partitionOrdering; + } + + public void setPartitionOrdering(String partitionOrdering) { + this.partitionOrdering = partitionOrdering; + } + public boolean isEnableParquetLazyMat() { return enableParquetLazyMat; }