Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-1883] Perform file filtering when adding files to PartitionEvaluator instead of during splitting tasks #1886

Merged
merged 16 commits into from
Aug 28, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.netease.arctic.server.optimizing.OptimizingType;
import com.netease.arctic.server.table.TableRuntime;
import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.FileContent;
import com.netease.arctic.utils.TablePropertyUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand All @@ -50,13 +50,12 @@ public abstract class AbstractPartitionPlan implements PartitionEvaluator {
private long fromSequence = INVALID_SEQUENCE;
private long toSequence = INVALID_SEQUENCE;
protected final long planTime;
protected final Map<String, String> partitionProperties;

protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles = Maps.newHashMap();
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles = Maps.newHashMap();
protected final Map<String, Set<IcebergDataFile>> equalityDeleteFileMap = Maps.newHashMap();
protected final Map<String, Set<IcebergDataFile>> posDeleteFileMap = Maps.newHashMap();

private List<SplitTask> splitTasks;
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> rewriteDataFiles = Maps.newHashMap();
protected final Map<IcebergDataFile, List<IcebergContentFile<?>>> rewritePosDataFiles = Maps.newHashMap();
// reserved Delete files are Delete files which are related to Data files not optimized in this plan
protected final Set<String> reservedDeleteFiles = Sets.newHashSet();

public AbstractPartitionPlan(TableRuntime tableRuntime,
ArcticTable table, String partition, long planTime) {
Expand All @@ -65,6 +64,7 @@ public AbstractPartitionPlan(TableRuntime tableRuntime,
this.config = tableRuntime.getOptimizingConfig();
this.tableRuntime = tableRuntime;
this.planTime = planTime;
this.partitionProperties = TablePropertyUtil.getPartitionProperties(table, partition);
}

@Override
Expand All @@ -80,7 +80,7 @@ protected CommonPartitionEvaluator evaluator() {
}

protected CommonPartitionEvaluator buildEvaluator() {
return new CommonPartitionEvaluator(tableRuntime, partition, planTime);
return new CommonPartitionEvaluator(tableRuntime, partition, partitionProperties, planTime);
}

@Override
Expand All @@ -99,64 +99,41 @@ public long getCost() {
}

@Override
public void addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
evaluator().addFile(dataFile, deletes);
if (evaluator().isFragmentFile(dataFile)) {
fragmentFiles.put(dataFile, deletes);
} else {
segmentFiles.put(dataFile, deletes);
}
for (IcebergContentFile<?> deleteFile : deletes) {
if (deleteFile.content() == FileContent.POSITION_DELETES) {
posDeleteFileMap
.computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet())
.add(dataFile);
public boolean addFile(IcebergDataFile dataFile, List<IcebergContentFile<?>> deletes) {
boolean added = evaluator().addFile(dataFile, deletes);
if (added) {
if (evaluator().fileShouldRewrite(dataFile, deletes)) {
rewriteDataFiles.put(dataFile, deletes);
} else if (evaluator().segmentFileShouldRewritePos(dataFile, deletes)) {
rewritePosDataFiles.put(dataFile, deletes);
} else {
equalityDeleteFileMap
.computeIfAbsent(deleteFile.path().toString(), delete -> Sets.newHashSet())
.add(dataFile);
added = false;
}
}
}

@Override
public void addPartitionProperties(Map<String, String> properties) {
evaluator().addPartitionProperties(properties);
if (!added) {
      // if the Data file is not added, its Delete files should not be removed from iceberg
deletes.stream().map(delete -> delete.path().toString()).forEach(reservedDeleteFiles::add);
}
zhoujinsong marked this conversation as resolved.
Show resolved Hide resolved
return added;
}

public List<TaskDescriptor> splitTasks(int targetTaskCount) {
if (taskSplitter == null) {
taskSplitter = buildTaskSplitter();
}
this.splitTasks = taskSplitter.splitTasks(targetTaskCount).stream()
.filter(this::taskNeedExecute).collect(Collectors.toList());
return this.splitTasks.stream()
beforeSplit();
return taskSplitter.splitTasks(targetTaskCount).stream()
.map(task -> task.buildTask(buildTaskProperties()))
.collect(Collectors.toList());
}

protected boolean taskNeedExecute(SplitTask task) {
    // if there are no delete files and no more than 1 rewrite file, we should not execute
return !task.getDeleteFiles().isEmpty() || task.getRewriteDataFiles().size() > 1;
}

private boolean isOptimizing(IcebergDataFile dataFile) {
return this.splitTasks.stream().anyMatch(task -> task.contains(dataFile));

protected void beforeSplit() {
}

protected abstract TaskSplitter buildTaskSplitter();

protected abstract OptimizingInputProperties buildTaskProperties();

protected boolean fileShouldFullOptimizing(IcebergDataFile dataFile, List<IcebergContentFile<?>> deleteFiles) {
if (config.isFullRewriteAllFiles()) {
return true;
} else {
      // if a file is related to any delete files or is not big enough, it should be fully optimized
return !deleteFiles.isEmpty() || dataFile.fileSizeInBytes() < config.getTargetSize() * 0.9;
}
}

protected void markSequence(long sequence) {
if (fromSequence == INVALID_SEQUENCE || fromSequence > sequence) {
fromSequence = sequence;
Expand Down Expand Up @@ -225,39 +202,15 @@ public Weight getWeight() {

protected class SplitTask {
private final Set<IcebergDataFile> rewriteDataFiles = Sets.newHashSet();
private final Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();
private final Set<IcebergDataFile> rewritePosDataFiles = Sets.newHashSet();
private final Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();

public SplitTask(Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles,
Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles) {
if (evaluator().isFullNecessary()) {
fragmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
segmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (fileShouldFullOptimizing(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
} else {
fragmentFiles.forEach((icebergFile, deleteFileSet) -> {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
});
segmentFiles.forEach((icebergFile, deleteFileSet) -> {
if (evaluator().shouldRewriteSegmentFile(icebergFile, deleteFileSet)) {
rewriteDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
} else if (evaluator.shouldRewritePosForSegmentFile(icebergFile, deleteFileSet)) {
rewritePosDataFiles.add(icebergFile);
deleteFiles.addAll(deleteFileSet);
}
});
}
public SplitTask(Set<IcebergDataFile> rewriteDataFiles,
Set<IcebergDataFile> rewritePosDataFiles,
Set<IcebergContentFile<?>> deleteFiles) {
this.rewriteDataFiles.addAll(rewriteDataFiles);
this.rewritePosDataFiles.addAll(rewritePosDataFiles);
this.deleteFiles.addAll(deleteFiles);
}

public Set<IcebergDataFile> getRewriteDataFiles() {
Expand All @@ -272,23 +225,11 @@ public Set<IcebergDataFile> getRewritePosDataFiles() {
return rewritePosDataFiles;
}

public boolean contains(IcebergDataFile dataFile) {
return rewriteDataFiles.contains(dataFile) || rewritePosDataFiles.contains(dataFile);
}

public TaskDescriptor buildTask(OptimizingInputProperties properties) {
Set<IcebergContentFile<?>> readOnlyDeleteFiles = Sets.newHashSet();
Set<IcebergContentFile<?>> rewriteDeleteFiles = Sets.newHashSet();
for (IcebergContentFile<?> deleteFile : deleteFiles) {
Set<IcebergDataFile> relatedDataFiles;
if (deleteFile.content() == FileContent.POSITION_DELETES) {
relatedDataFiles = posDeleteFileMap.get(deleteFile.path().toString());
} else {
relatedDataFiles = equalityDeleteFileMap.get(deleteFile.path().toString());
}
boolean findDataFileNotOptimizing =
relatedDataFiles.stream().anyMatch(file -> !contains(file) && !isOptimizing(file));
if (findDataFileNotOptimizing) {
if (reservedDeleteFiles.contains(deleteFile.path().toString())) {
readOnlyDeleteFiles.add(deleteFile);
} else {
rewriteDeleteFiles.add(deleteFile);
Expand All @@ -311,12 +252,12 @@ public TaskDescriptor buildTask(OptimizingInputProperties properties) {
protected static class FileTask {
private final IcebergDataFile file;
private final List<IcebergContentFile<?>> deleteFiles;
private final boolean isFragment;
private final boolean isRewriteDataFile;

public FileTask(IcebergDataFile file, List<IcebergContentFile<?>> deleteFiles, boolean isFragment) {
public FileTask(IcebergDataFile file, List<IcebergContentFile<?>> deleteFiles, boolean isRewriteDataFile) {
this.file = file;
this.deleteFiles = deleteFiles;
this.isFragment = isFragment;
this.isRewriteDataFile = isRewriteDataFile;
}

public IcebergDataFile getFile() {
Expand All @@ -327,25 +268,24 @@ public List<IcebergContentFile<?>> getDeleteFiles() {
return deleteFiles;
}

public boolean isFragment() {
return isFragment;
public boolean isRewriteDataFile() {
return isRewriteDataFile;
}

public boolean isSegment() {
return !isFragment;
public boolean isRewritePosDataFile() {
return !isRewriteDataFile;
}
}

protected class BinPackingTaskSplitter implements TaskSplitter {

@Override
public List<SplitTask> splitTasks(int targetTaskCount) {
List<FileTask> allDataFiles = Lists.newLinkedList();
      // If there is only one fragment file in a bin, the split task based on that bin will be un-executable.
      // Therefore, prioritize packing of fragment files to increase the executable rate (see method taskNeedExecute).
fragmentFiles.forEach((dataFile, deleteFiles) ->
// bin-packing
List<FileTask> allDataFiles = Lists.newArrayList();
rewriteDataFiles.forEach((dataFile, deleteFiles) ->
allDataFiles.add(new FileTask(dataFile, deleteFiles, true)));
segmentFiles.forEach((dataFile, deleteFiles) ->
rewritePosDataFiles.forEach((dataFile, deleteFiles) ->
allDataFiles.add(new FileTask(dataFile, deleteFiles, false)));

List<List<FileTask>> packed = new BinPacking.ListPacker<FileTask>(
Expand All @@ -355,13 +295,19 @@ public List<SplitTask> splitTasks(int targetTaskCount) {
// collect
List<SplitTask> results = Lists.newArrayListWithCapacity(packed.size());
for (List<FileTask> fileTasks : packed) {
Map<IcebergDataFile, List<IcebergContentFile<?>>> fragmentFiles = Maps.newHashMap();
Map<IcebergDataFile, List<IcebergContentFile<?>>> segmentFiles = Maps.newHashMap();
fileTasks.stream().filter(FileTask::isFragment)
.forEach(f -> fragmentFiles.put(f.getFile(), f.getDeleteFiles()));
fileTasks.stream().filter(FileTask::isSegment)
.forEach(f -> segmentFiles.put(f.getFile(), f.getDeleteFiles()));
results.add(new SplitTask(fragmentFiles, segmentFiles));
Set<IcebergDataFile> rewriteDataFiles = Sets.newHashSet();
Set<IcebergDataFile> rewritePosDataFiles = Sets.newHashSet();
Set<IcebergContentFile<?>> deleteFiles = Sets.newHashSet();

fileTasks.stream().filter(FileTask::isRewriteDataFile).forEach(f -> {
rewriteDataFiles.add(f.getFile());
deleteFiles.addAll(f.getDeleteFiles());
});
fileTasks.stream().filter(FileTask::isRewritePosDataFile).forEach(f -> {
rewritePosDataFiles.add(f.getFile());
deleteFiles.addAll(f.getDeleteFiles());
});
results.add(new SplitTask(rewriteDataFiles, rewritePosDataFiles, deleteFiles));
}
return results;
}
Expand Down
Loading