Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARCTIC-1213] Optimizing of Mixed Format Table supports optimizing a part of partitions at a time #1220

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.netease.arctic.ams.server.model;

import com.netease.arctic.table.TableIdentifier;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;

Expand Down Expand Up @@ -59,7 +58,7 @@ public void setTargetQuota(Double targetQuota) {
}

@Override
public int compareTo(@NotNull TableQuotaInfo o) {
public int compareTo(TableQuotaInfo o) {
if (quota.compareTo(o.getQuota()) != 0) {
return quota.compareTo(o.getQuota());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,15 @@ public abstract class AbstractArcticOptimizePlan extends AbstractOptimizePlan {

/**
 * Build a plan for an arctic (mixed format) table.
 *
 * @param arcticTable - table to optimize
 * @param tableOptimizeRuntime - runtime state of the table optimize
 * @param changeFiles - change store files with sequence
 * @param baseFileScanTasks - scan tasks of the base store
 * @param queueId - optimize queue id
 * @param currentTime - plan time
 * @param changeSnapshotId - current snapshot id of the change store
 * @param baseSnapshotId - current snapshot id of the base store
 */
public AbstractArcticOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime,
                                  List<ContentFileWithSequence<?>> changeFiles,
                                  List<FileScanTask> baseFileScanTasks,
                                  int queueId, long currentTime, long changeSnapshotId, long baseSnapshotId) {
  super(arcticTable, tableOptimizeRuntime, queueId, currentTime, baseSnapshotId);
  this.changeFiles = changeFiles;
  this.baseFileScanTasks = baseFileScanTasks;
  this.currentChangeSnapshotId = changeSnapshotId;
  this.isCustomizeDir = false;
}

@Override
protected boolean limitFileCnt() {
return false;
}

protected BasicOptimizeTask buildOptimizeTask(@Nullable List<DataTreeNode> sourceNodes,
List<DataFile> insertFiles,
List<DataFile> deleteFiles,
Expand Down Expand Up @@ -331,6 +326,13 @@ protected List<DataFile> getInsertFilesFromFileTree(String partition) {
return insertFiles;
}

/**
 * Check whether a base file should be optimized.
 *
 * @param baseFile - base file
 * @param partition - partition
 * @return true if the file should be optimized
 */
protected abstract boolean baseFileShouldOptimize(DataFile baseFile, String partition);

protected boolean hasFileToOptimize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ public AbstractIcebergOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime
this.fileScanTasks = fileScanTasks;
}

@Override
protected boolean limitFileCnt() {
return true;
}

protected List<FileScanTask> filterRepeatFileScanTask(Collection<FileScanTask> fileScanTasks) {
Set<String> dataFilesPath = new HashSet<>();
List<FileScanTask> finalFileScanTasks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class AbstractOptimizePlan {
protected final Set<String> allPartitions = new HashSet<>();
// partitions to optimizing
protected final Set<String> affectedPartitions = new HashSet<>();
private boolean skippedPartitions = false;

private int collectFileCnt = 0;

Expand Down Expand Up @@ -91,45 +92,55 @@ protected List<BasicOptimizeTask> collectTasks(List<String> partitions) {

for (String partition : partitions) {
List<BasicOptimizeTask> optimizeTasks = collectTask(partition);
if (reachMaxFileCount()) {
this.skippedPartitions = true;
LOG.info("{} get enough files {} > {}, ignore left partitions", tableId(), this.collectFileCnt,
getMaxFileCntLimit());
break;
}
if (optimizeTasks.size() > 0) {
this.affectedPartitions.add(partition);
LOG.info("{} partition {} ==== collect {} {} tasks", tableId(), partition, optimizeTasks.size(),
getOptimizeType());
results.addAll(optimizeTasks);
if (reachMaxFileCnt(optimizeTasks)) {
LOG.info("{} get enough files {} > {}, ignore left partitions", tableId(), this.collectFileCnt,
getMaxFileCntLimit());
break;
}
accumulateFileCount(optimizeTasks);
}
}
LOG.info("{} ==== after collect, get {} task of partitions {}/{}", tableId(), getOptimizeType(),
affectedPartitions.size(), partitions.size());
return results;
}

private boolean reachMaxFileCnt(List<BasicOptimizeTask> newTasks) {
/**
 * Add the file count of the newly collected tasks to the running total.
 *
 * @param newTasks - optimize tasks collected for one partition
 */
private void accumulateFileCount(List<BasicOptimizeTask> newTasks) {
  int added = 0;
  for (BasicOptimizeTask task : newTasks) {
    // every task contributes its base, insert, delete and pos-delete files
    added += task.getBaseFileCnt() + task.getInsertFileCnt()
        + task.getDeleteFileCnt() + task.getPosDeleteFileCnt();
  }
  this.collectFileCnt += added;
}

/**
 * Whether the accumulated file count has reached the configured max file count limit.
 *
 * @return true if no more partitions should be collected
 */
private boolean reachMaxFileCount() {
  return getMaxFileCntLimit() <= this.collectFileCnt;
}

private OptimizePlanResult buildOptimizePlanResult(List<BasicOptimizeTask> optimizeTasks) {
long currentChangeSnapshotId = getCurrentChangeSnapshotId();
if (skippedPartitions) {
// if not all partitions are optimized, current change snapshot id should set to -1 to trigger next minor optimize
currentChangeSnapshotId = TableOptimizeRuntime.INVALID_SNAPSHOT_ID;
}
return new OptimizePlanResult(this.affectedPartitions, optimizeTasks, getOptimizeType(), this.currentSnapshotId,
getCurrentChangeSnapshotId(), this.planGroup);
currentChangeSnapshotId, this.planGroup);
}

protected List<String> getPartitionsToOptimizeInOrder() {
List<String> partitionNeedOptimizedInOrder = allPartitions.stream()
.filter(this::partitionNeedPlan)
.map(partition -> new PartitionWeight(partition, getPartitionWeight(partition)))
.map(partition -> new PartitionWeightWrapper(partition, getPartitionWeight(partition)))
.sorted()
.map(PartitionWeight::getPartition)
.map(PartitionWeightWrapper::getPartition)
.collect(Collectors.toList());
if (partitionNeedOptimizedInOrder.size() > 0) {
LOG.info("{} filter partitions to optimize, partition count {}", tableId(),
Expand All @@ -140,12 +151,17 @@ protected List<String> getPartitionsToOptimizeInOrder() {
return partitionNeedOptimizedInOrder;
}

protected long getPartitionWeight(String partitionToPath) {
return 0;
}
/**
 * Get the partition weight.
 * The optimizing order of partitions is decided by the partition weight; the larger weight goes first.
 *
 * @param partition - partition
 * @return the partition weight
 */
protected abstract PartitionWeight getPartitionWeight(String partition);

/**
 * Read the max file count limit for one plan from the table properties
 * (TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT).
 *
 * @return max file count for one optimize plan
 */
private long getMaxFileCntLimit() {
  Map<String, String> properties = arcticTable.properties();
  // NOTE(review): read as int although the return type is long — confirm the config is int-ranged
  return CompatiblePropertyUtil.propertyAsInt(properties,
      TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT, TableProperties.SELF_OPTIMIZING_MAX_FILE_CNT_DEFAULT);
}
Expand All @@ -163,11 +179,15 @@ protected long getSmallFileSize(Map<String, String> properties) {
}
}

private static class PartitionWeight implements Comparable<PartitionWeight> {
/**
 * Weight of a partition, which decides its optimizing order; implementations
 * define the concrete comparison for their optimize type.
 */
protected interface PartitionWeight extends Comparable<PartitionWeight> {

}

protected static class PartitionWeightWrapper implements Comparable<PartitionWeightWrapper> {
private final String partition;
private final long weight;
private final PartitionWeight weight;

public PartitionWeight(String partition, long weight) {
public PartitionWeightWrapper(String partition, PartitionWeight weight) {
this.partition = partition;
this.weight = weight;
}
Expand All @@ -176,7 +196,7 @@ public String getPartition() {
return partition;
}

public long getWeight() {
public PartitionWeight getWeight() {
return weight;
}

Expand All @@ -186,8 +206,8 @@ public String toString() {
}

@Override
public int compareTo(PartitionWeight o) {
return Long.compare(o.weight, this.weight);
public int compareTo(PartitionWeightWrapper o) {
return this.weight.compareTo(o.weight);
}
}

Expand All @@ -204,11 +224,35 @@ protected int getCollectFileCnt() {
}

/**
* this optimizing plan should limit the files by "self-optimizing.max-file-count"
* Check this partition should optimize because of interval.
*
* @param partition - partition
* @return true if the partition should optimize
*/
/**
 * Check whether this partition should be optimized because the configured
 * optimize interval has elapsed since its latest optimize.
 *
 * @param partition - partition
 * @return true if the partition should optimize
 */
protected boolean checkOptimizeInterval(String partition) {
  final long interval = getMaxOptimizeInterval();
  // a negative interval disables the time-based trigger
  if (interval < 0) {
    return false;
  }
  long elapsed = this.currentTime - getLatestOptimizeTime(partition);
  return elapsed >= interval;
}

/**
 * Get max optimize interval config of specific optimize type.
 * A negative value disables the interval-based trigger (see checkOptimizeInterval).
 *
 * @return optimize interval
 */
protected abstract long getMaxOptimizeInterval();

/**
 * Get latest optimize time of specific optimize type.
 *
 * @param partition - partition
 * @return time of latest optimize, may be -1
 */
protected abstract long getLatestOptimizeTime(String partition);

/**
* check whether partition need to plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
Expand All @@ -48,6 +49,8 @@

public class FullOptimizePlan extends AbstractArcticOptimizePlan {
private static final Logger LOG = LoggerFactory.getLogger(FullOptimizePlan.class);
// cache partition delete file size
private final Map<String, Long> partitionDeleteFileSize = new HashMap<>();

public FullOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptimizeRuntime,
List<FileScanTask> baseFileScanTasks, int queueId, long currentTime,
Expand All @@ -58,22 +61,47 @@ public FullOptimizePlan(ArcticTable arcticTable, TableOptimizeRuntime tableOptim

@Override
protected boolean partitionNeedPlan(String partitionToPath) {
long current = System.currentTimeMillis();

// check position delete file total size
if (checkPosDeleteTotalSize(partitionToPath)) {
return true;
}

// check full optimize interval
if (checkFullOptimizeInterval(current, partitionToPath)) {
if (checkOptimizeInterval(partitionToPath)) {
return true;
}

LOG.debug("{} ==== don't need {} optimize plan, skip partition {}", tableId(), getOptimizeType(), partitionToPath);
return false;
}

/**
 * Build the full optimize weight of a partition from its interval state and
 * position delete file size.
 *
 * @param partition - partition
 * @return the partition weight
 */
@Override
protected PartitionWeight getPartitionWeight(String partition) {
  boolean reachInterval = checkOptimizeInterval(partition);
  long posDeleteSize = getPosDeleteFileSize(partition);
  return new FullPartitionWeight(reachInterval, posDeleteSize);
}

/**
 * Partition weight for full optimize: partitions that reached the optimize
 * interval rank first, then partitions with a larger position delete file size.
 */
protected static class FullPartitionWeight implements PartitionWeight {

  private final boolean reachInterval;
  private final long deleteFileSize;

  public FullPartitionWeight(boolean reachInterval, long deleteFileSize) {
    this.reachInterval = reachInterval;
    this.deleteFileSize = deleteFileSize;
  }

  @Override
  public int compareTo(PartitionWeight o) {
    FullPartitionWeight other = (FullPartitionWeight) o;
    // descending: reachInterval == true sorts before false
    if (this.reachInterval != other.reachInterval) {
      return this.reachInterval ? -1 : 1;
    }
    // descending: larger delete file size sorts first
    return Long.compare(other.deleteFileSize, this.deleteFileSize);
  }
}


@Override
protected OptimizeType getOptimizeType() {
return OptimizeType.FullMajor;
Expand All @@ -97,11 +125,10 @@ protected boolean baseFileShouldOptimize(DataFile baseFile, String partition) {
}

protected boolean checkPosDeleteTotalSize(String partitionToPath) {
List<DeleteFile> posDeleteFiles = getPosDeleteFilesFromFileTree(partitionToPath);
if (posDeleteFiles.isEmpty()) {
long posDeleteSize = getPosDeleteFileSize(partitionToPath);
if (posDeleteSize <= 0) {
return false;
}
long posDeleteSize = posDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
Map<String, String> properties = arcticTable.properties();
if (!properties.containsKey(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO) &&
properties.containsKey(TableProperties.FULL_OPTIMIZE_TRIGGER_DELETE_FILE_SIZE_BYTES)) {
Expand All @@ -118,17 +145,27 @@ protected boolean checkPosDeleteTotalSize(String partitionToPath) {
}
}

protected boolean checkFullOptimizeInterval(long current, String partitionToPath) {
long fullMajorOptimizeInterval = CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(),
/**
 * Get the total position delete file size of a partition, cached per partition
 * to avoid rescanning the partition's file tree on repeated queries.
 *
 * @param partition - partition
 * @return total position delete file size in bytes
 */
private long getPosDeleteFileSize(String partition) {
  // computeIfAbsent replaces the manual get/put cache: scan the file tree only
  // on the first query for a partition
  return partitionDeleteFileSize.computeIfAbsent(partition, p ->
      getPosDeleteFilesFromFileTree(p).stream()
          .mapToLong(DeleteFile::fileSizeInBytes)
          .sum());
}

/**
 * Get the full optimize trigger interval from the table properties.
 *
 * @return configured full optimize interval; the default value may disable the trigger
 */
@Override
protected long getMaxOptimizeInterval() {
  Map<String, String> properties = arcticTable.properties();
  return CompatiblePropertyUtil.propertyAsLong(properties,
      TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL,
      TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT);
}

if (fullMajorOptimizeInterval != TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT) {
long lastFullMajorOptimizeTime = tableOptimizeRuntime.getLatestFullOptimizeTime(partitionToPath);
return current - lastFullMajorOptimizeTime >= fullMajorOptimizeInterval;
}

return false;
/**
 * Get the latest full optimize time of the given partition.
 *
 * @param partition - partition
 * @return time of the latest full optimize, may be -1
 */
@Override
protected long getLatestOptimizeTime(String partition) {
  return this.tableOptimizeRuntime.getLatestFullOptimizeTime(partition);
}

/**
Expand Down
Loading