Skip to content

Commit

Permalink
[hotfix] Fix deleting Puffin files when cleaning orphan files (apache…
Browse files Browse the repository at this point in the history
…#2320)

* Fix orphan-file cleaning so it no longer deletes Puffin statistics files

* Use Sets.union instead of an ImmutableSet builder

* fix
  • Loading branch information
wangtaohz authored and ShawHee committed Dec 29, 2023
1 parent 49196a0 commit ead775f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -200,7 +201,9 @@ protected Set<String> expireSnapshotNeedToExcludeFiles() {
}

/**
 * Returns the file paths that orphan-file cleaning must never delete: every
 * content file (data/delete) referenced by table metadata, plus every
 * statistics (Puffin) file, so statistics files are not mistaken for orphans.
 *
 * <p>Note: {@code Sets.union} returns an unmodifiable lazy view of both sets.
 *
 * @return union of content-file paths and statistics-file paths
 */
protected Set<String> orphanFileCleanNeedToExcludeFiles() {
  return Sets.union(
      IcebergTableUtil.getAllContentFilePath(table),
      IcebergTableUtil.getAllStatisticsFilePath(table));
}

protected ArcticFileIO arcticFileIO() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.netease.arctic.server.utils.HiveLocationUtil;
import com.netease.arctic.server.utils.IcebergTableUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.BaseTable;
import com.netease.arctic.table.ChangeTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
Expand All @@ -39,6 +41,7 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -73,15 +76,25 @@ public class MixedTableMaintainer implements TableMaintainer {
/**
 * Builds maintainers for a mixed-format table and snapshots the file paths
 * that must be preserved. For keyed tables both change and base stores are
 * tracked; for unkeyed tables only the base store exists. Statistics (Puffin)
 * file paths are included alongside content files so cleaning never deletes
 * them (Sets.union yields an unmodifiable view of both sets).
 *
 * @param arcticTable the mixed-format table to maintain
 */
public MixedTableMaintainer(ArcticTable arcticTable) {
  this.arcticTable = arcticTable;
  if (arcticTable.isKeyedTable()) {
    ChangeTable changeTable = arcticTable.asKeyedTable().changeTable();
    BaseTable baseTable = arcticTable.asKeyedTable().baseTable();
    changeMaintainer = new ChangeTableMaintainer(changeTable);
    baseMaintainer = new BaseTableMaintainer(baseTable);
    changeFiles =
        Sets.union(
            IcebergTableUtil.getAllContentFilePath(changeTable),
            IcebergTableUtil.getAllStatisticsFilePath(changeTable));
    baseFiles =
        Sets.union(
            IcebergTableUtil.getAllContentFilePath(baseTable),
            IcebergTableUtil.getAllStatisticsFilePath(baseTable));
  } else {
    baseMaintainer = new BaseTableMaintainer(arcticTable.asUnkeyedTable());
    // Unkeyed tables have no change store, so there are no change files to protect.
    changeFiles = new HashSet<>();
    baseFiles =
        Sets.union(
            IcebergTableUtil.getAllContentFilePath(arcticTable.asUnkeyedTable()),
            IcebergTableUtil.getAllStatisticsFilePath(arcticTable.asUnkeyedTable()));
  }

if (TableTypeUtil.isHive(arcticTable)) {
Expand Down Expand Up @@ -143,15 +156,6 @@ public BaseTableMaintainer getBaseMaintainer() {
return baseMaintainer;
}

/**
 * Combines the given sets into a single mutable set; duplicate entries
 * across inputs collapse into one.
 *
 * @param sets the sets to combine (none is modified)
 * @return a new {@link HashSet} holding every element of every input
 */
@SafeVarargs
private final Set<String> mergeSets(Set<String>... sets) {
  Set<String> merged = new HashSet<>();
  for (Set<String> candidate : sets) {
    merged.addAll(candidate);
  }
  return merged;
}

public class ChangeTableMaintainer extends IcebergTableMaintainer {

private static final int DATA_FILE_LIST_SPLIT = 3000;
Expand All @@ -165,7 +169,7 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) {

/**
 * Files the orphan cleaner must keep for the change store: every known
 * change, base, and hive file path tracked by the enclosing maintainer.
 */
@Override
public Set<String> orphanFileCleanNeedToExcludeFiles() {
  // Nested unions produce one unmodifiable view over all three sets.
  return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles));
}

@Override
Expand All @@ -188,7 +192,7 @@ protected long olderThanSnapshotNeedToExpire(long mustOlderThan) {

/**
 * Files snapshot expiration on the change store must not delete: base-store
 * and hive file paths, which may still be referenced outside this store.
 */
@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
  return Sets.union(baseFiles, hiveFiles);
}

public void expireFiles(long ttlPoint) {
Expand Down Expand Up @@ -324,12 +328,12 @@ public BaseTableMaintainer(UnkeyedTable unkeyedTable) {

/**
 * Files the orphan cleaner must keep for the base store: every known
 * change, base, and hive file path tracked by the enclosing maintainer.
 */
@Override
public Set<String> orphanFileCleanNeedToExcludeFiles() {
  // Nested unions produce one unmodifiable view over all three sets.
  return Sets.union(changeFiles, Sets.union(baseFiles, hiveFiles));
}

/**
 * Files snapshot expiration on the base store must not delete: change-store
 * and hive file paths, which may still be referenced outside this store.
 */
@Override
protected Set<String> expireSnapshotNeedToExcludeFiles() {
  return Sets.union(changeFiles, hiveFiles);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
Expand All @@ -43,6 +44,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class IcebergTableUtil {

Expand Down Expand Up @@ -93,6 +95,12 @@ public static Set<String> getAllContentFilePath(Table internalTable) {
return validFilesPath;
}

/**
 * Collects the locations of every statistics (Puffin) file reachable from the
 * table's metadata, each passed through {@code TableFileUtil.getUriPath}
 * (presumably normalizing to a URI-style path — confirm against TableFileUtil).
 *
 * @param table the Iceberg table to inspect
 * @return the set of statistics-file paths
 */
public static Set<String> getAllStatisticsFilePath(Table table) {
  Set<String> statisticsFilePaths = new HashSet<>();
  for (String location : ReachableFileUtil.statisticsFilesLocations(table)) {
    statisticsFilePaths.add(TableFileUtil.getUriPath(location));
  }
  return statisticsFilePaths;
}

public static Set<DeleteFile> getDanglingDeleteFiles(Table internalTable) {
if (internalTable.currentSnapshot() == null) {
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,21 +334,27 @@ public void notDeleteStatisticsFile() {
} else {
unkeyedTable = getArcticTable().asUnkeyedTable();
}
unkeyedTable.newAppend().commit();
Snapshot snapshot = unkeyedTable.currentSnapshot();
StatisticsFile file = writeStatisticsFile(unkeyedTable, snapshot);
unkeyedTable.updateStatistics().setStatistics(snapshot.snapshotId(), file).commit();

Assert.assertTrue(unkeyedTable.io().exists(file.path()));
StatisticsFile file1 =
commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/metadata/test1.puffin");
StatisticsFile file2 =
commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/test2.puffin");
StatisticsFile file3 =
commitStatisticsFile(unkeyedTable, unkeyedTable.location() + "/data/puffin/test3.puffin");

Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
new MixedTableMaintainer(getArcticTable()).cleanContentFiles(System.currentTimeMillis() + 1);
new MixedTableMaintainer(getArcticTable()).cleanMetadata(System.currentTimeMillis() + 1);
Assert.assertTrue(unkeyedTable.io().exists(file.path()));
Assert.assertTrue(unkeyedTable.io().exists(file1.path()));
Assert.assertTrue(unkeyedTable.io().exists(file2.path()));
Assert.assertTrue(unkeyedTable.io().exists(file3.path()));
}

private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot) {
OutputFile outputFile =
table
.io()
.newOutputFile(table.location() + "/metadata/" + snapshot.snapshotId() + ".puffin");
private StatisticsFile commitStatisticsFile(UnkeyedTable table, String fileLocation) {
table.newAppend().commit();
Snapshot snapshot = table.currentSnapshot();
OutputFile outputFile = table.io().newOutputFile(fileLocation);
List<BlobMetadata> blobMetadata;
long fileSize;
long footerSize;
Expand All @@ -362,7 +368,11 @@ private StatisticsFile writeStatisticsFile(UnkeyedTable table, Snapshot snapshot
}
List<org.apache.iceberg.BlobMetadata> collect =
blobMetadata.stream().map(GenericBlobMetadata::from).collect(Collectors.toList());
return new GenericStatisticsFile(
snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect);
GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
snapshot.snapshotId(), outputFile.location(), fileSize, footerSize, collect);
table.updateStatistics().setStatistics(snapshot.snapshotId(), statisticsFile).commit();

return statisticsFile;
}
}

0 comments on commit ead775f

Please sign in to comment.