From 20f3cfebb21e2c0506b9187ecb71ff3574cb64f3 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 13 Mar 2023 20:02:56 +0800 Subject: [PATCH 1/2] support partition ordered by PartitionWeight for OptimizePlan --- .../ams/server/model/TableQuotaInfo.java | 3 +- .../optimize/AbstractArcticOptimizePlan.java | 16 ++-- .../optimize/AbstractIcebergOptimizePlan.java | 5 -- .../server/optimize/AbstractOptimizePlan.java | 65 ++++++++++---- .../ams/server/optimize/FullOptimizePlan.java | 65 +++++++++++--- .../optimize/IcebergFullOptimizePlan.java | 32 ++++++- .../optimize/IcebergMinorOptimizePlan.java | 29 +++++- .../server/optimize/MajorOptimizePlan.java | 88 +++++++++++++------ .../server/optimize/MinorOptimizePlan.java | 75 ++++++++++++---- .../optimize/SupportHiveFullOptimizePlan.java | 3 +- .../SupportHiveMajorOptimizePlan.java | 39 +++----- .../arctic/ams/server/AmsTestBase.java | 1 - .../optimize/TestIcebergFullOptimizePlan.java | 24 +++++ .../TestIcebergMinorOptimizePlan.java | 25 ++++++ .../optimize/TestMajorOptimizePlan.java | 44 ++++++++++ .../optimize/TestMinorOptimizePlan.java | 22 +++++ 16 files changed, 416 insertions(+), 120 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableQuotaInfo.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableQuotaInfo.java index 4e76a3d4fe..d956a1675f 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableQuotaInfo.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableQuotaInfo.java @@ -19,7 +19,6 @@ package com.netease.arctic.ams.server.model; import com.netease.arctic.table.TableIdentifier; -import org.jetbrains.annotations.NotNull; import java.math.BigDecimal; @@ -59,7 +58,7 @@ public void setTargetQuota(Double targetQuota) { } @Override - public int compareTo(@NotNull TableQuotaInfo o) { + public int compareTo(TableQuotaInfo o) { if (quota.compareTo(o.getQuota()) != 0) { return quota.compareTo(o.getQuota()); } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java index 890af62146..cbcb25814f 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java @@ -80,8 +80,8 @@ public abstract class AbstractArcticOptimizePlan extends AbstractOptimizePlan { public AbstractArcticOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime, List> changeFiles, - List baseFileScanTasks, - int queueId, long currentTime, long changeSnapshotId, long baseSnapshotId) { + List baseFileScanTasks, + int queueId, long currentTime, long changeSnapshotId, long baseSnapshotId) { super(arcticTable, tableOptimizeRuntime, queueId, currentTime, baseSnapshotId); this.baseFileScanTasks = baseFileScanTasks; this.changeFiles = changeFiles; @@ -89,11 +89,6 @@ public AbstractArcticOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime this.currentChangeSnapshotId = changeSnapshotId; } - @Override - protected boolean limitFileCnt() { - return false; - } - protected BasicOptimizeTask buildOptimizeTask(@Nullable List sourceNodes, List insertFiles, List deleteFiles, @@ -331,6 +326,13 @@ protected List getInsertFilesFromFileTree(String partition) { return insertFiles; } + /** + * Check a base file should optimize + * + * @param baseFile 
- base file + * @param partition - partition + * @return true if the file should optimize + */ protected abstract boolean baseFileShouldOptimize(DataFile baseFile, String partition); protected boolean hasFileToOptimize() { diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java index 37505512b0..e2f03b5062 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractIcebergOptimizePlan.java @@ -64,11 +64,6 @@ public AbstractIcebergOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime this.fileScanTasks = fileScanTasks; } - @Override - protected boolean limitFileCnt() { - return true; - } - protected List filterRepeatFileScanTask(Collection fileScanTasks) { Set dataFilesPath = new HashSet<>(); List finalFileScanTasks = new ArrayList<>(); diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java index e95094ea62..b46df213d2 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java @@ -116,7 +116,7 @@ private boolean reachMaxFileCnt(List newTasks) { newFileCnt += taskFileCnt; } this.collectFileCnt += newFileCnt; - return limitFileCnt() && this.collectFileCnt >= getMaxFileCntLimit(); + return this.collectFileCnt >= getMaxFileCntLimit(); } private OptimizePlanResult buildOptimizePlanResult(List optimizeTasks) { @@ -127,9 +127,9 @@ private OptimizePlanResult buildOptimizePlanResult(List optim protected List getPartitionsToOptimizeInOrder() { List partitionNeedOptimizedInOrder = allPartitions.stream() .filter(this::partitionNeedPlan) - .map(partition -> new PartitionWeight(partition, getPartitionWeight(partition))) + .map(partition -> new PartitionWeightWrapper(partition, getPartitionWeight(partition))) .sorted() - .map(PartitionWeight::getPartition) + .map(PartitionWeightWrapper::getPartition) .collect(Collectors.toList()); if (partitionNeedOptimizedInOrder.size() > 0) { LOG.info("{} filter partitions to optimize, partition count {}", tableId(), @@ -140,12 +140,17 @@ protected List getPartitionsToOptimizeInOrder() { return partitionNeedOptimizedInOrder; } - protected long getPartitionWeight(String partitionToPath) { - return 0; - } + /** + * Get the partition weight. + * The optimizing order of partition is decide by partition weight, and the larger weight should be ahead. 
+ * + * @param partition - partition + * @return return partition weight + */ + protected abstract PartitionWeight getPartitionWeight(String partition); private long getMaxFileCntLimit() { - Map properties = arcticTable.asUnkeyedTable().properties(); + Map properties = arcticTable.properties(); return CompatiblePropertyUtil.propertyAsInt(properties, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT_DEFAULT); } @@ -163,11 +168,15 @@ protected long getSmallFileSize(Map properties) { } } - private static class PartitionWeight implements Comparable { + protected interface PartitionWeight extends Comparable { + + } + + protected static class PartitionWeightWrapper implements Comparable { private final String partition; - private final long weight; + private final PartitionWeight weight; - public PartitionWeight(String partition, long weight) { + public PartitionWeightWrapper(String partition, PartitionWeight weight) { this.partition = partition; this.weight = weight; } @@ -176,7 +185,7 @@ public String getPartition() { return partition; } - public long getWeight() { + public PartitionWeight getWeight() { return weight; } @@ -186,8 +195,8 @@ public String toString() { } @Override - public int compareTo(PartitionWeight o) { - return Long.compare(o.weight, this.weight); + public int compareTo(PartitionWeightWrapper o) { + return this.weight.compareTo(o.weight); } } @@ -204,11 +213,35 @@ protected int getCollectFileCnt() { } /** - * this optimizing plan should limit the files by "self-optimizing.max-file-count" + * Check this partition should optimize because of interval. + * + * @param partition - partition + * @return true if the partition should optimize + */ + protected boolean checkOptimizeInterval(String partition) { + long optimizeInterval = getMaxOptimizeInterval(); + + if (optimizeInterval < 0) { + return false; + } + + return this.currentTime - getLatestOptimizeTime(partition) >= optimizeInterval; + } + + /** + * Get max optimize interval config of specific optimize type. + * + * @return optimize interval + */ + protected abstract long getMaxOptimizeInterval(); + + /** + * Get latest optimize time of specific optimize type. 
* - * @return true for limit file cnt + * @param partition - partition + * @return time of latest optimize, may be -1 */ - protected abstract boolean limitFileCnt(); + protected abstract long getLatestOptimizeTime(String partition); /** * check whether partition need to plan diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java index 30b3f246c0..0693546686 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.OptionalLong; @@ -48,6 +49,8 @@ public class FullOptimizePlan extends AbstractArcticOptimizePlan { private static final Logger LOG = LoggerFactory.getLogger(FullOptimizePlan.class); + // cache partition delete file size + private final Map partitionDeleteFileSize = new HashMap<>(); public FullOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime, List baseFileScanTasks, int queueId, long currentTime, @@ -58,15 +61,13 @@ public FullOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptim @Override protected boolean partitionNeedPlan(String partitionToPath) { - long current = System.currentTimeMillis(); - // check position delete file total size if (checkPosDeleteTotalSize(partitionToPath)) { return true; } // check full optimize interval - if (checkFullOptimizeInterval(current, partitionToPath)) { + if (checkOptimizeInterval(partitionToPath)) { return true; } @@ -74,6 +75,33 @@ protected boolean partitionNeedPlan(String partitionToPath) { return false; } + @Override + protected PartitionWeight getPartitionWeight(String partition) { + return new FullPartitionWeight(checkOptimizeInterval(partition), getPosDeleteFileSize(partition)); + } + + protected static class FullPartitionWeight implements PartitionWeight { + private final boolean reachInterval; + + private final long deleteFileSize; + + public FullPartitionWeight(boolean reachInterval, long deleteFileSize) { + this.reachInterval = reachInterval; + this.deleteFileSize = deleteFileSize; + } + + @Override + public int compareTo(PartitionWeight o) { + FullPartitionWeight that = (FullPartitionWeight) o; + int compare = Boolean.compare(that.reachInterval, this.reachInterval); + if (compare != 0) { + return compare; + } + return Long.compare(that.deleteFileSize, this.deleteFileSize); + } + } + + @Override protected OptimizeType getOptimizeType() { return OptimizeType.FullMajor; @@ -97,11 +125,10 @@ protected boolean baseFileShouldOptimize(DataFile baseFile, String partition) { } protected boolean checkPosDeleteTotalSize(String partitionToPath) { - List posDeleteFiles = getPosDeleteFilesFromFileTree(partitionToPath); - if (posDeleteFiles.isEmpty()) { + long posDeleteSize = getPosDeleteFileSize(partitionToPath); + if (posDeleteSize <= 0) { return false; } - long posDeleteSize = posDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); Map properties = arcticTable.properties(); if (!properties.containsKey(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO) && properties.containsKey(TableProperties.FULL_OPTIMIZE_TRIGGER_DELETE_FILE_SIZE_BYTES)) { @@ -118,17 +145,27 @@ protected boolean checkPosDeleteTotalSize(String partitionToPath) { } } - 
protected boolean checkFullOptimizeInterval(long current, String partitionToPath) { - long fullMajorOptimizeInterval = CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), + private long getPosDeleteFileSize(String partition) { + Long cache = partitionDeleteFileSize.get(partition); + if (cache != null) { + return cache; + } + List posDeleteFiles = getPosDeleteFilesFromFileTree(partition); + long posDeleteSize = posDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); + partitionDeleteFileSize.put(partition, posDeleteSize); + return posDeleteSize; + } + + @Override + protected long getMaxOptimizeInterval() { + return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT); + } - if (fullMajorOptimizeInterval != TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT) { - long lastFullMajorOptimizeTime = tableOptimizeRuntime.getLatestFullOptimizeTime(partitionToPath); - return current - lastFullMajorOptimizeTime >= fullMajorOptimizeInterval; - } - - return false; + @Override + protected long getLatestOptimizeTime(String partition) { + return tableOptimizeRuntime.getLatestFullOptimizeTime(partition); } /** diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergFullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergFullOptimizePlan.java index 3e325519c2..7bbb245e63 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergFullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergFullOptimizePlan.java @@ -24,6 +24,7 @@ import com.netease.arctic.ams.server.model.TaskConfig; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableProperties; +import com.netease.arctic.utils.CompatiblePropertyUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -99,6 +100,18 @@ protected boolean partitionNeedPlan(String partitionToPath) { return false; } + @Override + protected long getMaxOptimizeInterval() { + return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT); + } + + @Override + protected long getLatestOptimizeTime(String partition) { + return tableOptimizeRuntime.getLatestFullOptimizeTime(partition); + } + private long getPartitionDeleteFileTotalSize(String partitionToPath) { Long cached = partitionDeleteFileTotalSize.get(partitionToPath); if (cached != null) { @@ -122,8 +135,21 @@ private long getPartitionDeleteFileTotalSize(String partitionToPath) { } @Override - protected long getPartitionWeight(String partitionToPath) { - return getPartitionDeleteFileTotalSize(partitionToPath); + protected PartitionWeight getPartitionWeight(String partitionToPath) { + return new IcebergFullPartitionWeight(getPartitionDeleteFileTotalSize(partitionToPath)); + } + + protected static class IcebergFullPartitionWeight implements PartitionWeight { + private final long deleteFileSize; + + public IcebergFullPartitionWeight(long deleteFileSize) { + this.deleteFileSize = deleteFileSize; + } + + @Override + public int compareTo(PartitionWeight o) { + return Long.compare(((IcebergFullPartitionWeight) o).deleteFileSize, this.deleteFileSize); + } } @Override @@ -159,8 +185,6 @@ protected 
List collectTask(String partition) { eqDeleteFiles, posDeleteFiles, taskPartitionConfig)); } } - - return collector; } } \ No newline at end of file diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergMinorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergMinorOptimizePlan.java index 3f5f4c66bc..49591e1807 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergMinorOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergMinorOptimizePlan.java @@ -120,8 +120,33 @@ protected boolean partitionNeedPlan(String partitionToPath) { } @Override - protected long getPartitionWeight(String partitionToPath) { - return getPartitionSmallFileCount(partitionToPath); + protected PartitionWeight getPartitionWeight(String partitionToPath) { + return new IcebergMinorPartitionWeight(getPartitionSmallFileCount(partitionToPath)); + } + + protected static class IcebergMinorPartitionWeight implements PartitionWeight { + private final int smallFileCount; + + public IcebergMinorPartitionWeight(int smallFileCount) { + this.smallFileCount = smallFileCount; + } + + @Override + public int compareTo(PartitionWeight o) { + return Integer.compare(((IcebergMinorPartitionWeight) o).smallFileCount, this.smallFileCount); + } + } + + @Override + protected long getLatestOptimizeTime(String partition) { + return tableOptimizeRuntime.getLatestMinorOptimizeTime(partition); + } + + @Override + protected long getMaxOptimizeInterval() { + return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT); } private int getPartitionSmallFileCount(String partitionToPath) { diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java index bbd8acad89..29e10ddfbd 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MajorOptimizePlan.java @@ -37,11 +37,15 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; public class MajorOptimizePlan extends AbstractArcticOptimizePlan { private static final Logger LOG = LoggerFactory.getLogger(MajorOptimizePlan.class); + // cache partition base file count + private final Map partitionBaseFileCount = new HashMap<>(); public MajorOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime, List baseFileScanTasks, @@ -58,26 +62,59 @@ protected OptimizeType getOptimizeType() { @Override public boolean partitionNeedPlan(String partitionToPath) { - long current = System.currentTimeMillis(); - - - List baseFiles = getBaseFilesFromFileTree(partitionToPath); - if (baseFiles.size() >= 2) { - // check small data file count - if (checkSmallFileCount(baseFiles)) { - return true; - } + // check small data file count + if (checkBaseFileCount(partitionToPath)) { + return true; + } - // check major optimize interval - if (checkMajorOptimizeInterval(current, partitionToPath)) { - return true; - } + // check major optimize interval + if (checkOptimizeInterval(partitionToPath)) { + return true; } LOG.debug("{} ==== don't need {} optimize plan, skip partition {}", 
tableId(), getOptimizeType(), partitionToPath); return false; } + @Override + protected PartitionWeight getPartitionWeight(String partition) { + return new MajorPartitionWeight(checkOptimizeInterval(partition), getBaseFileCount(partition)); + } + + protected int getBaseFileCount(String partition) { + Integer cached = partitionBaseFileCount.get(partition); + if (cached != null) { + return cached; + } + + int baseFileCount = getBaseFilesFromFileTree(partition).size(); + partitionBaseFileCount.put(partition, baseFileCount); + return baseFileCount; + } + + protected static class MajorPartitionWeight implements PartitionWeight { + + private final boolean reachInterval; + + private final int baseFileCount; + + public MajorPartitionWeight(boolean reachInterval, int baseFileCount) { + this.reachInterval = reachInterval; + this.baseFileCount = baseFileCount; + } + + @Override + public int compareTo(PartitionWeight o) { + MajorPartitionWeight that = (MajorPartitionWeight) o; + int compare = Boolean.compare(that.reachInterval, this.reachInterval); + if (compare != 0) { + return compare; + } + return Integer.compare(that.baseFileCount, this.baseFileCount); + } + } + + @Override protected List collectTask(String partition) { List result; FileTree treeRoot = partitionFileTree.get(partition); @@ -93,21 +130,22 @@ protected List collectTask(String partition) { return result; } - protected boolean checkMajorOptimizeInterval(long current, String partitionToPath) { - return current - tableOptimizeRuntime.getLatestMajorOptimizeTime(partitionToPath) >= - CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL, - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL_DEFAULT); + @Override + protected long getMaxOptimizeInterval() { + return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL_DEFAULT); } - protected boolean checkSmallFileCount(List dataFileList) { - if (CollectionUtils.isNotEmpty(dataFileList)) { - return dataFileList.size() >= CompatiblePropertyUtil.propertyAsInt(arcticTable.properties(), - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_FILE_CNT, - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_FILE_CNT_DEFAULT); - } + @Override + protected long getLatestOptimizeTime(String partition) { + return tableOptimizeRuntime.getLatestMajorOptimizeTime(partition); + } - return false; + protected boolean checkBaseFileCount(String partition) { + return getBaseFileCount(partition) >= CompatiblePropertyUtil.propertyAsInt(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_FILE_CNT, + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_FILE_CNT_DEFAULT); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java index e49500a455..baaedc964a 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java @@ -37,11 +37,15 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; public class MinorOptimizePlan extends AbstractArcticOptimizePlan { private static final Logger LOG = LoggerFactory.getLogger(MinorOptimizePlan.class); + 
// cache partition change file count + private final Map partitionChangeFileCount = new HashMap<>(); public MinorOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime, List baseFileScanTasks, @@ -53,30 +57,71 @@ public MinorOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOpti @Override public boolean partitionNeedPlan(String partitionToPath) { - long current = System.currentTimeMillis(); - List deleteFiles = getDeleteFilesFromFileTree(partitionToPath); - - // check delete file count - if (CollectionUtils.isNotEmpty(deleteFiles)) { - // file count - if (deleteFiles.size() >= CompatiblePropertyUtil.propertyAsInt(arcticTable.properties(), - TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, - TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT)) { - return true; - } + int changeFileCount = getChangeFileCount(partitionToPath); + + // file count + if (changeFileCount >= CompatiblePropertyUtil.propertyAsInt(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT)) { + return true; } // optimize interval - if (current - tableOptimizeRuntime.getLatestMinorOptimizeTime(partitionToPath) >= - CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), - TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL, - TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT)) { + if (checkOptimizeInterval(partitionToPath)) { return true; } LOG.debug("{} ==== don't need {} optimize plan, skip partition {}", tableId(), getOptimizeType(), partitionToPath); return false; } + private int getChangeFileCount(String partition) { + Integer cached = partitionChangeFileCount.get(partition); + if (cached != null) { + return cached; + } + int changeFileCount = getInsertFilesFromFileTree(partition).size() + getDeleteFilesFromFileTree(partition).size(); + partitionChangeFileCount.put(partition, changeFileCount); + return changeFileCount; + } + + @Override + protected long getLatestOptimizeTime(String partition) { + return tableOptimizeRuntime.getLatestMinorOptimizeTime(partition); + } + + @Override + protected long getMaxOptimizeInterval() { + return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT); + } + + @Override + protected PartitionWeight getPartitionWeight(String partition) { + return new MinorPartitionWeight(checkOptimizeInterval(partition), getChangeFileCount(partition)); + } + + protected static class MinorPartitionWeight implements PartitionWeight { + private final boolean reachInterval; + + private final int changeFileCount; + + public MinorPartitionWeight(boolean reachInterval, int changeFileCount) { + this.reachInterval = reachInterval; + this.changeFileCount = changeFileCount; + } + + @Override + public int compareTo(PartitionWeight o) { + MinorPartitionWeight that = (MinorPartitionWeight) o; + int compare = Boolean.compare(that.reachInterval, this.reachInterval); + if (compare != 0) { + return compare; + } + return Integer.compare(that.changeFileCount, this.changeFileCount); + } + } + @Override protected OptimizeType getOptimizeType() { return OptimizeType.Minor; diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java index 154783ae0d..230ff02a6b 100644 
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java @@ -60,7 +60,6 @@ public SupportHiveFullOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime @Override protected boolean partitionNeedPlan(String partitionToPath) { - long current = System.currentTimeMillis(); List posDeleteFiles = getPosDeleteFilesFromFileTree(partitionToPath); List baseFiles = getBaseFilesFromFileTree(partitionToPath); @@ -98,7 +97,7 @@ protected boolean partitionNeedPlan(String partitionToPath) { } // check full optimize interval - if (checkFullOptimizeInterval(current, partitionToPath) && partitionNeedPlan) { + if (checkOptimizeInterval(partitionToPath) && partitionNeedPlan) { return true; } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveMajorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveMajorOptimizePlan.java index 219495eb5e..f4d8f7e418 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveMajorOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveMajorOptimizePlan.java @@ -24,10 +24,7 @@ import com.netease.arctic.hive.table.SupportHive; import com.netease.arctic.hive.utils.TableTypeUtil; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; -import com.netease.arctic.utils.CompatiblePropertyUtil; -import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; @@ -59,26 +56,26 @@ public SupportHiveMajorOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntim @Override public boolean partitionNeedPlan(String partitionToPath) { - long current = System.currentTimeMillis(); - - List posDeleteFiles = getPosDeleteFilesFromFileTree(partitionToPath); - List baseFiles = getBaseFilesFromFileTree(partitionToPath); - List smallFiles = filterSmallFiles(baseFiles); + int baseFileCount = getBaseFileCount(partitionToPath); // check whether partition need plan by files info. 
- // if partition has no pos-delete file, and there are files in not hive location, need plan - // if partition has pos-delete, and there are small file count greater than 2 in not hive location, need plan - boolean hasPos = CollectionUtils.isNotEmpty(posDeleteFiles) && smallFiles.size() >= 2; - boolean noPos = CollectionUtils.isEmpty(posDeleteFiles) && CollectionUtils.isNotEmpty(baseFiles); - boolean partitionNeedPlan = hasPos || noPos; + // if partition doesn't move files to hive location, only if there are more than 1 small files not in hive location, + // need plan + // if partition moves files to hive location, only if there are files not in hive location, need plan + boolean partitionNeedPlan; + if (notMoveToHiveLocation(partitionToPath)) { + partitionNeedPlan = baseFileCount > 1; + } else { + partitionNeedPlan = baseFileCount > 0; + } if (partitionNeedPlan) { // check small data file count - if (checkSmallFileCount(smallFiles)) { + if (checkBaseFileCount(partitionToPath)) { return true; } // check major optimize interval - if (checkMajorOptimizeInterval(current, partitionToPath)) { + if (checkOptimizeInterval(partitionToPath)) { return true; } } @@ -88,18 +85,6 @@ public boolean partitionNeedPlan(String partitionToPath) { return false; } - @Override - protected boolean checkMajorOptimizeInterval(long current, String partitionToPath) { - if (current - tableOptimizeRuntime.getLatestMajorOptimizeTime(partitionToPath) >= - CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL, - TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_INTERVAL_DEFAULT)) { - return true; - } - - return false; - } - @Override protected boolean baseFileShouldOptimize(DataFile baseFile, String partition) { // if a partition has pos-delete file, only the small base files not in hive location should be optimized, diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java index 272c051cec..acb0a9a7a3 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/AmsTestBase.java @@ -66,7 +66,6 @@ import com.netease.arctic.ams.server.utils.CatalogUtil; import com.netease.arctic.ams.server.utils.JDBCSqlSessionFactoryProvider; import com.netease.arctic.ams.server.utils.SequenceNumberFetcherTest; -import com.netease.arctic.ams.server.utils.ThreadPool; import com.netease.arctic.ams.server.utils.UnKeyedTableUtilTest; import com.netease.arctic.catalog.ArcticCatalog; import com.netease.arctic.catalog.CatalogLoader; diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java index 8dc5e6dc18..a20eafde6c 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizePlan.java @@ -11,7 +11,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public class TestIcebergFullOptimizePlan extends TestIcebergBase { @Test @@ -123,4 +125,26 @@ public void testBinPackPlan() throws Exception { List tasks = optimizePlan.plan().getOptimizeTasks(); Assert.assertEquals((int) Math.ceil(1.0 * 
dataFiles.size() / packFileCnt), tasks.size()); } + + @Test + public void testPartitionWeight() { + List partitionWeights = new ArrayList<>(); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p1", + new IcebergFullOptimizePlan.IcebergFullPartitionWeight(0))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p2", + new IcebergFullOptimizePlan.IcebergFullPartitionWeight(1))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p3", + new IcebergFullOptimizePlan.IcebergFullPartitionWeight(200))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p4", + new IcebergFullOptimizePlan.IcebergFullPartitionWeight(100))); + + List sortedPartitions = partitionWeights.stream() + .sorted() + .map(AbstractOptimizePlan.PartitionWeightWrapper::getPartition) + .collect(Collectors.toList()); + Assert.assertEquals("p3", sortedPartitions.get(0)); + Assert.assertEquals("p4", sortedPartitions.get(1)); + Assert.assertEquals("p2", sortedPartitions.get(2)); + Assert.assertEquals("p1", sortedPartitions.get(3)); + } } diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizePlan.java index cb8061bb1b..5ec6c255f4 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizePlan.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizePlan.java @@ -7,9 +7,12 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; public class TestIcebergMinorOptimizePlan extends TestIcebergBase { @Test @@ -114,4 +117,26 @@ public void testPartitionPartialMinorOptimize() throws Exception { assertTask(planResult.getOptimizeTasks().get(0), "name=name2", 0, 10, 0, 0); assertTask(planResult.getOptimizeTasks().get(1), "name=name3", 0, 8, 0, 0); } + + @Test + public void testPartitionWeight() { + List partitionWeights = new ArrayList<>(); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p1", + new IcebergMinorOptimizePlan.IcebergMinorPartitionWeight(0))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p2", + new IcebergMinorOptimizePlan.IcebergMinorPartitionWeight(1))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p3", + new IcebergMinorOptimizePlan.IcebergMinorPartitionWeight(200))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p4", + new IcebergMinorOptimizePlan.IcebergMinorPartitionWeight(100))); + + List sortedPartitions = partitionWeights.stream() + .sorted() + .map(AbstractOptimizePlan.PartitionWeightWrapper::getPartition) + .collect(Collectors.toList()); + Assert.assertEquals("p3", sortedPartitions.get(0)); + Assert.assertEquals("p4", sortedPartitions.get(1)); + Assert.assertEquals("p2", sortedPartitions.get(2)); + Assert.assertEquals("p1", sortedPartitions.get(3)); + } } diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java index 085d6cec2f..aaea71b0b1 100644 --- 
a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMajorOptimizePlan.java @@ -279,6 +279,50 @@ public void testNoPartitionTableFullOptimize() throws IOException { Assert.assertEquals(0, tasks.get(0).getDeleteFileCnt()); } + @Test + public void testPartitionWeight() { + List partitionWeights = new ArrayList<>(); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p1", + new MajorOptimizePlan.MajorPartitionWeight(true,0))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p2", + new MajorOptimizePlan.MajorPartitionWeight(true, 1))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p3", + new MajorOptimizePlan.MajorPartitionWeight(false, 200))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p4", + new MajorOptimizePlan.MajorPartitionWeight(false, 100))); + + List sortedPartitions = partitionWeights.stream() + .sorted() + .map(AbstractOptimizePlan.PartitionWeightWrapper::getPartition) + .collect(Collectors.toList()); + Assert.assertEquals("p2", sortedPartitions.get(0)); + Assert.assertEquals("p1", sortedPartitions.get(1)); + Assert.assertEquals("p3", sortedPartitions.get(2)); + Assert.assertEquals("p4", sortedPartitions.get(3)); + } + + @Test + public void testFullPartitionWeight() { + List partitionWeights = new ArrayList<>(); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p1", + new FullOptimizePlan.FullPartitionWeight(true,0))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p2", + new FullOptimizePlan.FullPartitionWeight(true, 1))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p3", + new FullOptimizePlan.FullPartitionWeight(false, 200))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p4", + new FullOptimizePlan.FullPartitionWeight(false, 100))); + + List sortedPartitions = partitionWeights.stream() + .sorted() + .map(AbstractOptimizePlan.PartitionWeightWrapper::getPartition) + .collect(Collectors.toList()); + Assert.assertEquals("p2", sortedPartitions.get(0)); + Assert.assertEquals("p1", sortedPartitions.get(1)); + Assert.assertEquals("p3", sortedPartitions.get(2)); + Assert.assertEquals("p4", sortedPartitions.get(3)); + } + private List insertUnKeyedTableDataFiles(ArcticTable arcticTable) { List dataFiles = insertUnKeyedTableDataFile(FILE_A.partition(), LocalDateTime.of(2022, 1, 1, 12, 0, 0), 5); dataFiles.addAll(insertUnKeyedTableDataFile(FILE_B.partition(), LocalDateTime.of(2022, 1, 2, 12, 0, 0), 5)); diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMinorOptimizePlan.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMinorOptimizePlan.java index 1ebdbc6fdb..a4dca9e142 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMinorOptimizePlan.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestMinorOptimizePlan.java @@ -87,6 +87,28 @@ public void testMinorOptimize() throws IOException { Assert.assertEquals(10, tasks.get(0).getDeleteFileCnt()); } + @Test + public void testPartitionWeight() { + List partitionWeights = new ArrayList<>(); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p1", + new MinorOptimizePlan.MinorPartitionWeight(true,0))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p2", + new 
MinorOptimizePlan.MinorPartitionWeight(true, 1))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p3", + new MinorOptimizePlan.MinorPartitionWeight(false, 200))); + partitionWeights.add(new AbstractOptimizePlan.PartitionWeightWrapper("p4", + new MinorOptimizePlan.MinorPartitionWeight(false, 100))); + + List sortedPartitions = partitionWeights.stream() + .sorted() + .map(AbstractOptimizePlan.PartitionWeightWrapper::getPartition) + .collect(Collectors.toList()); + Assert.assertEquals("p2", sortedPartitions.get(0)); + Assert.assertEquals("p1", sortedPartitions.get(1)); + Assert.assertEquals("p3", sortedPartitions.get(2)); + Assert.assertEquals("p4", sortedPartitions.get(3)); + } + protected List insertChangeDeleteFiles(ArcticTable arcticTable) throws IOException { AtomicInteger taskId = new AtomicInteger(); List changeDeleteFiles = new ArrayList<>(); From 2c1a002372b442c64b86e5c480cf58496701dc0f Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 13 Mar 2023 20:36:28 +0800 Subject: [PATCH 2/2] if not all partitions are optimized, current change snapshot id should set to -1 --- .../server/optimize/AbstractOptimizePlan.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java index b46df213d2..9cf9bf224b 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractOptimizePlan.java @@ -52,6 +52,7 @@ public abstract class AbstractOptimizePlan { protected final Set allPartitions = new HashSet<>(); // partitions to optimizing protected final Set affectedPartitions = new HashSet<>(); + private boolean skippedPartitions = false; private int collectFileCnt = 0; @@ -91,16 +92,18 @@ protected List collectTasks(List partitions) { for (String partition : partitions) { List optimizeTasks = collectTask(partition); + if (reachMaxFileCount()) { + this.skippedPartitions = true; + LOG.info("{} get enough files {} > {}, ignore left partitions", tableId(), this.collectFileCnt, + getMaxFileCntLimit()); + break; + } if (optimizeTasks.size() > 0) { this.affectedPartitions.add(partition); LOG.info("{} partition {} ==== collect {} {} tasks", tableId(), partition, optimizeTasks.size(), getOptimizeType()); results.addAll(optimizeTasks); - if (reachMaxFileCnt(optimizeTasks)) { - LOG.info("{} get enough files {} > {}, ignore left partitions", tableId(), this.collectFileCnt, - getMaxFileCntLimit()); - break; - } + accumulateFileCount(optimizeTasks); } } LOG.info("{} ==== after collect, get {} task of partitions {}/{}", tableId(), getOptimizeType(), @@ -108,7 +111,7 @@ protected List collectTasks(List partitions) { return results; } - private boolean reachMaxFileCnt(List newTasks) { + private void accumulateFileCount(List newTasks) { int newFileCnt = 0; for (BasicOptimizeTask optimizeTask : newTasks) { int taskFileCnt = optimizeTask.getBaseFileCnt() + optimizeTask.getDeleteFileCnt() + @@ -116,12 +119,20 @@ private boolean reachMaxFileCnt(List newTasks) { newFileCnt += taskFileCnt; } this.collectFileCnt += newFileCnt; + } + + private boolean reachMaxFileCount() { return this.collectFileCnt >= getMaxFileCntLimit(); } private OptimizePlanResult buildOptimizePlanResult(List optimizeTasks) { + long currentChangeSnapshotId = getCurrentChangeSnapshotId(); + if 
(skippedPartitions) { + // if not all partitions are optimized, the current change snapshot id should be set to -1 to trigger the next minor optimize + currentChangeSnapshotId = TableOptimizeRuntime.INVALID_SNAPSHOT_ID; + } return new OptimizePlanResult(this.affectedPartitions, optimizeTasks, getOptimizeType(), this.currentSnapshotId, - getCurrentChangeSnapshotId(), this.planGroup); + currentChangeSnapshotId, this.planGroup); } protected List getPartitionsToOptimizeInOrder() {
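
Illustrative note (not part of the patch): the ordering introduced here can be reproduced with a minimal, self-contained sketch. The Weight and WeightedPartition classes below are simplified stand-ins for MajorPartitionWeight and PartitionWeightWrapper, not the classes from this patch; they mirror the comparators, which compare the other object's fields against this one's so that a plain ascending sort yields a largest-weight-first order: partitions that reached their optimize interval are planned first, and within each group the partition with more files comes first.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartitionOrderSketch {

  // Simplified stand-in for a PartitionWeight implementation such as MajorPartitionWeight.
  static class Weight implements Comparable<Weight> {
    final boolean reachInterval;
    final int fileCount;

    Weight(boolean reachInterval, int fileCount) {
      this.reachInterval = reachInterval;
      this.fileCount = fileCount;
    }

    @Override
    public int compareTo(Weight o) {
      // comparing o's fields against this one's makes the "bigger" weight sort first
      int cmp = Boolean.compare(o.reachInterval, this.reachInterval);
      return cmp != 0 ? cmp : Integer.compare(o.fileCount, this.fileCount);
    }
  }

  // Simplified stand-in for PartitionWeightWrapper: pairs a partition with its weight.
  static class WeightedPartition implements Comparable<WeightedPartition> {
    final String partition;
    final Weight weight;

    WeightedPartition(String partition, Weight weight) {
      this.partition = partition;
      this.weight = weight;
    }

    @Override
    public int compareTo(WeightedPartition o) {
      return this.weight.compareTo(o.weight);
    }
  }

  public static void main(String[] args) {
    List<WeightedPartition> partitions = new ArrayList<>();
    partitions.add(new WeightedPartition("p1", new Weight(true, 0)));
    partitions.add(new WeightedPartition("p2", new Weight(true, 1)));
    partitions.add(new WeightedPartition("p3", new Weight(false, 200)));
    partitions.add(new WeightedPartition("p4", new Weight(false, 100)));

    Collections.sort(partitions);
    // prints p2 p1 p3 p4: interval-reached partitions first, then larger file counts,
    // matching the expectations in the testPartitionWeight cases above
    partitions.forEach(p -> System.out.println(p.partition));
  }
}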