Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1883] Perform file filtering when adding files to PartitionEvaluator instead of during splitting tasks #1886

Merged
merged 16 commits into from
Aug 28, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.netease.arctic.server.optimizing.OptimizingType;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand All @@ -51,12 +50,10 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
private long toSequence = INVALID_SEQUENCE;
protected final long planTime;

protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles = Maps.newHashMap();
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles = Maps.newHashMap();
protected final Map<String, Set<IcebergDataFile>> equalityDeleteFileMap = Maps.newHashMap();
protected final Map<String, Set<IcebergDataFile>> posDeleteFileMap = Maps.newHashMap();

private List<SplitTask> splitTasks;
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> rewriteDataFiles = Maps.newHashMap();
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> rewritePosDataFiles = Maps.newHashMap();
// protected Delete files are Delete files related to Data files that are not optimized in this plan
protected final Set<String> protectedDeleteFiles = Sets.newHashSet();
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved

public AbstractPartitionPlan(TableRuntime tableRuntime,
ArcticTable table, String partition, long planTime) {
Expand Down Expand Up @@ -99,24 +96,19 @@ public long getCost() {
}

@Override
public void addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
evaluator().addFile(dataFile, deletes);
if (evaluator().isFragmentFile(dataFile)) {
fragmentFiles.put(dataFile, deletes);
} else {
segmentFiles.put(dataFile, deletes);
public boolean addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
boolean added = evaluator().addFile(dataFile, deletes);
if (!added) {
// if the Data file is not added, its Delete files should not be removed from iceberg
deletes.stream().map(delete -> delete.path().toString()).forEach(protectedDeleteFiles::add);
return false;
}
for (IcebergContentFile<?> deleteFile : deletes) {
if (deleteFile.content() == FileContent.POSITION_DELETES) {
posDeleteFileMap
.computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet())
.add(dataFile);
} else {
equalityDeleteFileMap
.computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet())
.add(dataFile);
}
if (evaluator().fileShouldRewrite(dataFile, deletes)) {
rewriteDataFiles.put(dataFile, deletes);
} else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) {
rewritePosDataFiles.put(dataFile, deletes);
}
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

@Override
Expand All @@ -128,35 +120,19 @@ public List<TaskDescriptor> splitTasks(int targetTaskCount) {
if (taskSplitter == null) {
taskSplitter = buildTaskSplitter();
}
this.splitTasks = taskSplitter.splitTasks(targetTaskCount).stream()
.filter(this::taskNeedExecute).collect(Collectors.toList());
return this.splitTasks.stream()
beforeSplit();
return taskSplitter.splitTasks(targetTaskCount).stream()
.map(task -> task.buildTask(buildTaskProperties()))
.collect(Collectors.toList());
}

protected boolean taskNeedExecute(SplitTask task) {
// if there are no delete files and no more than one rewrite file, we should not execute
return !task.getDeleteFiles().isEmpty() || task.getRewriteDataFiles().size() > 1;
}

private boolean isOptimizing(IcebergDataFile dataFile) {
return this.splitTasks.stream().anyMatch(task -> task.contains(dataFile));

protected void beforeSplit() {
}

protected abstract TaskSplitter buildTaskSplitter();

protected abstract OptimizingInputProperties buildTaskProperties();

protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List<IcebergContentFile<?>> deleteFiles) {
if (config.isFullRewriteAllFiles()) {
return true;
} else {
// if a file is related to any delete files or is not big enough, it should be fully optimized
return !deleteFiles.isEmpty() || dataFile.fileSizeInBytes() < config.getTargetSize() * 0.9;
}
}

