Skip to content

Commit

Permalink
[ARCTIC-877] Fix orphan files clean deleting files unexpectedly (#878)
Browse files Browse the repository at this point in the history
* fix #877 orphan files clean with file uri

* refactor orphan files clean metadata files

* fix checkStyle

* add unit test
  • Loading branch information
wangtaohz authored Dec 8, 2022
1 parent cd67dcb commit 4b59f12
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URI;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -192,82 +193,30 @@ private static void clearInternalTableMetadata(ArcticTable table, UnkeyedTable i
Set<String> validFiles = getValidMetadataFiles(table.id(), table.io(), internalTable);
LOG.info("{} table get {} valid files", table.id(), validFiles.size());
int deleteFilesCnt = 0;
List<MetadataJson> metadataJsonFiles = new ArrayList<>();
int maxVersion = 0;
String metadataLocation = internalTable.location() + File.separator + METADATA_FOLDER_NAME;
LOG.info("start orphan files clean in {}", metadataLocation);
for (FileStatus fileStatus : table.io().list(metadataLocation)) {
if (fileStatus.getPath().toString().endsWith("metadata.json")) {
Integer version = getVersionFromMetadataFilePath(fileStatus, lastTime);
metadataJsonFiles.add(new MetadataJson(version, fileStatus));
if (version != null && version > maxVersion) {
maxVersion = version;
}
continue;
}
deleteFilesCnt += deleteInvalidMetadata(table.io(),
fileStatus,
validFiles,
lastTime,
execute);
}
LOG.info("{} total delete[execute={}] {} manifestList/manifest files", table.id(), execute, deleteFilesCnt);
// delete metadata.json, keep latest 100 files and last modify time < min modify time
int minKeepVersion = Math.max(1, maxVersion - 100);
int deleteMetadataFileCnt = 0;
Set<String> parentDirectory = new HashSet<>();
for (MetadataJson metadataJson : metadataJsonFiles) {
Integer version = metadataJson.version;
FileStatus fileStatus = metadataJson.fileStatus;
if ((version == null || version < minKeepVersion) && fileStatus.getModificationTime() < lastTime) {
deleteMetadataFileCnt++;
if (execute) {
table.io().deleteFile(fileStatus.getPath().toString());
parentDirectory.add(fileStatus.getPath().getParent().toString());
}
LOG.info("delete[execute={}] metadata file {}: {}", execute, fileStatus.getPath().toString(),
formatTime(fileStatus.getModificationTime()));
}
}
parentDirectory.forEach(parent -> FileUtil.deleteEmptyDirectory(table.io(), parent));
LOG.info("{} total delete[execute={}] {} metadata files with min keep version {}, total delete {} files",
table.id(), execute, deleteMetadataFileCnt, minKeepVersion, deleteMetadataFileCnt + deleteFilesCnt);
}

private static class MetadataJson {
final Integer version;
final FileStatus fileStatus;

public MetadataJson(Integer version, FileStatus fileStatus) {
this.version = version;
this.fileStatus = fileStatus;
}
LOG.info("{} total delete[execute={}] {} manifestList/manifest/metadata files", table.id(), execute,
deleteFilesCnt);
}

private static String formatTime(long timestamp) {
  // Render an epoch-millisecond timestamp as a human-readable local date-time
  // (JVM default time zone), used only for log output.
  Instant instant = Instant.ofEpochMilli(timestamp);
  return LocalDateTime.ofInstant(instant, ZoneId.systemDefault()).toString();
}

/**
 * Extracts the numeric version N from a metadata file named "vN.metadata.json".
 *
 * <p>Returns {@code null} when the file name does not follow that pattern;
 * such files are logged and may be deleted later if their modification time is
 * older than {@code lastTime}.
 *
 * <p>Note: the previous implementation used {@code replace("v", "")}, which
 * strips EVERY 'v' in the file name, so a malformed name such as
 * "v1v2.metadata.json" parsed to the wrong version (12) instead of being
 * rejected. Strict prefix/suffix parsing avoids that.
 */
private static Integer getVersionFromMetadataFilePath(FileStatus fileStatus, long lastTime) {
  String path = fileStatus.getPath().toString();
  String fileName = path.substring(path.lastIndexOf('/') + 1);
  String suffix = ".metadata.json";
  if (fileName.startsWith("v") && fileName.endsWith(suffix)) {
    try {
      return Integer.parseInt(fileName.substring(1, fileName.length() - suffix.length()));
    } catch (NumberFormatException e) {
      // fall through to the warning below
    }
  }
  LOG.warn("get unexpected table metadata {}: {}, delete it later if modify time is older than {}", path,
      formatTime(fileStatus.getModificationTime()), formatTime(lastTime));
  return null;
}

private static int deleteInvalidDataFiles(ArcticFileIO io,
FileStatus fileStatus,
Set<String> validFiles,
Long lastTime,
Set<String> exclude,
boolean execute) {
String location = fileStatus.getPath().toUri().getPath();
String location = getUriPath(fileStatus.getPath().toString());
if (io.isDirectory(location)) {
if (!io.isEmptyDirectory(location)) {
LOG.info("start orphan files clean in {}", location);
Expand Down Expand Up @@ -316,15 +265,12 @@ private static int deleteInvalidMetadata(ArcticFileIO io,
FileStatus fileStatus,
Set<String> validFiles,
Long lastTime, boolean execute) {
String location = fileStatus.getPath().toUri().getPath();
String location = getUriPath(fileStatus.getPath().toString());
if (io.isDirectory(location)) {
LOG.warn("unexpected dir in metadata/, {}", location);
return 0;
} else {
if (!validFiles.contains(location) &&
fileStatus.getModificationTime() < lastTime &&
!location.endsWith("metadata.json") &&
!location.contains("version-hint.text")) {
if (!validFiles.contains(location) && fileStatus.getModificationTime() < lastTime) {
if (execute) {
io.deleteFile(location);
}
Expand All @@ -346,22 +292,29 @@ private static Set<String> getValidMetadataFiles(TableIdentifier tableIdentifier
cnt++;
int before = validFiles.size();
String manifestListLocation = snapshot.manifestListLocation();
validFiles.add(manifestListLocation);

validFiles.add(getUriPath(manifestListLocation));

io.doAs(() -> {
// valid data files
List<ManifestFile> manifestFiles = snapshot.allManifests();
for (ManifestFile manifestFile : manifestFiles) {
validFiles.add(manifestFile.path());
validFiles.add(getUriPath(manifestFile.path()));
}
return null;
});
LOG.info("{} scan snapshot {}: {} and get {} files, complete {}/{}", tableIdentifier, snapshot.snapshotId(),
formatTime(snapshot.timestampMillis()), validFiles.size() - before, cnt, size);
}
ReachableFileUtil.metadataFileLocations(internalTable, true).forEach(f -> validFiles.add(getUriPath(f)));
validFiles.add(getUriPath(ReachableFileUtil.versionHintLocation(internalTable)));

return validFiles;
}

/**
 * Normalizes a file location to its bare path component so that locations can
 * be compared regardless of scheme and authority (e.g. "hdfs://host:port/a/b"
 * and "/a/b" both normalize to "/a/b").
 *
 * <p>NOTE(review): {@link URI#create(String)} throws IllegalArgumentException
 * for locations containing URI-illegal characters (e.g. spaces) — confirm all
 * table file locations are well-formed URIs.
 */
protected static String getUriPath(String path) {
  URI uri = URI.create(path);
  return uri.getPath();
}

private static Set<String> getValidDataFiles(TableIdentifier tableIdentifier, ArcticFileIO io,
UnkeyedTable internalTable) {
Expand All @@ -378,8 +331,8 @@ private static Set<String> getValidDataFiles(TableIdentifier tableIdentifier, Ar
for (FileScanTask scanTask
: internalTable.newScan().useSnapshot(snapshot.snapshotId()).planFiles()) {
if (scanTask.file() != null) {
validFiles.add(scanTask.file().path().toString());
scanTask.deletes().forEach(file -> validFiles.add(file.path().toString()));
validFiles.add(getUriPath(scanTask.file().path().toString()));
scanTask.deletes().forEach(file -> validFiles.add(getUriPath(file.path().toString())));
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.netease.arctic.ams.server.service.impl.JDBCMetaService;
import com.netease.arctic.ams.server.service.impl.OptimizeQueueService;
import com.netease.arctic.ams.server.service.impl.OptimizerService;
import com.netease.arctic.ams.server.service.impl.OrphanFilesCleanServiceTest;
import com.netease.arctic.ams.server.service.impl.PlatformFileInfoService;
import com.netease.arctic.ams.server.util.DerbyTestUtil;
import com.netease.arctic.ams.server.utils.CatalogUtil;
Expand Down Expand Up @@ -114,6 +115,7 @@
TestIcebergMinorOptimizeCommit.class,
TestExpireFileCleanSupportIceberg.class,
TestOrphanFileCleanSupportIceberg.class,
OrphanFilesCleanServiceTest.class,
TestOrphanFileClean.class,
TestFileInfoCacheService.class,
SupportHiveTestGroup.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

package com.netease.arctic.ams.server.optimize;

import com.netease.arctic.TableTestBase;
import com.netease.arctic.ams.api.DataFileInfo;
import com.netease.arctic.ams.api.TableIdentifier;
import com.netease.arctic.ams.server.service.ServiceContainer;
import com.netease.arctic.ams.server.service.impl.FileInfoCacheService;
import com.netease.arctic.ams.server.service.impl.OrphanFilesCleanService;
import com.netease.arctic.ams.server.utils.JDBCSqlSessionFactoryProvider;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -33,6 +37,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

Expand All @@ -47,7 +52,7 @@
@PowerMockIgnore({"org.apache.logging.log4j.*", "javax.management.*", "org.apache.http.conn.ssl.*",
"com.amazonaws.http.conn.ssl.*",
"javax.net.ssl.*", "org.apache.hadoop.*", "javax.*", "com.sun.org.apache.*", "org.apache.xerces.*"})
public class TestOrphanFileClean extends TableTestBase {
public class TestOrphanFileClean extends TestBaseOptimizeBase {

@Before
public void mock() {
Expand All @@ -59,7 +64,9 @@ public void mock() {
}

@Test
public void orphanDataFileClean() {
public void orphanDataFileClean() throws IOException {
insertTableBaseDataFiles(testKeyedTable, 1L);

String baseOrphanFilePath = testKeyedTable.baseTable().location() +
File.separator + DATA_FOLDER_NAME + File.separator + "orphan.parquet";
String changeOrphanFilePath = testKeyedTable.changeTable().location() +
Expand All @@ -73,10 +80,18 @@ public void orphanDataFileClean() {
OrphanFilesCleanService.clean(testKeyedTable, System.currentTimeMillis(), true, "all", false);
Assert.assertFalse(testKeyedTable.io().exists(baseOrphanFilePath));
Assert.assertFalse(testKeyedTable.io().exists(changeOrphanFilePath));
for (FileScanTask task : testKeyedTable.baseTable().newScan().planFiles()) {
Assert.assertTrue(testKeyedTable.io().exists(task.file().path().toString()));
}
for (FileScanTask task : testKeyedTable.changeTable().newScan().planFiles()) {
Assert.assertTrue(testKeyedTable.io().exists(task.file().path().toString()));
}
}

@Test
public void orphanMetadataFileClean() {
public void orphanMetadataFileClean() throws IOException {
insertTableBaseDataFiles(testKeyedTable, 1L);

String baseOrphanFilePath = testKeyedTable.baseTable().location() + File.separator + "metadata" +
File.separator + "orphan.avro";
String changeOrphanFilePath = testKeyedTable.changeTable().location() + File.separator + "metadata" +
Expand All @@ -85,11 +100,34 @@ public void orphanMetadataFileClean() {
baseOrphanDataFile.createOrOverwrite();
OutputFile changeOrphanDataFile = testKeyedTable.io().newOutputFile(changeOrphanFilePath);
changeOrphanDataFile.createOrOverwrite();

String changeInvalidMetadataJson = testKeyedTable.changeTable().location() + File.separator + "metadata" +
File.separator + "v0.metadata.json";
testKeyedTable.io().newOutputFile(changeInvalidMetadataJson).createOrOverwrite();
Assert.assertTrue(testKeyedTable.io().exists(baseOrphanFilePath));
Assert.assertTrue(testKeyedTable.io().exists(changeOrphanFilePath));
Assert.assertTrue(testKeyedTable.io().exists(changeInvalidMetadataJson));

OrphanFilesCleanService.clean(testKeyedTable, System.currentTimeMillis(), true, "all", true);
Assert.assertFalse(testKeyedTable.io().exists(baseOrphanFilePath));
Assert.assertFalse(testKeyedTable.io().exists(changeOrphanFilePath));
Assert.assertFalse(testKeyedTable.io().exists(changeInvalidMetadataJson));

assertMetadataExists(testKeyedTable.changeTable());
assertMetadataExists(testKeyedTable.baseTable());
}

// Asserts that every file reachable from the table's metadata still exists:
// each snapshot's manifest list and manifests, all metadata.json files, and
// the version-hint file. Used to verify orphan-file cleaning spared them.
private void assertMetadataExists(Table table) {
  for (Snapshot snapshot : table.snapshots()) {
    Assert.assertTrue(testKeyedTable.io().exists(snapshot.manifestListLocation()));
    for (ManifestFile manifest : snapshot.allManifests()) {
      Assert.assertTrue(testKeyedTable.io().exists(manifest.path()));
    }
  }
  for (String metadataLocation : ReachableFileUtil.metadataFileLocations(table, false)) {
    Assert.assertTrue(testKeyedTable.io().exists(metadataLocation));
  }
  Assert.assertTrue(testKeyedTable.io().exists(ReachableFileUtil.versionHintLocation(table)));
}

private static class FakeFileInfoCacheService extends FileInfoCacheService {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.ams.server.service.impl;

import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link OrphanFilesCleanService#getUriPath(String)}: the
 * scheme and authority must be stripped while the path component — absolute
 * or relative — is preserved.
 */
public class OrphanFilesCleanServiceTest {
  @Test
  public void testGetUriPath() {
    assertUriPath("/a/b/c", "hdfs://xxxxx/a/b/c");
    assertUriPath("/a/b/c", "hdfs://localhost:8888/a/b/c");
    assertUriPath("/a/b/c", "file://xxxxx/a/b/c");
    assertUriPath("/a/b/c", "/a/b/c");
    assertUriPath("/a/b/c", "hdfs:/a/b/c");
    assertUriPath("a/b/c", "a/b/c");
  }

  // One assertion per input keeps failures easy to attribute.
  private static void assertUriPath(String expected, String location) {
    Assert.assertEquals(expected, OrphanFilesCleanService.getUriPath(location));
  }
}

0 comments on commit 4b59f12

Please sign in to comment.