Skip to content

Commit

Permalink
HBASE-26718 HFileArchiver can remove referenced StoreFiles from the a…
Browse files Browse the repository at this point in the history
…rchive (#4274)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
  • Loading branch information
d-c-manning authored and apurtell committed Mar 28, 2022
1 parent 9355506 commit da2c32c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -488,26 +488,57 @@ private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile,
Path archiveFile = new Path(archiveDir, filename);
FileSystem fs = currentFile.getFileSystem();

// if the file already exists in the archive, move that one to a timestamped backup. This is a
// really, really unlikely situtation, where we get the same name for the existing file, but
// is included just for that 1 in trillion chance.
// An existing destination file in the archive is unexpected, but we handle it here.
if (fs.exists(archiveFile)) {
LOG.debug("{} already exists in archive, moving to timestamped backup and " +
"overwriting current.", archiveFile);
if (!fs.exists(currentFile.getPath())) {
// If the file already exists in the archive, and there is no current file to archive, then
// assume that the file in archive is correct. This is an unexpected situation, suggesting a
// race condition or split brain.
// In HBASE-26718 this was found when compaction incorrectly happened during warmupRegion.
LOG.warn("{} exists in archive. Attempted to archive nonexistent file {}.", archiveFile,
currentFile);
// We return success to match existing behavior in this method, where FileNotFoundException
// in moveAndClose is ignored.
return true;
}
// There is a conflict between the current file and the already existing archived file.
// Move the archived file to a timestamped backup. This is a really, really unlikely
// situation, where we get the same name for the existing file, but is included just for that
// 1 in trillion chance. We are potentially incurring data loss in the archive directory if
// the files are not identical. The timestamped backup will be cleaned by HFileCleaner as it
// has no references.
FileStatus curStatus = fs.getFileStatus(currentFile.getPath());
FileStatus archiveStatus = fs.getFileStatus(archiveFile);
long curLen = curStatus.getLen();
long archiveLen = archiveStatus.getLen();
long curMtime = curStatus.getModificationTime();
long archiveMtime = archiveStatus.getModificationTime();
if (curLen != archiveLen) {
LOG.error("{} already exists in archive with different size than current {}."
+ " archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}",
archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime);
throw new IOException(archiveFile + " already exists in archive with different size" +
" than " + currentFile);
}

LOG.error("{} already exists in archive, moving to timestamped backup and overwriting"
+ " current {}. archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}",
archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime);

// move the archive file to the stamped backup
Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
if (!fs.rename(archiveFile, backedupArchiveFile)) {
LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
+ ", deleting existing file in favor of newer.");
// try to delete the exisiting file, if we can't rename it
// try to delete the existing file, if we can't rename it
if (!fs.delete(archiveFile, false)) {
throw new IOException("Couldn't delete existing archive file (" + archiveFile
+ ") or rename it to the backup file (" + backedupArchiveFile
+ ") to make room for similarly named file.");
}
} else {
LOG.info("Backed up archive file from {} to {}.", archiveFile, backedupArchiveFile);
}
LOG.debug("Backed up archive file from " + archiveFile);
}

LOG.trace("No existing file in archive for {}, free to archive original file.", archiveFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -31,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -155,18 +158,56 @@ public void testArchiveStoreFilesDifferentFileSystemsWallAndRootSame() throws Ex
HFileArchiver::archiveStoreFiles);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsFileAlreadyArchived() throws Exception {
String baseDir = CommonFSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir, true, false, false,
HFileArchiver::archiveStoreFiles);
}

@Test
public void testArchiveStoreFilesDifferentFileSystemsArchiveFileMatchCurrent() throws Exception {
String baseDir = CommonFSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir, true, true, false,
HFileArchiver::archiveStoreFiles);
}

@Test(expected = IOException.class)
public void testArchiveStoreFilesDifferentFileSystemsArchiveFileMismatch() throws Exception {
String baseDir = CommonFSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir, true, true, true,
HFileArchiver::archiveStoreFiles);
}

