Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARCTIC-1093] Self-Optimizing scan files from metadata instead of from file info cache #1100

Merged
merged 11 commits into from
Feb 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public static void stopMetaStore() {
}

public static boolean isStarted() {
return server != null && server.isServing();
return server != null && server.isServing() && ServiceContainer.getOptimizeService().isInited();
}

public static void failover() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,24 @@

import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.DataTreeNode;
import org.apache.iceberg.ContentFile;
import com.netease.arctic.data.file.ContentFileWithSequence;
import com.netease.arctic.data.file.DataFileWithSequence;
import com.netease.arctic.data.file.DeleteFileWithSequence;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FileTree {
private final DataTreeNode node;
private List<DataFile> deleteFiles = new ArrayList<>();
private List<DataFile> insertFiles = new ArrayList<>();
private List<DataFile> baseFiles = new ArrayList<>();
private List<DeleteFile> posDeleteFiles = new ArrayList<>();

// subTree contains any base file
private Boolean findAnyBaseFilesInTree = null;
private final List<DataFileWithSequence> deleteFiles = new ArrayList<>();
private final List<DataFileWithSequence> insertFiles = new ArrayList<>();
private final List<DataFileWithSequence> baseFiles = new ArrayList<>();
private final List<DeleteFileWithSequence> posDeleteFiles = new ArrayList<>();

private FileTree left;
private FileTree right;
Expand Down Expand Up @@ -86,13 +83,13 @@ public FileTree putNodeIfAbsent(@Nonnull DataTreeNode newNode) {
* @param collector - collect result
* @param canSplit - if this tree can split
*/
public void splitSubTree(List<FileTree> collector, Predicate<FileTree> canSplit) {
public void splitFileTree(List<FileTree> collector, Predicate<FileTree> canSplit) {
if (canSplit.test(this)) {
if (left != null) {
left.splitSubTree(collector, canSplit);
left.splitFileTree(collector, canSplit);
}
if (right != null) {
right.splitSubTree(collector, canSplit);
right.splitFileTree(collector, canSplit);
}
} else {
collector.add(this);
Expand Down Expand Up @@ -152,20 +149,12 @@ public void collectDeleteFiles(List<DataFile> collector) {
}

public void collectBaseFiles(List<DataFile> collector) {
collectBaseFiles(collector, false, Collections.emptyList());
}

public void collectBaseFiles(List<DataFile> collector, boolean isMajor, List<DataFile> needOptimizeFiles) {
if (isMajor) {
baseFiles = baseFiles.stream()
.filter(needOptimizeFiles::contains).collect(Collectors.toList());
}
collector.addAll(baseFiles);
if (left != null) {
left.collectBaseFiles(collector, isMajor, needOptimizeFiles);
left.collectBaseFiles(collector);
}
if (right != null) {
right.collectBaseFiles(collector, isMajor, needOptimizeFiles);
right.collectBaseFiles(collector);
}
}

Expand All @@ -190,35 +179,35 @@ public long accumulate(Function<FileTree, Long> function) {
return apply;
}

public List<DataFile> getDeleteFiles() {
public List<DataFileWithSequence> getDeleteFiles() {
return deleteFiles;
}

public List<DataFile> getInsertFiles() {
public List<DataFileWithSequence> getInsertFiles() {
return insertFiles;
}

public List<DataFile> getBaseFiles() {
public List<DataFileWithSequence> getBaseFiles() {
return baseFiles;
}

public List<DeleteFile> getPosDeleteFiles() {
public List<DeleteFileWithSequence> getPosDeleteFiles() {
return posDeleteFiles;
}

public void addFile(ContentFile<?> file, DataFileType fileType) {
public void addFile(ContentFileWithSequence<?> file, DataFileType fileType) {
switch (fileType) {
case BASE_FILE:
baseFiles.add((DataFile) file);
baseFiles.add((DataFileWithSequence) file);
break;
case EQ_DELETE_FILE:
deleteFiles.add((DataFile) file);
deleteFiles.add((DataFileWithSequence) file);
break;
case INSERT_FILE:
insertFiles.add((DataFile) file);
insertFiles.add((DataFileWithSequence) file);
break;
case POS_DELETE_FILE:
posDeleteFiles.add((DeleteFile) file);
posDeleteFiles.add((DeleteFileWithSequence) file);
break;
default:
}
Expand All @@ -228,33 +217,35 @@ public DataTreeNode getNode() {
return node;
}

public void initFiles() {
this.baseFiles = Collections.emptyList();
this.insertFiles = Collections.emptyList();
this.deleteFiles = Collections.emptyList();
this.posDeleteFiles = Collections.emptyList();
public boolean isRootEmpty() {
return baseFiles.isEmpty() && insertFiles.isEmpty() && deleteFiles.isEmpty() && posDeleteFiles.isEmpty();
}

public boolean isLeaf() {
return left == null && right == null;
}

/**
* if parent node exist files, all child node must be balance, in other word, the left child node and
* the right child node are existing or non-existing at same time. Avoid can't cover parent node data
* when split child tree.
* Complete this binary tree to make every subTree of this Tree As a Full Binary Tree(FBT), if any data exists in this
* subTree.
* <p>
* fill the empty node when can't find left/right child node
*
* @param parentFileExist -
* A Full Binary Tree(FBT) is a binary tree in which all the nodes have either 0 or 2 offspring. In other terms, it is
* a binary tree in which all nodes, except the leaf nodes, have two offspring.
* <p>
* To Complete the tree is to avoid ancestor node's data can't be covered when split subTree.
*/
public void completeTree(boolean parentFileExist) {
public void completeTree() {
completeTree(false);
}

private void completeTree(boolean ancestorFileExist) {
if (left == null && right == null) {
return;
}
boolean thisNodeMustBalance = false;
if (parentFileExist) {
thisNodeMustBalance = true;
} else if (fileExist()) {
thisNodeMustBalance = true;
}
// if any ancestor of this node or this node itself contains any file, this node must be balance
boolean thisNodeMustBalance = ancestorFileExist || fileExist();
if (thisNodeMustBalance) {
// fill and empty node if left or right node not exist
if (left == null) {
left = new FileTree(node.left());
}
Expand All @@ -263,33 +254,17 @@ public void completeTree(boolean parentFileExist) {
}
}
if (left != null) {
left.completeTree(parentFileExist || fileExist());
left.completeTree(ancestorFileExist || fileExist());
}
if (right != null) {
right.completeTree(parentFileExist || fileExist());
right.completeTree(ancestorFileExist || fileExist());
}
}

private boolean fileExist() {
return !baseFiles.isEmpty() || !insertFiles.isEmpty() || !deleteFiles.isEmpty();
}

public boolean findAnyBaseFilesInTree() {
if (this.findAnyBaseFilesInTree != null) {
return this.findAnyBaseFilesInTree;
}
boolean leftContainsBaseFiles = false;
boolean rightContainsBaseFiles = false;
if (left != null) {
leftContainsBaseFiles = left.findAnyBaseFilesInTree();
}
if (right != null) {
rightContainsBaseFiles = right.findAnyBaseFilesInTree();
}
this.findAnyBaseFilesInTree = leftContainsBaseFiles || rightContainsBaseFiles || !baseFiles.isEmpty();
return this.findAnyBaseFilesInTree;
}

public boolean anyMatch(Predicate<FileTree> predicate) {
if (predicate.test(this)) {
return true;
Expand Down
Loading