Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
int numActiveFiles = activeFiles.size();
updateFileLists(activeFiles, archiveFiles);
if (activeFiles.size() < numActiveFiles) {
// We've archived some files, delete bulkloads directory
// and re-try
deleteBulkLoadDirectory();
continue;
}

Expand Down Expand Up @@ -242,7 +245,7 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn,
incrementalCopyBulkloadHFiles(tgtFs, tn);
}

private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get away with making this protected, rather than public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TestIncrementalBackupWithBulkload test class isn't in the same package (...hbase.backup.impl vs hbase.backup), so the test method can't access the protected method. We could either:

  • separate just these 2 test methods into their own "impl" test file in an impl package
  • leave it as being public
  • move all the TestIncrementalBackup* files to an impl module (moving just one seems confusing to future readers)

I would lean towards the first option -- what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think leaving it as public in that case is fine. The class is IA.Private so that will restrict visibility to internal uses only

throws IOException {
List<String> newlyArchived = new ArrayList<>();

Expand All @@ -252,9 +255,23 @@ private void updateFileLists(List<String> activeFiles, List<String> archiveFiles
}
}

if (newlyArchived.size() > 0) {
if (!newlyArchived.isEmpty()) {
String rootDir = CommonFSUtils.getRootDir(conf).toString();

activeFiles.removeAll(newlyArchived);
archiveFiles.addAll(newlyArchived);
for (String file : newlyArchived) {
String archivedFile = file.substring(rootDir.length() + 1);
Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
archivedFile = archivedFilePath.toString();

if (!fs.exists(archivedFilePath)) {
throw new IOException(String.format(
"File %s no longer exists, and no archived file %s exists for it", file, archivedFile));
}

LOG.debug("Archived file {} has been updated", archivedFile);
archiveFiles.add(archivedFile);
}
}

LOG.debug(newlyArchived.size() + " files have been archived.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -38,6 +40,8 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -147,6 +151,98 @@ private boolean containsRowWithKey(Table table, String rowKey) throws IOExceptio
return result.containsColumn(famName, qualName);
}

@Test
public void testUpdateFileListsRaceCondition() throws Exception {
  // Simulates the race where a file is archived (e.g. by a compaction) after the
  // active-file list was built: one of two active files is moved to the archive
  // location before updateFileLists runs, and the lists must be reconciled.
  try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
    FileSystem fs = TEST_UTIL.getTestFileSystem();

    final String region = "region1";
    final String family = "cf";
    final String hfileA = "hfile1";
    final String hfileB = "hfile2";

    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    Path storeDir = new Path(new Path(CommonFSUtils.getTableDir(rootDir, table1), region), family);
    Path activeFile1 = new Path(storeDir, hfileA);
    Path activeFile2 = new Path(storeDir, hfileB);

    // Create two empty HFiles in the active store directory.
    fs.mkdirs(storeDir);
    fs.create(activeFile1).close();
    fs.create(activeFile2).close();

    List<String> activeFiles =
      new ArrayList<>(List.of(activeFile1.toString(), activeFile2.toString()));
    List<String> archiveFiles = new ArrayList<>();

    // Move the first file to its archive location, as a compaction would.
    Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
      region, family);
    Path archivedFile1 = new Path(archiveDir, hfileA);
    fs.mkdirs(archiveDir);
    assertTrue("File should be moved to archive", fs.rename(activeFile1, archivedFile1));

    TestBackupBase.IncrementalTableBackupClientForTest client =
      new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
        "test_backup_id",
        createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));

    client.updateFileLists(activeFiles, archiveFiles);

    // The archived file must have migrated from the active list to the archive
    // list, rewritten to its archive-side path.
    assertEquals("Only one file should remain in active files", 1, activeFiles.size());
    assertEquals("File2 should still be in active files", activeFile2.toString(),
      activeFiles.get(0));
    assertEquals("One file should be added to archive files", 1, archiveFiles.size());
    assertEquals("Archived file should have correct path", archivedFile1.toString(),
      archiveFiles.get(0));
    systemTable.finishBackupExclusiveOperation();
  }

}

@Test
public void testUpdateFileListsMissingArchivedFile() throws Exception {
  // An active file that disappears without a counterpart in the archive location
  // must make updateFileLists fail with an IOException.
  try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
    FileSystem fs = TEST_UTIL.getTestFileSystem();

    Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
    Path storeDir = new Path(new Path(CommonFSUtils.getTableDir(rootDir, table1), "region2"), "cf");
    Path activeFile = new Path(storeDir, "missing_file");

    fs.mkdirs(storeDir);
    fs.create(activeFile).close();

    List<String> activeFiles = new ArrayList<>(List.of(activeFile.toString()));
    List<String> archiveFiles = new ArrayList<>();

    // Remove the active file without creating it in the archive location.
    fs.delete(activeFile, false);

    TestBackupBase.IncrementalTableBackupClientForTest client =
      new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
        "test_backup_id",
        createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));

    // No archived counterpart exists, so this must fail.
    try {
      client.updateFileLists(activeFiles, archiveFiles);
      fail("Expected IOException to be thrown");
    } catch (IOException e) {
      // Expected: the missing file has no archived replacement.
    }
    systemTable.finishBackupExclusiveOperation();
  }
}

private void performBulkLoad(String keyPrefix) throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
Expand Down