HBASE-23553 Snapshot referenced data files are deleted in some case (#…
mymeiyi authored Dec 11, 2019
1 parent 72ab1c1 commit fc816ac
Showing 3 changed files with 153 additions and 0 deletions.
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3857,4 +3857,9 @@ public void run() {
  public AsyncClusterConnection getAsyncClusterConnection() {
    return asyncClusterConnection;
  }

  @VisibleForTesting
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }
}
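The getter added above is marked @VisibleForTesting: it exists so tests can run the discharger chore on demand (the new test below calls it while waiting for the merged region's references to clear). A minimal sketch of that use, assuming a running HBaseTestingUtility minicluster; the helper name is illustrative:

  // Sketch only: force every region server in a minicluster to archive its
  // compacted store files now, rather than waiting for the periodic chore.
  static void dischargeCompactedFiles(HBaseTestingUtility util) {
    for (JVMClusterUtil.RegionServerThread t :
        util.getMiniHBaseCluster().getRegionServerThreads()) {
      t.getRegionServer().getCompactedHFilesDischarger().chore();
    }
  }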
hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
@@ -350,6 +350,16 @@ public void storeFile(final RegionInfo regionInfo, final String family,
      String hfile = storeFile.getName();
      if (HFileLink.isHFileLink(hfile)) {
        names.add(HFileLink.getReferencedHFileName(hfile));
      } else if (StoreFileInfo.isReference(hfile)) {
        Path refPath = StoreFileInfo.getReferredToFile(new Path(new Path(
            new Path(new Path(regionInfo.getTable().getNamespaceAsString(),
                regionInfo.getTable().getQualifierAsString()), regionInfo.getEncodedName()),
            family), hfile));
        names.add(hfile);
        names.add(refPath.getName());
        if (HFileLink.isHFileLink(refPath.getName())) {
          names.add(HFileLink.getReferencedHFileName(refPath.getName()));
        }
      } else {
        names.add(hfile);
      }
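This hunk is the core of the fix. The name set collected here is what snapshot-aware cleaners consult when deciding which archived hfiles are still needed; previously it recorded only plain hfile names and HFileLink targets, so for a reference file left behind by a split or merge it never recorded the parent hfile being pointed at, and that hfile could be archived and expired out from under the snapshot. A hedged sketch of the naming convention involved, assuming the "<hfileName>.<parentRegionEncodedName>" reference format that StoreFileInfo.getReferredToFile parses; the class and method names are illustrative:

  // Illustration only: how a reference file name encodes the parent hfile.
  // The real parsing lives in StoreFileInfo.getReferredToFile.
  public final class ReferenceNameSketch {
    static String referredToHFileName(String referenceFileName) {
      // "f3a9c1d2e4b5.1588230740" -> "f3a9c1d2e4b5"
      return referenceFileName.substring(0, referenceFileName.lastIndexOf('.'));
    }

    public static void main(String[] args) {
      System.out.println(referredToHFileName("f3a9c1d2e4b5.1588230740"));
    }
  }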
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.client;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -29,13 +32,19 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -306,4 +315,133 @@ private static void verifyRow(Result result) throws IOException {
}
}

  @Test
  public void testMergeRegion() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testMergeRegion");
    String snapshotName = tableName.getNameAsString() + "_snapshot";
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
    long timeout = 20000; // 20s
    try (Admin admin = UTIL.getAdmin()) {
      List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
        .collect(Collectors.toList());
      // create a table with 3 regions
      Table table = UTIL.createTable(tableName, FAMILIES, 1, bbb, yyy, 3);
      List<RegionInfo> regions = admin.getRegions(tableName);
      Assert.assertEquals(3, regions.size());
      RegionInfo region0 = regions.get(0);
      RegionInfo region1 = regions.get(1);
      RegionInfo region2 = regions.get(2);
      // put some data in the table
      UTIL.loadTable(table, FAMILIES);
      admin.flush(tableName);
      // wait until the flush is finished
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (RegionInfo region : regions) {
            Path regionDir = new Path(tableDir, region.getEncodedName());
            for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) {
              if (fs.listStatus(familyDir).length != 1) {
                return false;
              }
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if the flush is finished", e);
          return false;
        }
      });
      // merge 2 regions
      admin.compactionSwitch(false, serverList);
      admin.mergeRegionsAsync(region0.getEncodedNameAsBytes(), region1.getEncodedNameAsBytes(),
        true);
      UTIL.waitFor(timeout, () -> admin.getRegions(tableName).size() == 2);
      List<RegionInfo> mergedRegions = admin.getRegions(tableName);
      RegionInfo mergedRegion =
        mergedRegions.get(0).getEncodedName().equals(region2.getEncodedName())
          ? mergedRegions.get(1)
          : mergedRegions.get(0);
      // snapshot while the merged region still contains reference files
      admin.snapshot(snapshotName, tableName);
      Assert.assertEquals(1, admin.listSnapshots().size());
      // re-enable compactions and major compact the merged region
      admin.compactionSwitch(true, serverList);
      admin.majorCompactRegion(mergedRegion.getRegionName());
      // wait until the merged region has no references
      UTIL.waitFor(timeout, () -> {
        try {
          for (RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
              .getRegionServerThreads()) {
            HRegionServer regionServer = regionServerThread.getRegionServer();
            for (HRegion subRegion : regionServer.getRegions(tableName)) {
              if (subRegion.getRegionInfo().getEncodedName()
                  .equals(mergedRegion.getEncodedName())) {
                regionServer.getCompactedHFilesDischarger().chore();
              }
            }
          }
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          HRegionFileSystem regionFs = HRegionFileSystem
              .openRegionFromFileSystem(UTIL.getConfiguration(), fs, tableDir, mergedRegion, true);
          return !regionFs.hasReferences(admin.getDescriptor(tableName));
        } catch (IOException e) {
          LOG.warn("Failed to check if the merged region has no references", e);
          return false;
        }
      });
      // run the catalog janitor and wait until the parent regions are archived
      UTIL.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
      UTIL.waitFor(timeout, () -> {
        try {
          Path tableDir = FSUtils.getTableDir(rootDir, tableName);
          for (FileStatus fileStatus : fs.listStatus(tableDir)) {
            String name = fileStatus.getPath().getName();
            if (name.equals(region0.getEncodedName()) || name.equals(region1.getEncodedName())) {
              return false;
            }
          }
          return true;
        } catch (IOException e) {
          LOG.warn("Failed to check if the parent regions are archived", e);
          return false;
        }
      });
      // backdate file modification times past the TTL, then run the cleaner
      long time = System.currentTimeMillis() - TimeToLiveHFileCleaner.DEFAULT_TTL * 1000;
      traverseAndSetFileTime(HFileArchiveUtil.getArchivePath(conf), time);
      UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
      // scan the snapshot; every referenced hfile must still exist
      try (TableSnapshotScanner scanner = new TableSnapshotScanner(conf,
          UTIL.getDataTestDirOnTestFS(snapshotName), snapshotName, new Scan(bbb, yyy))) {
        verifyScanner(scanner, bbb, yyy);
      }
    } catch (Exception e) {
      LOG.error("scan snapshot error", e);
      Assert.fail("Should not throw FileNotFoundException");
    } finally {
      tearDownCluster();
    }
  }

  // recursively backdate the modification time of every file under the given path
  private void traverseAndSetFileTime(Path path, long time) throws IOException {
    fs.setTimes(path, time, -1);
    if (fs.isDirectory(path)) {
      List<FileStatus> allPaths = Arrays.asList(fs.listStatus(path));
      List<FileStatus> subDirs =
        allPaths.stream().filter(FileStatus::isDirectory).collect(Collectors.toList());
      List<FileStatus> files =
        allPaths.stream().filter(FileStatus::isFile).collect(Collectors.toList());
      for (FileStatus subDir : subDirs) {
        traverseAndSetFileTime(subDir.getPath(), time);
      }
      for (FileStatus file : files) {
        fs.setTimes(file.getPath(), time, -1);
      }
    }
  }
}
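The scan at the end of the test uses TableSnapshotScanner, which reads snapshot data client-side: it restores the snapshot's file references into a scratch directory and scans the hfiles directly, with no region server in the path, which is exactly why a deleted parent hfile surfaces as a FileNotFoundException. A minimal standalone usage sketch; the snapshot name "snap" and the restore path are illustrative:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.TableSnapshotScanner;
  import org.apache.hadoop.hbase.util.Bytes;

  public class SnapshotScanSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create(); // expects hbase-site.xml on the classpath
      Path restoreDir = new Path("/tmp/restore_snap");  // scratch dir for the restored snapshot
      try (TableSnapshotScanner scanner =
          new TableSnapshotScanner(conf, restoreDir, "snap", new Scan())) {
        for (Result r; (r = scanner.next()) != null; ) {
          System.out.println(Bytes.toStringBinary(r.getRow()));
        }
      }
    }
  }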
