From 1f42ff448e2a61408ef7a3ea50eb01314ba39ded Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 16:48:07 +0800 Subject: [PATCH 01/14] optimizing plan: filter when add files --- .../plan/AbstractPartitionPlan.java | 154 ++++++------------ .../plan/CommonPartitionEvaluator.java | 66 ++++++-- .../plan/MixedHivePartitionPlan.java | 70 ++++---- .../plan/MixedIcebergPartitionPlan.java | 76 ++++----- .../optimizing/plan/PartitionEvaluator.java | 2 +- 5 files changed, 182 insertions(+), 186 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 513a7bc1af..408564d914 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -26,7 +26,6 @@ import com.netease.arctic.server.optimizing.OptimizingType; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.table.ArcticTable; -import org.apache.iceberg.FileContent; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -51,12 +50,10 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { private long toSequence = INVALID_SEQUENCE; protected final long planTime; - protected final Map>> fragmentFiles = Maps.newHashMap(); - protected final Map>> segmentFiles = Maps.newHashMap(); - protected final Map> equalityDeleteFileMap = Maps.newHashMap(); - protected final Map> posDeleteFileMap = Maps.newHashMap(); - - private List splitTasks; + protected final Map>> rewriteDataFiles = Maps.newHashMap(); + protected final Map>> rewritePosDataFiles = Maps.newHashMap(); + // protected Delete files are Delete files related to 
Data files not optimized in this plan + protected final Set protectedDeleteFiles = Sets.newHashSet(); public AbstractPartitionPlan(TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) { @@ -99,24 +96,19 @@ public long getCost() { } @Override - public void addFile(IcebergDataFile dataFile, List> deletes) { - evaluator().addFile(dataFile, deletes); - if (evaluator().isFragmentFile(dataFile)) { - fragmentFiles.put(dataFile, deletes); - } else { - segmentFiles.put(dataFile, deletes); + public boolean addFile(IcebergDataFile dataFile, List> deletes) { + boolean added = evaluator().addFile(dataFile, deletes); + if (!added) { + // if the Data file is not added, it's Delete files should be not be removed from iceberg + deletes.stream().map(delete -> delete.path().toString()).forEach(protectedDeleteFiles::add); + return false; } - for (IcebergContentFile deleteFile : deletes) { - if (deleteFile.content() == FileContent.POSITION_DELETES) { - posDeleteFileMap - .computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet()) - .add(dataFile); - } else { - equalityDeleteFileMap - .computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet()) - .add(dataFile); - } + if (evaluator().fileShouldRewrite(dataFile, deletes)) { + rewriteDataFiles.put(dataFile, deletes); + } else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) { + rewritePosDataFiles.put(dataFile, deletes); } + return true; } @Override @@ -128,35 +120,19 @@ public List splitTasks(int targetTaskCount) { if (taskSplitter == null) { taskSplitter = buildTaskSplitter(); } - this.splitTasks = taskSplitter.splitTasks(targetTaskCount).stream() - .filter(this::taskNeedExecute).collect(Collectors.toList()); - return this.splitTasks.stream() + beforeSplit(); + return taskSplitter.splitTasks(targetTaskCount).stream() .map(task -> task.buildTask(buildTaskProperties())) .collect(Collectors.toList()); } - - protected boolean taskNeedExecute(SplitTask task) { - // if 
there are no delete files and no more than 1 rewrite files, we should not execute - return !task.getDeleteFiles().isEmpty() || task.getRewriteDataFiles().size() > 1; - } - - private boolean isOptimizing(IcebergDataFile dataFile) { - return this.splitTasks.stream().anyMatch(task -> task.contains(dataFile)); + + protected void beforeSplit() { } protected abstract TaskSplitter buildTaskSplitter(); protected abstract OptimizingInputProperties buildTaskProperties(); - protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List> deleteFiles) { - if (config.isFullRewriteAllFiles()) { - return true; - } else { - // if a file is related any delete files or is not big enough, it should full optimizing - return !deleteFiles.isEmpty() || dataFile.fileSizeInBytes() < config.getTargetSize() * 0.9; - } - } - protected void markSequence(long sequence) { if (fromSequence == INVALID_SEQUENCE || fromSequence > sequence) { fromSequence = sequence; @@ -225,39 +201,15 @@ public Weight getWeight() { protected class SplitTask { private final Set rewriteDataFiles = Sets.newHashSet(); - private final Set> deleteFiles = Sets.newHashSet(); private final Set rewritePosDataFiles = Sets.newHashSet(); + private final Set> deleteFiles = Sets.newHashSet(); - public SplitTask(Map>> fragmentFiles, - Map>> segmentFiles) { - if (evaluator().isFullNecessary()) { - fragmentFiles.forEach((icebergFile, deleteFileSet) -> { - if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) { - rewriteDataFiles.add(icebergFile); - deleteFiles.addAll(deleteFileSet); - } - }); - segmentFiles.forEach((icebergFile, deleteFileSet) -> { - if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) { - rewriteDataFiles.add(icebergFile); - deleteFiles.addAll(deleteFileSet); - } - }); - } else { - fragmentFiles.forEach((icebergFile, deleteFileSet) -> { - rewriteDataFiles.add(icebergFile); - deleteFiles.addAll(deleteFileSet); - }); - segmentFiles.forEach((icebergFile, deleteFileSet) -> { - if 
(evaluator().shouldRewriteSegmentFile(icebergFile, deleteFileSet)) { - rewriteDataFiles.add(icebergFile); - deleteFiles.addAll(deleteFileSet); - } else if (evaluator.shouldRewritePosForSegmentFile(icebergFile, deleteFileSet)) { - rewritePosDataFiles.add(icebergFile); - deleteFiles.addAll(deleteFileSet); - } - }); - } + public SplitTask(Set rewriteDataFiles, + Set rewritePosDataFiles, + Set> deleteFiles) { + this.rewriteDataFiles.addAll(rewriteDataFiles); + this.rewritePosDataFiles.addAll(rewritePosDataFiles); + this.deleteFiles.addAll(deleteFiles); } public Set getRewriteDataFiles() { @@ -272,23 +224,11 @@ public Set getRewritePosDataFiles() { return rewritePosDataFiles; } - public boolean contains(IcebergDataFile dataFile) { - return rewriteDataFiles.contains(dataFile) || rewritePosDataFiles.contains(dataFile); - } - public TaskDescriptor buildTask(OptimizingInputProperties properties) { Set> readOnlyDeleteFiles = Sets.newHashSet(); Set> rewriteDeleteFiles = Sets.newHashSet(); for (IcebergContentFile deleteFile : deleteFiles) { - Set relatedDataFiles; - if (deleteFile.content() == FileContent.POSITION_DELETES) { - relatedDataFiles = posDeleteFileMap.get(deleteFile.path().toString()); - } else { - relatedDataFiles = equalityDeleteFileMap.get(deleteFile.path().toString()); - } - boolean findDataFileNotOptimizing = - relatedDataFiles.stream().anyMatch(file -> !contains(file) && !isOptimizing(file)); - if (findDataFileNotOptimizing) { + if (protectedDeleteFiles.contains(deleteFile.path().toString())) { readOnlyDeleteFiles.add(deleteFile); } else { rewriteDeleteFiles.add(deleteFile); @@ -311,12 +251,12 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) { protected static class FileTask { private final IcebergDataFile file; private final List> deleteFiles; - private final boolean isFragment; + private final boolean isRewriteDataFile; - public FileTask(IcebergDataFile file, List> deleteFiles, boolean isFragment) { + public FileTask(IcebergDataFile 
file, List> deleteFiles, boolean isRewriteDataFile) { this.file = file; this.deleteFiles = deleteFiles; - this.isFragment = isFragment; + this.isRewriteDataFile = isRewriteDataFile; } public IcebergDataFile getFile() { @@ -327,12 +267,12 @@ public List> getDeleteFiles() { return deleteFiles; } - public boolean isFragment() { - return isFragment; + public boolean isRewriteDataFile() { + return isRewriteDataFile; } - public boolean isSegment() { - return !isFragment; + public boolean isRewritePosDataFile() { + return !isRewriteDataFile; } } @@ -342,10 +282,10 @@ protected class BinPackingTaskSplitter implements TaskSplitter { public List splitTasks(int targetTaskCount) { // bin-packing List allDataFiles = Lists.newArrayList(); - segmentFiles.forEach((dataFile, deleteFiles) -> - allDataFiles.add(new FileTask(dataFile, deleteFiles, false))); - fragmentFiles.forEach((dataFile, deleteFiles) -> + rewriteDataFiles.forEach((dataFile, deleteFiles) -> allDataFiles.add(new FileTask(dataFile, deleteFiles, true))); + rewritePosDataFiles.forEach((dataFile, deleteFiles) -> + allDataFiles.add(new FileTask(dataFile, deleteFiles, false))); long taskSize = config.getTargetSize(); Long sum = allDataFiles.stream().map(f -> f.getFile().fileSizeInBytes()).reduce(0L, Long::sum); @@ -356,13 +296,19 @@ public List splitTasks(int targetTaskCount) { // collect List results = Lists.newArrayList(); for (List fileTasks : packed) { - Map>> fragmentFiles = Maps.newHashMap(); - Map>> segmentFiles = Maps.newHashMap(); - fileTasks.stream().filter(FileTask::isFragment) - .forEach(f -> fragmentFiles.put(f.getFile(), f.getDeleteFiles())); - fileTasks.stream().filter(FileTask::isSegment) - .forEach(f -> segmentFiles.put(f.getFile(), f.getDeleteFiles())); - results.add(new SplitTask(fragmentFiles, segmentFiles)); + Set rewriteDataFiles = Sets.newHashSet(); + Set rewritePosDataFiles = Sets.newHashSet(); + Set> deleteFiles = Sets.newHashSet(); + + 
fileTasks.stream().filter(FileTask::isRewriteDataFile).forEach(f -> { + rewriteDataFiles.add(f.getFile()); + deleteFiles.addAll(f.getDeleteFiles()); + }); + fileTasks.stream().filter(FileTask::isRewritePosDataFile).forEach(f -> { + rewritePosDataFiles.add(f.getFile()); + deleteFiles.addAll(f.getDeleteFiles()); + }); + results.add(new SplitTask(rewriteDataFiles, rewritePosDataFiles, deleteFiles)); } return results; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java index 1da59d7972..8ea1b6294b 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java @@ -66,6 +66,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private Boolean necessary = null; private OptimizingType optimizingType = null; private String name; + private Boolean reachFullInterval = null; public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) { this.partition = partition; @@ -85,11 +86,11 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) { } @Override - public void addFile(IcebergDataFile dataFile, List> deletes) { + public boolean addFile(IcebergDataFile dataFile, List> deletes) { if (isFragmentFile(dataFile)) { - addFragmentFile(dataFile, deletes); + return addFragmentFile(dataFile, deletes); } else { - addSegmentFile(dataFile, deletes); + return addSegmentFile(dataFile, deletes); } } @@ -105,36 +106,66 @@ private boolean isDuplicateDelete(IcebergContentFile delete) { return deleteExist; } - private void addFragmentFile(IcebergDataFile dataFile, List> deletes) { + private boolean addFragmentFile(IcebergDataFile dataFile, List> deletes) { + if (!fileShouldRewrite(dataFile, deletes)) { + return false; + } 
fragmentFileSize += dataFile.fileSizeInBytes(); fragmentFileCount += 1; for (IcebergContentFile delete : deletes) { addDelete(delete); } + return true; } - private void addSegmentFile(IcebergDataFile dataFile, List> deletes) { - segmentFileSize += dataFile.fileSizeInBytes(); - segmentFileCount += 1; - - if (shouldRewriteSegmentFile(dataFile, deletes)) { + private boolean addSegmentFile(IcebergDataFile dataFile, List> deletes) { + if (fileShouldRewrite(dataFile, deletes)) { rewriteSegmentFileSize += dataFile.fileSizeInBytes(); rewriteSegmentFileCount += 1; - } else if (shouldRewritePosForSegmentFile(dataFile, deletes)) { + } else if (segmentFileShouldRewritePos(dataFile, deletes)) { rewritePosSegmentFileSize += dataFile.fileSizeInBytes(); rewritePosSegmentFileCount += 1; + } else { + return false; } + + segmentFileSize += dataFile.fileSizeInBytes(); + segmentFileCount += 1; for (IcebergContentFile delete : deletes) { addDelete(delete); } + return true; + } + + protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List> deleteFiles) { + if (config.isFullRewriteAllFiles()) { + return true; + } + if (isFragmentFile(dataFile)) { + return true; + } + // if a file is related any delete files or is not big enough, it should full optimizing + return !deleteFiles.isEmpty() || dataFile.fileSizeInBytes() < config.getTargetSize() * 0.9; } - public boolean shouldRewriteSegmentFile(IcebergDataFile dataFile, List> deletes) { + public boolean fileShouldRewrite(IcebergDataFile dataFile, List> deletes) { + if (isFullOptimizing()) { + return fileShouldFullOptimizing(dataFile, deletes); + } + if (isFragmentFile(dataFile)) { + return true; + } return getRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio(); } - public boolean shouldRewritePosForSegmentFile(IcebergDataFile dataFile, List> deletes) { + public boolean segmentFileShouldRewritePos(IcebergDataFile dataFile, List> deletes) { + if (isFullOptimizing()) { + return false; + } + if 
(isFragmentFile(dataFile)) { + return false; + } if (deletes.stream().anyMatch(delete -> delete.content() == FileContent.EQUALITY_DELETES)) { return true; } else if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count() >= 2) { @@ -143,6 +174,10 @@ public boolean shouldRewritePosForSegmentFile(IcebergDataFile dataFile, List> files) { return files.stream().mapToLong(ContentFile::recordCount).sum(); @@ -213,8 +248,11 @@ protected boolean reachMinorInterval() { } protected boolean reachFullInterval() { - return config.getFullTriggerInterval() >= 0 && - planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval(); + if (reachFullInterval == null) { + reachFullInterval = config.getFullTriggerInterval() >= 0 && + planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval(); + } + return reachFullInterval; } public boolean isFullNecessary() { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index b6d45c1fdf..5ff58c0ca0 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -27,6 +27,7 @@ import com.netease.arctic.optimizing.OptimizingInputProperties; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.table.ArcticTable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import java.util.List; import java.util.Map; @@ -43,22 +44,26 @@ public MixedHivePartitionPlan(TableRuntime tableRuntime, } @Override - public void addFile(IcebergDataFile dataFile, List> deletes) { - super.addFile(dataFile, deletes); + public boolean addFile(IcebergDataFile dataFile, List> deletes) { + if (!super.addFile(dataFile, deletes)) { + return false; + } 
long sequenceNumber = dataFile.dataSequenceNumber(); if (sequenceNumber > maxSequence) { maxSequence = sequenceNumber; } + return true; } @Override - protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List> deleteFiles) { - if (moveFiles2CurrentHiveLocation()) { - // if we are going to move files to old hive location, only files not in hive location should full optimizing - return evaluator().notInHiveLocation(dataFile); - } else { - // if we are going to rewrite all files to a new hive location, all files should full optimizing - return true; + protected void beforeSplit() { + super.beforeSplit(); + if (evaluator().isFullOptimizing() && moveFiles2CurrentHiveLocation()) { + // This is an improvement for full optimizing of hive table, if there is no delete files, we only have to move + // files not in hive location to hive location, so the files in the hive location should not be optimizing. + Preconditions.checkArgument(protectedDeleteFiles.isEmpty(), "delete files should be empty"); + rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); + rewritePosDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); } } @@ -76,16 +81,6 @@ protected CommonPartitionEvaluator buildEvaluator() { return new MixedHivePartitionEvaluator(tableRuntime, partition, hiveLocation, planTime, isKeyedTable()); } - @Override - protected boolean taskNeedExecute(SplitTask task) { - if (evaluator().isFullNecessary()) { - // if is full optimizing for hive, task should execute if there are any data files - return task.getRewriteDataFiles().size() > 0; - } else { - return super.taskNeedExecute(task); - } - } - @Override protected OptimizingInputProperties buildTaskProperties() { OptimizingInputProperties properties = super.buildTaskProperties(); @@ -121,11 +116,14 @@ public MixedHivePartitionEvaluator(TableRuntime tableRuntime, String partition, } @Override - public void addFile(IcebergDataFile dataFile, List> 
deletes) { - super.addFile(dataFile, deletes); - if (!filesNotInHiveLocation && notInHiveLocation(dataFile)) { + public boolean addFile(IcebergDataFile dataFile, List> deletes) { + if (!super.addFile(dataFile, deletes)) { + return false; + } + if (!filesNotInHiveLocation && !inHiveLocation(dataFile)) { filesNotInHiveLocation = true; } + return true; } @Override @@ -143,7 +141,7 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) { PrimaryKeyedFile file = (PrimaryKeyedFile) dataFile.internalFile(); if (file.type() == DataFileType.BASE_FILE) { // we treat all files in hive location as segment files - return dataFile.fileSizeInBytes() <= fragmentSize && notInHiveLocation(dataFile); + return dataFile.fileSizeInBytes() <= fragmentSize && !inHiveLocation(dataFile); } else if (file.type() == DataFileType.INSERT_FILE) { // we treat all insert files as fragment files return true; @@ -154,15 +152,17 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) { @Override public boolean isFullNecessary() { - if (reachHiveRefreshInterval() && hasNewHiveData()) { - return true; - } if (!reachFullInterval()) { return false; } return fragmentFileCount > getBaseSplitCount() || hasNewHiveData(); } + @Override + protected boolean reachFullInterval() { + return super.reachFullInterval() || reachHiveRefreshInterval(); + } + protected boolean hasNewHiveData() { return anyDeleteExist() || hasChangeFiles || filesNotInHiveLocation; } @@ -172,8 +172,18 @@ protected boolean reachHiveRefreshInterval() { } @Override - public boolean shouldRewriteSegmentFile(IcebergDataFile dataFile, List> deletes) { - return super.shouldRewriteSegmentFile(dataFile, deletes) && notInHiveLocation(dataFile); + public boolean fileShouldRewrite(IcebergDataFile dataFile, List> deletes) { + if (isFullOptimizing()) { + return fileShouldFullOptimizing(dataFile, deletes); + } else { + // if it is not full optimizing, we only rewrite files not in hive location + return !inHiveLocation(dataFile) && 
super.fileShouldRewrite(dataFile, deletes); + } + } + + @Override + protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List> deleteFiles) { + return true; } @Override @@ -182,8 +192,8 @@ public PartitionEvaluator.Weight getWeight() { hasChangeFiles && reachBaseRefreshInterval() || hasNewHiveData() && reachHiveRefreshInterval()); } - private boolean notInHiveLocation(IcebergContentFile file) { - return !file.path().toString().contains(hiveLocation); + private boolean inHiveLocation(IcebergContentFile file) { + return file.path().toString().contains(hiveLocation); } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index a248d40f65..6034964b64 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -31,10 +31,12 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import javax.annotation.Nonnull; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; public class MixedIcebergPartitionPlan extends AbstractPartitionPlan { @@ -45,8 +47,10 @@ public MixedIcebergPartitionPlan(TableRuntime tableRuntime, } @Override - public void addFile(IcebergDataFile dataFile, List> deletes) { - super.addFile(dataFile, deletes); + public boolean addFile(IcebergDataFile dataFile, List> deletes) { + if (!super.addFile(dataFile, deletes)) { + return false; + } if (evaluator().isChangeFile(dataFile)) { markSequence(dataFile.dataSequenceNumber()); } @@ -55,6 +59,7 @@ public void addFile(IcebergDataFile dataFile, 
List> delete markSequence(deleteFile.dataSequenceNumber()); } } + return true; } @Override @@ -62,15 +67,6 @@ protected MixedIcebergPartitionEvaluator evaluator() { return ((MixedIcebergPartitionEvaluator) super.evaluator()); } - @Override - protected boolean taskNeedExecute(SplitTask task) { - if (super.taskNeedExecute(task)) { - return true; - } else { - return task.getRewriteDataFiles().stream().anyMatch(evaluator()::isChangeFile); - } - } - @Override protected OptimizingInputProperties buildTaskProperties() { OptimizingInputProperties properties = new OptimizingInputProperties(); @@ -109,11 +105,14 @@ public MixedIcebergPartitionEvaluator(TableRuntime tableRuntime, String partitio } @Override - public void addFile(IcebergDataFile dataFile, List> deletes) { - super.addFile(dataFile, deletes); + public boolean addFile(IcebergDataFile dataFile, List> deletes) { + if (!super.addFile(dataFile, deletes)) { + return false; + } if (!hasChangeFiles && isChangeFile(dataFile)) { hasChangeFiles = true; } + return true; } @Override @@ -166,7 +165,7 @@ public boolean isMinorNecessary() { } @Override - public boolean shouldRewritePosForSegmentFile(IcebergDataFile dataFile, List> deletes) { + public boolean segmentFileShouldRewritePos(IcebergDataFile dataFile, List> deletes) { if (deletes.stream().anyMatch( delete -> delete.content() == FileContent.EQUALITY_DELETES || delete.content() == FileContent.DATA)) { // change equality delete file's content is DATA @@ -232,17 +231,20 @@ private class TreeNodeTaskSplitter implements TaskSplitter { public List splitTasks(int targetTaskCount) { List result = Lists.newArrayList(); FileTree rootTree = FileTree.newTreeRoot(); - segmentFiles.forEach(rootTree::addSegmentFile); - fragmentFiles.forEach(rootTree::addFragmentFile); + rewritePosDataFiles.forEach(rootTree::addRewritePosDataFile); + rewriteDataFiles.forEach(rootTree::addRewriteDataFile); rootTree.completeTree(); List subTrees = Lists.newArrayList(); rootTree.splitFileTree(subTrees, 
new SplitIfNoFileExists()); for (FileTree subTree : subTrees) { - Map>> fragmentFiles = Maps.newHashMap(); - Map>> segmentFiles = Maps.newHashMap(); - subTree.collectFragmentFiles(fragmentFiles); - subTree.collectSegmentFiles(segmentFiles); - result.add(new SplitTask(fragmentFiles, segmentFiles)); + Map>> rewriteDataFiles = Maps.newHashMap(); + Map>> rewritePosDataFiles = Maps.newHashMap(); + Set> deleteFiles = Sets.newHashSet(); + subTree.collectRewriteDataFiles(rewriteDataFiles); + subTree.collectRewritePosDataFiles(rewritePosDataFiles); + rewriteDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); + rewritePosDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); + result.add(new SplitTask(rewriteDataFiles.keySet(), rewritePosDataFiles.keySet(), deleteFiles)); } return result; } @@ -251,8 +253,8 @@ public List splitTasks(int targetTaskCount) { private static class FileTree { private final DataTreeNode node; - private final Map>> fragmentFiles = Maps.newHashMap(); - private final Map>> segmentFiles = Maps.newHashMap(); + private final Map>> rewriteDataFiles = Maps.newHashMap(); + private final Map>> rewritePosDataFiles = Maps.newHashMap(); private FileTree left; private FileTree right; @@ -303,40 +305,40 @@ public void splitFileTree(List collector, Predicate canSplit } } - public void collectFragmentFiles(Map>> collector) { - collector.putAll(fragmentFiles); + public void collectRewriteDataFiles(Map>> collector) { + collector.putAll(rewriteDataFiles); if (left != null) { - left.collectFragmentFiles(collector); + left.collectRewriteDataFiles(collector); } if (right != null) { - right.collectFragmentFiles(collector); + right.collectRewriteDataFiles(collector); } } - public void collectSegmentFiles(Map>> collector) { - collector.putAll(segmentFiles); + public void collectRewritePosDataFiles(Map>> collector) { + collector.putAll(rewritePosDataFiles); if (left != null) { - left.collectSegmentFiles(collector); + 
left.collectRewritePosDataFiles(collector); } if (right != null) { - right.collectSegmentFiles(collector); + right.collectRewritePosDataFiles(collector); } } - public void addSegmentFile(IcebergDataFile file, List> deleteFiles) { + public void addRewritePosDataFile(IcebergDataFile file, List> deleteFiles) { PrimaryKeyedFile primaryKeyedFile = (PrimaryKeyedFile) file.internalFile(); FileTree node = putNodeIfAbsent(primaryKeyedFile.node()); - node.segmentFiles.put(file, deleteFiles); + node.rewritePosDataFiles.put(file, deleteFiles); } - public void addFragmentFile(IcebergDataFile file, List> deleteFiles) { + public void addRewriteDataFile(IcebergDataFile file, List> deleteFiles) { PrimaryKeyedFile primaryKeyedFile = (PrimaryKeyedFile) file.internalFile(); FileTree node = putNodeIfAbsent(primaryKeyedFile.node()); - node.fragmentFiles.put(file, deleteFiles); + node.rewriteDataFiles.put(file, deleteFiles); } public boolean isRootEmpty() { - return segmentFiles.isEmpty() && fragmentFiles.isEmpty(); + return rewritePosDataFiles.isEmpty() && rewriteDataFiles.isEmpty(); } public boolean isLeaf() { @@ -380,7 +382,7 @@ private void completeTree(boolean ancestorFileExist) { } private boolean fileExist() { - return !segmentFiles.isEmpty() || !fragmentFiles.isEmpty(); + return !rewritePosDataFiles.isEmpty() || !rewriteDataFiles.isEmpty(); } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java index c46aa753ca..e0594787fb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java @@ -36,7 +36,7 @@ interface Weight extends Comparable { String getPartition(); - void addFile(IcebergDataFile dataFile, List> deletes); + boolean addFile(IcebergDataFile dataFile, List> deletes); void 
addPartitionProperties(Map properties); From efa3ac5ecd4d5f541a7d7e766bafae4e8989a888 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 17:15:31 +0800 Subject: [PATCH 02/14] fix some comment --- .../arctic/server/optimizing/plan/AbstractPartitionPlan.java | 4 ++-- .../arctic/server/optimizing/plan/MixedHivePartitionPlan.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 408564d914..cda7545f30 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -52,7 +52,7 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { protected final Map>> rewriteDataFiles = Maps.newHashMap(); protected final Map>> rewritePosDataFiles = Maps.newHashMap(); - // protected Delete files are Delete files related to Data files not optimized in this plan + // protected Delete files are Delete files which are related to Data files not optimized in this plan protected final Set protectedDeleteFiles = Sets.newHashSet(); public AbstractPartitionPlan(TableRuntime tableRuntime, @@ -99,7 +99,7 @@ public long getCost() { public boolean addFile(IcebergDataFile dataFile, List> deletes) { boolean added = evaluator().addFile(dataFile, deletes); if (!added) { - // if the Data file is not added, it's Delete files should be not be removed from iceberg + // if the Data file is not added, it's Delete files should not be removed from iceberg deletes.stream().map(delete -> delete.path().toString()).forEach(protectedDeleteFiles::add); return false; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 5ff58c0ca0..4ffa321d51 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -59,7 +59,7 @@ public boolean addFile(IcebergDataFile dataFile, List> del protected void beforeSplit() { super.beforeSplit(); if (evaluator().isFullOptimizing() && moveFiles2CurrentHiveLocation()) { - // This is an improvement for full optimizing of hive table, if there is no delete files, we only have to move + // This is an improvement for full optimizing of hive table, if there are no delete files, we only have to move // files not in hive location to hive location, so the files in the hive location should not be optimizing. Preconditions.checkArgument(protectedDeleteFiles.isEmpty(), "delete files should be empty"); rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); From d2e39bb469346f18440984902d957feefdd30eb3 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 17:16:31 +0800 Subject: [PATCH 03/14] cache reachHiveRefreshInterval --- .../server/optimizing/plan/MixedHivePartitionPlan.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 4ffa321d51..6709e06ce3 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -108,6 +108,8 @@ protected static class MixedHivePartitionEvaluator extends MixedIcebergPartition private boolean filesNotInHiveLocation = false; // partition property protected long 
lastHiveOptimizedTime; + + private Boolean reachHiveRefreshInterval; public MixedHivePartitionEvaluator(TableRuntime tableRuntime, String partition, String hiveLocation, long planTime, boolean keyedTable) { @@ -168,7 +170,11 @@ protected boolean hasNewHiveData() { } protected boolean reachHiveRefreshInterval() { - return config.getHiveRefreshInterval() >= 0 && planTime - lastHiveOptimizedTime > config.getHiveRefreshInterval(); + if (reachHiveRefreshInterval == null) { + reachHiveRefreshInterval = + config.getHiveRefreshInterval() >= 0 && planTime - lastHiveOptimizedTime > config.getHiveRefreshInterval(); + } + return reachHiveRefreshInterval; } @Override From 72b07fba33ff970aa5b9c3cfa1dc53463ec01664 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 17:39:20 +0800 Subject: [PATCH 04/14] add docs --- .../arctic/server/optimizing/plan/PartitionEvaluator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java index e0594787fb..8f3af3b1e9 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java @@ -36,6 +36,12 @@ interface Weight extends Comparable { String getPartition(); + /** + * Add a Data file and its related Delete files to this evaluator + * @param dataFile - Data file + * @param deletes - Delete files + * @return true if the file is added successfully, false if the file will not be optimized + */ boolean addFile(IcebergDataFile dataFile, List> deletes); void addPartitionProperties(Map properties); From 058b33dcf1b1e2aab4a0fe2f7ab5a73e8eff2bbb Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 18:40:28 +0800 Subject: [PATCH 05/14] rename to reservedDeleteFiles --- 
.../server/optimizing/plan/AbstractPartitionPlan.java | 8 ++++---- .../server/optimizing/plan/MixedHivePartitionPlan.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index cda7545f30..76d632b646 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -52,8 +52,8 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { protected final Map>> rewriteDataFiles = Maps.newHashMap(); protected final Map>> rewritePosDataFiles = Maps.newHashMap(); - // protected Delete files are Delete files which are related to Data files not optimized in this plan - protected final Set protectedDeleteFiles = Sets.newHashSet(); + // reserved Delete files are Delete files which are related to Data files not optimized in this plan + protected final Set reservedDeleteFiles = Sets.newHashSet(); public AbstractPartitionPlan(TableRuntime tableRuntime, ArcticTable table, String partition, long planTime) { @@ -100,7 +100,7 @@ public boolean addFile(IcebergDataFile dataFile, List> del boolean added = evaluator().addFile(dataFile, deletes); if (!added) { // if the Data file is not added, its Delete files should not be removed from iceberg - deletes.stream().map(delete -> delete.path().toString()).forEach(protectedDeleteFiles::add); + deletes.stream().map(delete -> delete.path().toString()).forEach(reservedDeleteFiles::add); return false; } if (evaluator().fileShouldRewrite(dataFile, deletes)) { @@ -228,7 +228,7 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) { Set> readOnlyDeleteFiles = Sets.newHashSet(); Set> rewriteDeleteFiles = Sets.newHashSet(); for (IcebergContentFile deleteFile :
deleteFiles) { - if (protectedDeleteFiles.contains(deleteFile.path().toString())) { + if (reservedDeleteFiles.contains(deleteFile.path().toString())) { readOnlyDeleteFiles.add(deleteFile); } else { rewriteDeleteFiles.add(deleteFile); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 6709e06ce3..4f8ab5eea7 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -61,7 +61,7 @@ protected void beforeSplit() { if (evaluator().isFullOptimizing() && moveFiles2CurrentHiveLocation()) { // This is an improvement for full optimizing of hive table, if there are no delete files, we only have to move // files not in hive location to hive location, so the files in the hive location should not be optimizing. 
- Preconditions.checkArgument(protectedDeleteFiles.isEmpty(), "delete files should be empty"); + Preconditions.checkArgument(reservedDeleteFiles.isEmpty(), "delete files should be empty"); rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); rewritePosDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); } From bfaaa09bd439da4213993ea6aaa6a20f3c40cd6b Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 18:45:20 +0800 Subject: [PATCH 06/14] add return false --- .../arctic/server/optimizing/plan/AbstractPartitionPlan.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 76d632b646..3a38485f9e 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -107,6 +107,8 @@ public boolean addFile(IcebergDataFile dataFile, List> del rewriteDataFiles.put(dataFile, deletes); } else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) { rewritePosDataFiles.put(dataFile, deletes); + } else { + return false; } return true; } From b5d7e7de55e4f55356ae161d975133992c32ba6f Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 19:06:04 +0800 Subject: [PATCH 07/14] move reachFullInterval to constructor --- .../server/optimizing/plan/CommonPartitionEvaluator.java | 8 +++----- .../server/optimizing/plan/MixedHivePartitionPlan.java | 6 +++--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java index 8ea1b6294b..167d253797 100644 --- 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java @@ -43,6 +43,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { protected final OptimizingConfig config; protected final long fragmentSize; protected final long planTime; + private final boolean reachFullInterval; // fragment files protected int fragmentFileCount = 0; @@ -66,7 +67,6 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private Boolean necessary = null; private OptimizingType optimizingType = null; private String name; - private Boolean reachFullInterval = null; public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) { this.partition = partition; @@ -74,6 +74,8 @@ public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, lon this.config = tableRuntime.getOptimizingConfig(); this.fragmentSize = config.getTargetSize() / config.getFragmentRatio(); this.planTime = planTime; + this.reachFullInterval = config.getFullTriggerInterval() >= 0 && + planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval(); } @Override @@ -248,10 +250,6 @@ protected boolean reachMinorInterval() { } protected boolean reachFullInterval() { - if (reachFullInterval == null) { - reachFullInterval = config.getFullTriggerInterval() >= 0 && - planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval(); - } return reachFullInterval; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 4f8ab5eea7..cd0880f647 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -154,15 +154,15 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) { @Override public boolean isFullNecessary() { - if (!reachFullInterval()) { + if (!reachFullInterval() && !reachHiveRefreshInterval()) { return false; } return fragmentFileCount > getBaseSplitCount() || hasNewHiveData(); } @Override - protected boolean reachFullInterval() { - return super.reachFullInterval() || reachHiveRefreshInterval(); + protected boolean isFullOptimizing() { + return reachFullInterval() || reachHiveRefreshInterval(); } protected boolean hasNewHiveData() { From ebef2a0d6a9dc2fad28ff7775a3ed76ab9fe1265 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 19:06:39 +0800 Subject: [PATCH 08/14] set properties first --- .../server/optimizing/plan/OptimizingEvaluator.java | 13 +++++++------ .../optimizing/plan/MixedTablePlanTestBase.java | 8 ++++---- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java index 24ef3e09bb..5fdb0fc717 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java @@ -94,12 +94,7 @@ protected TableFileScanHelper.PartitionFilter getPartitionFilter() { private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { PartitionSpec partitionSpec = arcticTable.spec(); - for (TableFileScanHelper.FileScanResult fileScanResult : tableFileScanHelper.scan()) { - StructLike partition = fileScanResult.file().partition(); - String partitionPath = partitionSpec.partitionToPath(partition); - PartitionEvaluator evaluator = partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator); - 
evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); - } + // add partition properties first, then add files, because files may use these properties partitionProperty().forEach((partition, properties) -> { String partitionToPath = partitionSpec.partitionToPath(partition); PartitionEvaluator evaluator = partitionPlanMap.get(partitionToPath); @@ -107,6 +102,12 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { evaluator.addPartitionProperties(properties); } }); + for (TableFileScanHelper.FileScanResult fileScanResult : tableFileScanHelper.scan()) { + StructLike partition = fileScanResult.file().partition(); + String partitionPath = partitionSpec.partitionToPath(partition); + PartitionEvaluator evaluator = partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator); + evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); + } partitionPlanMap.values().removeIf(plan -> !plan.isNecessary()); } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java index ee3fe52c3f..f66a763967 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java @@ -286,10 +286,6 @@ protected List planWithCurrentFiles() { protected AbstractPartitionPlan buildPlanWithCurrentFiles() { TableFileScanHelper tableFileScanHelper = getTableFileScanHelper(); AbstractPartitionPlan partitionPlan = getAndCheckPartitionPlan(); - List scan = tableFileScanHelper.scan(); - for (TableFileScanHelper.FileScanResult fileScanResult : scan) { - partitionPlan.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); - } PartitionSpec spec = getArcticTable().spec(); partitionProperty().forEach((partition, properties) -> { String partitionToPath = 
spec.partitionToPath(partition); @@ -297,6 +293,10 @@ protected AbstractPartitionPlan buildPlanWithCurrentFiles() { partitionPlan.addPartitionProperties(properties); } }); + List scan = tableFileScanHelper.scan(); + for (TableFileScanHelper.FileScanResult fileScanResult : scan) { + partitionPlan.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); + } return partitionPlan; } From 3fa2b2ba80602508fd9b88460c90479918e37a60 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 24 Aug 2023 19:27:14 +0800 Subject: [PATCH 09/14] add some check code --- .../arctic/server/optimizing/plan/MixedHivePartitionPlan.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index cd0880f647..7941bac011 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -130,6 +130,8 @@ public boolean addFile(IcebergDataFile dataFile, List> del @Override public void addPartitionProperties(Map properties) { + Preconditions.checkArgument(reachHiveRefreshInterval == null, + "partition properties should be added before add files"); super.addPartitionProperties(properties); String optimizedTime = properties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME); if (optimizedTime != null) { From 0913c4d84a9e52a8231d9ab1df40caf7fd327e80 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 25 Aug 2023 10:27:22 +0800 Subject: [PATCH 10/14] add docs for PartitionEvaluator --- .../optimizing/plan/PartitionEvaluator.java | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java index 8f3af3b1e9..0f2e3863f2 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Map; +/** + * PartitionEvaluator is used to evaluate whether a partition is necessary to be optimized. + */ public interface PartitionEvaluator { /** @@ -34,40 +37,95 @@ interface Weight extends Comparable { } + /** + * Get the partition name. + * + * @return the partition name + */ String getPartition(); /** * Add a Data file and its related Delete files to this evaluator + * * @param dataFile - Data file * @param deletes - Delete files * @return true if the file is added successfully, false if the file will not be optimized */ boolean addFile(IcebergDataFile dataFile, List> deletes); - + + /** + * Add some properties of this partition. It should be noted that properties should be added before adding any files. + * + * @param properties - properties of this partition + */ void addPartitionProperties(Map properties); + /** + * Whether this partition is necessary to optimize. + * + * @return true if it is necessary to optimize, false otherwise + */ boolean isNecessary(); + /** + * Get the cost of optimizing for this partition. + * + * @return the cost of optimizing + */ long getCost(); - + + /** + * Get the weight of this partition which determines the priority of partition execution. + * + * @return the weight of this partition + */ Weight getWeight(); + /** + * Get the optimizing type of this partition. + * + * @return the OptimizingType + */ OptimizingType getOptimizingType(); + /** + * Get the count of fragment files involved in optimizing. + */ int getFragmentFileCount(); + /** + * Get the total size of fragment files involved in optimizing.
+ */ long getFragmentFileSize(); + /** + * Get the count of segment files involved in optimizing. + */ int getSegmentFileCount(); + /** + * Get the total size of segment files involved in optimizing. + */ long getSegmentFileSize(); + /** + * Get the count of equality delete files involved in optimizing. + */ int getEqualityDeleteFileCount(); + /** + * Get the total size of equality delete files involved in optimizing. + */ long getEqualityDeleteFileSize(); + /** + * Get the count of positional delete files involved in optimizing. + */ int getPosDeleteFileCount(); + /** + * Get the total size of positional delete files involved in optimizing. + */ long getPosDeleteFileSize(); } From fddc6839b888133d4da90e415235210120d75750 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 25 Aug 2023 10:53:12 +0800 Subject: [PATCH 11/14] fix add properties first --- .../optimizing/plan/OptimizingEvaluator.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java index 5fdb0fc717..87510245db 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java @@ -94,18 +94,19 @@ protected TableFileScanHelper.PartitionFilter getPartitionFilter() { private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { PartitionSpec partitionSpec = arcticTable.spec(); - // add partition properties first, then add files, because files may use these properties - partitionProperty().forEach((partition, properties) -> { - String partitionToPath = partitionSpec.partitionToPath(partition); - PartitionEvaluator evaluator = partitionPlanMap.get(partitionToPath); - if (evaluator != null) { - evaluator.addPartitionProperties(properties); - } - 
}); + // add partition properties before adding files + StructLikeMap> partitionProperties = partitionProperty(); for (TableFileScanHelper.FileScanResult fileScanResult : tableFileScanHelper.scan()) { StructLike partition = fileScanResult.file().partition(); String partitionPath = partitionSpec.partitionToPath(partition); - PartitionEvaluator evaluator = partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator); + if (!partitionPlanMap.containsKey(partitionPath)) { + PartitionEvaluator evaluator = buildEvaluator(partitionPath); + if (partitionProperties.containsKey(partition)) { + evaluator.addPartitionProperties(partitionProperties.get(partition)); + } + partitionPlanMap.put(partitionPath, evaluator); + } + PartitionEvaluator evaluator = partitionPlanMap.get(partitionPath); evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); } partitionPlanMap.values().removeIf(plan -> !plan.isNecessary()); From 2adbb6e8aefabe7a6f511ab6a89e332b6916e8b4 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 25 Aug 2023 13:03:00 +0800 Subject: [PATCH 12/14] remove addPartitionProperties from PartitionEvaluator, add it to constructor --- .../plan/AbstractPartitionPlan.java | 10 +++--- .../plan/CommonPartitionEvaluator.java | 9 +++-- .../plan/MixedHivePartitionPlan.java | 35 +++++++------------ .../plan/MixedIcebergPartitionPlan.java | 26 ++++++-------- .../optimizing/plan/OptimizingEvaluator.java | 29 +++++---------- .../optimizing/plan/PartitionEvaluator.java | 8 ----- .../plan/MixedTablePlanTestBase.java | 8 ----- .../arctic/utils/TablePropertyUtil.java | 26 ++++++++++++++ 8 files changed, 66 insertions(+), 85 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 3a38485f9e..53423bc66a 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java 
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -26,6 +26,7 @@ import com.netease.arctic.server.optimizing.OptimizingType; import com.netease.arctic.server.table.TableRuntime; import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.utils.TablePropertyUtil; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -49,6 +50,7 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator { private long fromSequence = INVALID_SEQUENCE; private long toSequence = INVALID_SEQUENCE; protected final long planTime; + protected final Map partitionProperties; protected final Map>> rewriteDataFiles = Maps.newHashMap(); protected final Map>> rewritePosDataFiles = Maps.newHashMap(); @@ -62,6 +64,7 @@ public AbstractPartitionPlan(TableRuntime tableRuntime, this.config = tableRuntime.getOptimizingConfig(); this.tableRuntime = tableRuntime; this.planTime = planTime; + this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition); } @Override @@ -77,7 +80,7 @@ protected CommonPartitionEvaluator evaluator() { } protected CommonPartitionEvaluator buildEvaluator() { - return new CommonPartitionEvaluator(tableRuntime, partition, planTime); + return new CommonPartitionEvaluator(tableRuntime, partition, partitionProperties, planTime); } @Override @@ -113,11 +116,6 @@ public boolean addFile(IcebergDataFile dataFile, List> del return true; } - @Override - public void addPartitionProperties(Map properties) { - evaluator().addPartitionProperties(properties); - } - public List splitTasks(int targetTaskCount) { if (taskSplitter == null) { taskSplitter = buildTaskSplitter(); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java index 167d253797..765984cac5 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java @@ -43,6 +43,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { protected final OptimizingConfig config; protected final long fragmentSize; protected final long planTime; + protected final Map partitionProperties; private final boolean reachFullInterval; // fragment files @@ -68,7 +69,8 @@ public class CommonPartitionEvaluator implements PartitionEvaluator { private OptimizingType optimizingType = null; private String name; - public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) { + public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, Map partitionProperties, + long planTime) { this.partition = partition; this.tableRuntime = tableRuntime; this.config = tableRuntime.getOptimizingConfig(); @@ -76,6 +78,7 @@ public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, lon this.planTime = planTime; this.reachFullInterval = config.getFullTriggerInterval() >= 0 && planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval(); + this.partitionProperties = partitionProperties; } @Override @@ -96,10 +99,6 @@ public boolean addFile(IcebergDataFile dataFile, List> del } } - @Override - public void addPartitionProperties(Map properties) { - } - private boolean isDuplicateDelete(IcebergContentFile delete) { boolean deleteExist = deleteFileSet.contains(delete.path().toString()); if (!deleteExist) { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 
7941bac011..d1b31ac55f 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -78,7 +78,8 @@ protected MixedHivePartitionEvaluator evaluator() { @Override protected CommonPartitionEvaluator buildEvaluator() { - return new MixedHivePartitionEvaluator(tableRuntime, partition, hiveLocation, planTime, isKeyedTable()); + return new MixedHivePartitionEvaluator(tableRuntime, partition, partitionProperties, hiveLocation, planTime, + isKeyedTable()); } @Override @@ -105,16 +106,20 @@ private String constructCustomHiveSubdirectory() { protected static class MixedHivePartitionEvaluator extends MixedIcebergPartitionEvaluator { private final String hiveLocation; + private final boolean reachHiveRefreshInterval; + private boolean filesNotInHiveLocation = false; - // partition property - protected long lastHiveOptimizedTime; - - private Boolean reachHiveRefreshInterval; - public MixedHivePartitionEvaluator(TableRuntime tableRuntime, String partition, String hiveLocation, + public MixedHivePartitionEvaluator(TableRuntime tableRuntime, String partition, + Map partitionProperties, String hiveLocation, long planTime, boolean keyedTable) { - super(tableRuntime, partition, planTime, keyedTable); + super(tableRuntime, partition, partitionProperties, planTime, keyedTable); this.hiveLocation = hiveLocation; + String optimizedTime = partitionProperties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME); + // the unit of transient-time is seconds + long lastHiveOptimizedTime = optimizedTime == null ? 
0 : Integer.parseInt(optimizedTime) * 1000L; + this.reachHiveRefreshInterval = + config.getHiveRefreshInterval() >= 0 && planTime - lastHiveOptimizedTime > config.getHiveRefreshInterval(); } @Override @@ -128,18 +133,6 @@ public boolean addFile(IcebergDataFile dataFile, List> del return true; } - @Override - public void addPartitionProperties(Map properties) { - Preconditions.checkArgument(reachHiveRefreshInterval == null, - "partition properties should be added before add files"); - super.addPartitionProperties(properties); - String optimizedTime = properties.get(HiveTableProperties.PARTITION_PROPERTIES_KEY_TRANSIENT_TIME); - if (optimizedTime != null) { - // the unit of transient-time is seconds - this.lastHiveOptimizedTime = Integer.parseInt(optimizedTime) * 1000L; - } - } - @Override protected boolean isFragmentFile(IcebergDataFile dataFile) { PrimaryKeyedFile file = (PrimaryKeyedFile) dataFile.internalFile(); @@ -172,10 +165,6 @@ protected boolean hasNewHiveData() { } protected boolean reachHiveRefreshInterval() { - if (reachHiveRefreshInterval == null) { - reachHiveRefreshInterval = - config.getHiveRefreshInterval() >= 0 && planTime - lastHiveOptimizedTime > config.getHiveRefreshInterval(); - } return reachHiveRefreshInterval; } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 6034964b64..82bfb2569c 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -89,19 +89,24 @@ protected TaskSplitter buildTaskSplitter() { @Override protected CommonPartitionEvaluator buildEvaluator() { - return new MixedIcebergPartitionEvaluator(tableRuntime, partition, planTime, isKeyedTable()); + return new MixedIcebergPartitionEvaluator(tableRuntime, 
partition, partitionProperties, planTime, + isKeyedTable()); } protected static class MixedIcebergPartitionEvaluator extends CommonPartitionEvaluator { protected final boolean keyedTable; protected boolean hasChangeFiles = false; - // partition property - protected long lastBaseOptimizedTime; + private final boolean reachBaseRefreshInterval; - public MixedIcebergPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime, + public MixedIcebergPartitionEvaluator(TableRuntime tableRuntime, String partition, + Map partitionProperties, long planTime, boolean keyedTable) { - super(tableRuntime, partition, planTime); + super(tableRuntime, partition, partitionProperties, planTime); this.keyedTable = keyedTable; + String optimizedTime = partitionProperties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME); + long lastBaseOptimizedTime = optimizedTime == null ? 0 : Long.parseLong(optimizedTime); + this.reachBaseRefreshInterval = + config.getBaseRefreshInterval() >= 0 && planTime - lastBaseOptimizedTime > config.getBaseRefreshInterval(); } @Override @@ -115,15 +120,6 @@ public boolean addFile(IcebergDataFile dataFile, List> del return true; } - @Override - public void addPartitionProperties(Map properties) { - super.addPartitionProperties(properties); - String optimizedTime = properties.get(TableProperties.PARTITION_BASE_OPTIMIZED_TIME); - if (optimizedTime != null) { - this.lastBaseOptimizedTime = Long.parseLong(optimizedTime); - } - } - protected boolean isChangeFile(IcebergDataFile dataFile) { if (!keyedTable) { return false; @@ -178,7 +174,7 @@ public boolean segmentFileShouldRewritePos(IcebergDataFile dataFile, List= 0 && planTime - lastBaseOptimizedTime > config.getBaseRefreshInterval(); + return reachBaseRefreshInterval; } protected int getBaseSplitCount() { diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java index 87510245db..0e21c40391 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingEvaluator.java @@ -28,13 +28,13 @@ import com.netease.arctic.server.table.TableSnapshot; import com.netease.arctic.server.utils.IcebergTableUtil; import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.utils.TablePropertyUtil; import com.netease.arctic.utils.TableTypeUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,42 +95,31 @@ protected TableFileScanHelper.PartitionFilter getPartitionFilter() { private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) { PartitionSpec partitionSpec = arcticTable.spec(); // add partition properties before adding files - StructLikeMap> partitionProperties = partitionProperty(); for (TableFileScanHelper.FileScanResult fileScanResult : tableFileScanHelper.scan()) { StructLike partition = fileScanResult.file().partition(); String partitionPath = partitionSpec.partitionToPath(partition); - if (!partitionPlanMap.containsKey(partitionPath)) { - PartitionEvaluator evaluator = buildEvaluator(partitionPath); - if (partitionProperties.containsKey(partition)) { - evaluator.addPartitionProperties(partitionProperties.get(partition)); - } - partitionPlanMap.put(partitionPath, evaluator); - } - PartitionEvaluator evaluator = partitionPlanMap.get(partitionPath); + PartitionEvaluator evaluator = partitionPlanMap.computeIfAbsent(partitionPath, this::buildEvaluator); 
evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); } partitionPlanMap.values().removeIf(plan -> !plan.isNecessary()); } - private StructLikeMap> partitionProperty() { - if (arcticTable.isKeyedTable()) { - return arcticTable.asKeyedTable().baseTable().partitionProperty(); - } else { - return arcticTable.asUnkeyedTable().partitionProperty(); - } + private Map partitionProperties(String partitionPath) { + return TablePropertyUtil.getPartitionProperties(arcticTable, partitionPath); } protected PartitionEvaluator buildEvaluator(String partitionPath) { + Map partitionProperties = partitionProperties(partitionPath); if (TableTypeUtil.isIcebergTableFormat(arcticTable)) { - return new CommonPartitionEvaluator(tableRuntime, partitionPath, System.currentTimeMillis()); + return new CommonPartitionEvaluator(tableRuntime, partitionPath, partitionProperties, System.currentTimeMillis()); } else { if (com.netease.arctic.hive.utils.TableTypeUtil.isHive(arcticTable)) { String hiveLocation = (((SupportHive) arcticTable).hiveLocation()); - return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(tableRuntime, partitionPath, hiveLocation, - System.currentTimeMillis(), arcticTable.isKeyedTable()); + return new MixedHivePartitionPlan.MixedHivePartitionEvaluator(tableRuntime, partitionPath, partitionProperties, + hiveLocation, System.currentTimeMillis(), arcticTable.isKeyedTable()); } else { return new MixedIcebergPartitionPlan.MixedIcebergPartitionEvaluator(tableRuntime, partitionPath, - System.currentTimeMillis(), arcticTable.isKeyedTable()); + partitionProperties, System.currentTimeMillis(), arcticTable.isKeyedTable()); } } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java index 0f2e3863f2..fa319b5593 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java +++ 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/PartitionEvaluator.java @@ -23,7 +23,6 @@ import com.netease.arctic.server.optimizing.OptimizingType; import java.util.List; -import java.util.Map; /** * PartitionEvaluator is used to evaluate whether a partition is necessary to be optimized. @@ -53,13 +52,6 @@ interface Weight extends Comparable { */ boolean addFile(IcebergDataFile dataFile, List> deletes); - /** - * Add some properties of this partition. It should be noted that properties should be added before adding any files. - * - * @param properties - properties of this partition - */ - void addPartitionProperties(Map properties); - /** * Whether this partition is necessary to optimize. * diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java index f66a763967..4ca57a8769 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java @@ -41,7 +41,6 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; import org.apache.iceberg.Transaction; import org.apache.iceberg.data.Record; @@ -286,13 +285,6 @@ protected List planWithCurrentFiles() { protected AbstractPartitionPlan buildPlanWithCurrentFiles() { TableFileScanHelper tableFileScanHelper = getTableFileScanHelper(); AbstractPartitionPlan partitionPlan = getAndCheckPartitionPlan(); - PartitionSpec spec = getArcticTable().spec(); - partitionProperty().forEach((partition, properties) -> { - String partitionToPath = spec.partitionToPath(partition); - if (partitionToPath.equals(partitionPlan.getPartition())) { - partitionPlan.addPartitionProperties(properties); - } - 
}); List scan = tableFileScanHelper.scan(); for (TableFileScanHelper.FileScanResult fileScanResult : scan) { partitionPlan.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); diff --git a/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java b/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java index fc1bdd478a..dc01a98f3a 100644 --- a/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java +++ b/core/src/main/java/com/netease/arctic/utils/TablePropertyUtil.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; @@ -126,6 +127,31 @@ public static StructLikeMap getPartitionLongProperties(UnkeyedTable unkeye return result; } + public static Map getPartitionProperties(ArcticTable arcticTable, String partitionPath) { + return getPartitionProperties( + arcticTable.isKeyedTable() ? 
arcticTable.asKeyedTable().baseTable() : arcticTable.asUnkeyedTable(), + partitionPath); + } + + public static Map getPartitionProperties(UnkeyedTable unkeyedTable, String partitionPath) { + StructLike partitionData; + if (unkeyedTable.spec().isUnpartitioned()) { + partitionData = TablePropertyUtil.EMPTY_STRUCT; + } else { + partitionData = ArcticDataFiles.data(unkeyedTable.spec(), partitionPath); + } + return getPartitionProperties(unkeyedTable, partitionData); + } + + public static Map getPartitionProperties(UnkeyedTable unkeyedTable, StructLike partitionData) { + Map result = Maps.newHashMap(); + StructLikeMap> partitionProperty = unkeyedTable.partitionProperty(); + if (partitionProperty.containsKey(partitionData)) { + result = partitionProperty.get(partitionData); + } + return result; + } + public static StructLikeMap getLegacyPartitionMaxTransactionId(KeyedTable keyedTable) { StructLikeMap baseTableMaxTransactionId = StructLikeMap.create(keyedTable.spec().partitionType()); From 26d426f7ec94dddbfd4157999a068fb7f3293665 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 25 Aug 2023 14:58:15 +0800 Subject: [PATCH 13/14] remove all files if optimizing is not enabled --- .../server/optimizing/plan/CommonPartitionEvaluator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java index 765984cac5..dc71543437 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java @@ -92,6 +92,9 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) { @Override public boolean addFile(IcebergDataFile dataFile, List> deletes) { + if (!config.isEnabled()) { + return false; + } if (isFragmentFile(dataFile)) { return 
addFragmentFile(dataFile, deletes); } else { From e8fd09c50f358fc26bfd53414d49b9302d4e186f Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 25 Aug 2023 14:58:54 +0800 Subject: [PATCH 14/14] fix reservedDeleteFiles --- .../plan/AbstractPartitionPlan.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 53423bc66a..90139990bd 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -101,19 +101,20 @@ public long getCost() { @Override public boolean addFile(IcebergDataFile dataFile, List> deletes) { boolean added = evaluator().addFile(dataFile, deletes); + if (added) { + if (evaluator().fileShouldRewrite(dataFile, deletes)) { + rewriteDataFiles.put(dataFile, deletes); + } else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) { + rewritePosDataFiles.put(dataFile, deletes); + } else { + added = false; + } + } if (!added) { // if the Data file is not added, it's Delete files should not be removed from iceberg deletes.stream().map(delete -> delete.path().toString()).forEach(reservedDeleteFiles::add); - return false; - } - if (evaluator().fileShouldRewrite(dataFile, deletes)) { - rewriteDataFiles.put(dataFile, deletes); - } else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) { - rewritePosDataFiles.put(dataFile, deletes); - } else { - return false; } - return true; + return added; } public List splitTasks(int targetTaskCount) {