protected void markSequence(long sequence) {
if (fromSequence == INVALID_SEQUENCE || fromSequence > sequence) {
fromSequence = sequence;
Expand Down Expand Up @@ -225,39 +201,15 @@ public Weight getWeight() {

protected class SplitTask {
private final Set<IcebergDataFile> rewriteDataFiles = Sets.newHashSet();
private final Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();
private final Set<IcebergDataFile> rewritePosDataFiles = Sets.newHashSet();
private final Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();

public SplitTask(Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles,
Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles) {
if (evaluator().isFullNecessary()) {
fragmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
segmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
} else {
fragmentFiles.forEach((icebergFile, deleteFileSet) -> {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
});
segmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (evaluator().shouldRewriteSegmentFile(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
} else if (evaluator.shouldRewritePosForSegmentFile(icebergFile, deleteFileSet)) {
rewritePosDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
}
public SplitTask(Set<IcebergDataFile> rewriteDataFiles,
Set<IcebergDataFile> rewritePosDataFiles,
Set<IcebergContentFile<?>> deleteFiles) {
this.rewriteDataFiles.addAll(rewriteDataFiles);
this.rewritePosDataFiles.addAll(rewritePosDataFiles);
this.deleteFiles.addAll(deleteFiles);
}

public Set<IcebergDataFile> getRewriteDataFiles() {
Expand All @@ -272,23 +224,11 @@ public Set<IcebergDataFile> getRewritePosDataFiles() {
return rewritePosDataFiles;
}

public boolean contains(IcebergDataFile dataFile) {
return rewriteDataFiles.contains(dataFile) || rewritePosDataFiles.contains(dataFile);
}

public TaskDescriptor buildTask(OptimizingInputProperties properties) {
Set<IcebergContentFile<?>> readOnlyDeleteFiles = Sets.newHashSet();
Set<IcebergContentFile<?>> rewriteDeleteFiles = Sets.newHashSet();
for (IcebergContentFile<?> deleteFile : deleteFiles) {
Set<IcebergDataFile> relatedDataFiles;
if (deleteFile.content() == FileContent.POSITION_DELETES) {
relatedDataFiles = posDeleteFileMap.get(deleteFile.path().toString());
} else {
relatedDataFiles = equalityDeleteFileMap.get(deleteFile.path().toString());
}
boolean findDataFileNotOptimizing =
relatedDataFiles.stream().anyMatch(file -> !contains(file) && !isOptimizing(file));
if (findDataFileNotOptimizing) {
if (protectedDeleteFiles.contains(deleteFile.path().toString())) {
readOnlyDeleteFiles.add(deleteFile);
} else {
rewriteDeleteFiles.add(deleteFile);
Expand All @@ -311,12 +251,12 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) {
protected static class FileTask {
private final IcebergDataFile file;
private final List<IcebergContentFile<?>> deleteFiles;
private final boolean isFragment;
private final boolean isRewriteDataFile;

public FileTask(IcebergDataFile file, List<IcebergContentFile<?>> deleteFiles, boolean isFragment) {
public FileTask(IcebergDataFile file, List<IcebergContentFile<?>> deleteFiles, boolean isRewriteDataFile) {
this.file = file;
this.deleteFiles = deleteFiles;
this.isFragment = isFragment;
this.isRewriteDataFile = isRewriteDataFile;
}

public IcebergDataFile getFile() {
Expand All @@ -327,12 +267,12 @@ public List<IcebergContentFile<?>> getDeleteFiles() {
return deleteFiles;
}

public boolean isFragment() {
return isFragment;
public boolean isRewriteDataFile() {
return isRewriteDataFile;
}

public boolean isSegment() {
return !isFragment;
public boolean isRewritePosDataFile() {
return !isRewriteDataFile;
}
}

Expand All @@ -342,10 +282,10 @@ protected class BinPackingTaskSplitter implements TaskSplitter {
public List<SplitTask> splitTasks(int targetTaskCount) {
// bin-packing
List<FileTask> allDataFiles = Lists.newArrayList();
segmentFiles.forEach((dataFile, deleteFiles) ->
allDataFiles.add(new FileTask(dataFile, deleteFiles, false)));
fragmentFiles.forEach((dataFile, deleteFiles) ->
rewriteDataFiles.forEach((dataFile, deleteFiles) ->
allDataFiles.add(new FileTask(dataFile, deleteFiles, true)));
rewritePosDataFiles.forEach((dataFile, deleteFiles) ->
allDataFiles.add(new FileTask(dataFile, deleteFiles, false)));

long taskSize = config.getTargetSize();
Long sum = allDataFiles.stream().map(f -> f.getFile().fileSizeInBytes()).reduce(0L, Long::sum);
Expand All @@ -356,13 +296,19 @@ public List<SplitTask> splitTasks(int targetTaskCount) {
// collect
List<SplitTask> results = Lists.newArrayList();
for (List<FileTask> fileTasks : packed) {
Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles = Maps.newHashMap();
Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles = Maps.newHashMap();
fileTasks.stream().filter(FileTask::isFragment)
.forEach(f -> fragmentFiles.put(f.getFile(), f.getDeleteFiles()));
fileTasks.stream().filter(FileTask::isSegment)
.forEach(f -> segmentFiles.put(f.getFile(), f.getDeleteFiles()));
results.add(new SplitTask(fragmentFiles, segmentFiles));
Set<IcebergDataFile> rewriteDataFiles = Sets.newHashSet();
Set<IcebergDataFile> rewritePosDataFiles = Sets.newHashSet();
Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();

fileTasks.stream().filter(FileTask::isRewriteDataFile).forEach(f -> {
rewriteDataFiles.add(f.getFile());
deleteFiles.addAll(f.getDeleteFiles());
});
fileTasks.stream().filter(FileTask::isRewritePosDataFile).forEach(f -> {
rewritePosDataFiles.add(f.getFile());
deleteFiles.addAll(f.getDeleteFiles());
});
results.add(new SplitTask(rewriteDataFiles, rewritePosDataFiles, deleteFiles));
}
return results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
private Boolean necessary = null;
private OptimizingType optimizingType = null;
private String name;
private Boolean reachFullInterval = null;

public CommonPartitionEvaluator(TableRuntime tableRuntime, String partition, long planTime) {
this.partition = partition;
Expand All @@ -85,11 +86,11 @@ protected boolean isFragmentFile(IcebergDataFile dataFile) {
}

@Override
public void addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
public boolean addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
if (isFragmentFile(dataFile)) {
addFragmentFile(dataFile, deletes);
return addFragmentFile(dataFile, deletes);
} else {
addSegmentFile(dataFile, deletes);
return addSegmentFile(dataFile, deletes);
}
}

Expand All @@ -105,36 +106,66 @@ private boolean isDuplicateDelete(IcebergContentFile<?> delete) {
return deleteExist;
}

private void addFragmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
private boolean addFragmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
if (!fileShouldRewrite(dataFile, deletes)) {
return false;
}
fragmentFileSize += dataFile.fileSizeInBytes();
fragmentFileCount += 1;

for (IcebergContentFile<?> delete : deletes) {
addDelete(delete);
}
return true;
}

private void addSegmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
segmentFileSize += dataFile.fileSizeInBytes();
segmentFileCount += 1;

if (shouldRewriteSegmentFile(dataFile, deletes)) {
private boolean addSegmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
if (fileShouldRewrite(dataFile, deletes)) {
rewriteSegmentFileSize += dataFile.fileSizeInBytes();
rewriteSegmentFileCount += 1;
} else if (shouldRewritePosForSegmentFile(dataFile, deletes)) {
} else if (segmentFileShouldRewritePos(dataFile, deletes)) {
rewritePosSegmentFileSize += dataFile.fileSizeInBytes();
rewritePosSegmentFileCount += 1;
} else {
return false;
}

segmentFileSize += dataFile.fileSizeInBytes();
segmentFileCount += 1;
for (IcebergContentFile<?> delete : deletes) {
addDelete(delete);
}
return true;
}

protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List<IcebergContentFile<?>> deleteFiles) {
if (config.isFullRewriteAllFiles()) {
return true;
}
if (isFragmentFile(dataFile)) {
return true;
}
// if a file is related to any delete files or is not big enough, it should be fully optimized
return !deleteFiles.isEmpty() || dataFile.fileSizeInBytes() < config.getTargetSize() * 0.9;
}

public boolean shouldRewriteSegmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
public boolean fileShouldRewrite(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
if (isFullOptimizing()) {
return fileShouldFullOptimizing(dataFile, deletes);
}
if (isFragmentFile(dataFile)) {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
return getRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio();
}

public boolean shouldRewritePosForSegmentFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
public boolean segmentFileShouldRewritePos(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
if (isFullOptimizing()) {
return false;
}
if (isFragmentFile(dataFile)) {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
if (deletes.stream().anyMatch(delete -> delete.content() == FileContent.EQUALITY_DELETES)) {
return true;
} else if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count() >= 2) {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -143,6 +174,10 @@ public boolean shouldRewritePosForSegmentFile(IcebergDataFile dataFile, List<Ice
return false;
}
}

protected boolean isFullOptimizing() {
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
return reachFullInterval();
}

private long getRecordCount(List<IcebergContentFile<?>> files) {
return files.stream().mapToLong(ContentFile::recordCount).sum();
Expand Down Expand Up @@ -213,8 +248,11 @@ protected boolean reachMinorInterval() {
}

protected boolean reachFullInterval() {
return config.getFullTriggerInterval() >= 0 &&
planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval();
if (reachFullInterval == null) {
reachFullInterval = config.getFullTriggerInterval() >= 0 &&
planTime - tableRuntime.getLastFullOptimizingTime() > config.getFullTriggerInterval();
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
}
return reachFullInterval;
}

public boolean isFullNecessary() {
Expand Down
Loading