private void testArchiveStoreFilesDifferentFileSystems(String walDir, String expectedBase,
ArchivingFunction<Configuration, FileSystem, RegionInfo, Path, byte[],
Collection<HStoreFile>> archivingFunction) throws IOException {
testArchiveStoreFilesDifferentFileSystems(walDir, expectedBase, false, true, false,
archivingFunction);
}

private void testArchiveStoreFilesDifferentFileSystems(String walDir, String expectedBase,
boolean archiveFileExists, boolean sourceFileExists, boolean archiveFileDifferentLength,
ArchivingFunction<Configuration, FileSystem, RegionInfo, Path, byte[],
Collection<HStoreFile>> archivingFunction) throws IOException {
FileSystem mockedFileSystem = mock(FileSystem.class);
Configuration conf = new Configuration(UTIL.getConfiguration());
if(walDir != null) {
conf.set(CommonFSUtils.HBASE_WAL_DIR, walDir);
}
Path filePath = new Path("/mockDir/wals/mockFile");
when(mockedFileSystem.getScheme()).thenReturn("mockFS");
when(mockedFileSystem.mkdirs(any())).thenReturn(true);
when(mockedFileSystem.exists(any())).thenReturn(true);
HashMap<Path,Boolean> existsTracker = new HashMap<>();
Path filePath = new Path("/mockDir/wals/mockFile");
String expectedDir = expectedBase +
"archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
existsTracker.put(new Path(expectedDir), archiveFileExists);
existsTracker.put(filePath, sourceFileExists);
when(mockedFileSystem.exists(any())).thenAnswer(invocation ->
existsTracker.getOrDefault((Path)invocation.getArgument(0), true));
FileStatus mockedStatus = mock(FileStatus.class);
when(mockedStatus.getLen()).thenReturn(12L).thenReturn(archiveFileDifferentLength ? 34L : 12L);
when(mockedFileSystem.getFileStatus(any())).thenReturn(mockedStatus);
RegionInfo mockedRegion = mock(RegionInfo.class);
TableName tableName = TableName.valueOf("mockTable");
when(mockedRegion.getTable()).thenReturn(tableName);
Expand All @@ -179,11 +220,30 @@ private void testArchiveStoreFilesDifferentFileSystems(String walDir, String exp
when(mockedFile.getPath()).thenReturn(filePath);
when(mockedFileSystem.rename(any(),any())).thenReturn(true);
archivingFunction.apply(conf, mockedFileSystem, mockedRegion, tableDir, family, list);
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
verify(mockedFileSystem, times(2)).rename(pathCaptor.capture(), any());
String expectedDir = expectedBase +
"archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
assertTrue(pathCaptor.getAllValues().get(0).toString().equals(expectedDir));

if (sourceFileExists) {
ArgumentCaptor<Path> srcPath = ArgumentCaptor.forClass(Path.class);
ArgumentCaptor<Path> destPath = ArgumentCaptor.forClass(Path.class);
if (archiveFileExists) {
// Verify we renamed the archived file to sideline, and then renamed the source file.
verify(mockedFileSystem, times(2)).rename(srcPath.capture(), destPath.capture());
assertEquals(expectedDir, srcPath.getAllValues().get(0).toString());
assertEquals(filePath, srcPath.getAllValues().get(1));
assertEquals(expectedDir, destPath.getAllValues().get(1).toString());
} else {
// Verify we renamed the source file to the archived file.
verify(mockedFileSystem, times(1)).rename(srcPath.capture(), destPath.capture());
assertEquals(filePath, srcPath.getAllValues().get(0));
assertEquals(expectedDir, destPath.getAllValues().get(0).toString());
}
} else {
if (archiveFileExists) {
// Verify we did not rename. No source file with a present archive file should be a no-op.
verify(mockedFileSystem, never()).rename(any(), any());
} else {
fail("Unsupported test conditions: sourceFileExists and archiveFileExists both false.");
}
}
}

@FunctionalInterface
Expand Down

0 comments on commit da2c32c

Please sign in to comment.