@@ -84,8 +84,10 @@
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -892,7 +894,8 @@ private boolean canAnyOtherBackupCover(List<BackupInfo> allBackups, BackupInfo c

/**
* Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
- * backup deletion.
+ * backup deletion. If no full backups are present, all WALs are deleted, tables are removed
+ * from continuous backup metadata, and the associated replication peer is disabled.
*/
private void cleanUpUnusedBackupWALs() throws IOException {
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
@@ -903,7 +906,8 @@ private void cleanUpUnusedBackupWALs() {
return;
}

- try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+ try (Admin admin = conn.getAdmin();
+   BackupSystemTable sysTable = new BackupSystemTable(conn)) {
// Get list of tables under continuous backup
Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
if (continuousBackupTables.isEmpty()) {
@@ -914,7 +918,15 @@
// Find the earliest timestamp after which WALs are still needed
long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
if (cutoffTimestamp == 0) {
System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
// No full backup exists. PITR cannot function without a base full backup.
// Clean up all WALs, remove tables from backup metadata, and disable the replication
// peer.
System.out
.println("No full backups found. Cleaning up all WALs and disabling replication peer.");

disableContinuousBackupReplicationPeer(admin);
removeAllTablesFromContinuousBackup(sysTable);
deleteAllBackupWALFiles(conf, backupWalDir);
return;
}
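For orientation, the rest of cleanUpUnusedBackupWALs is collapsed in this view; based on the javadocs of the methods below, the overall control flow is presumably along these lines (a sketch, not the verbatim body):

```java
// Sketch of the cleanup decision, assuming the collapsed remainder calls
// deleteOldWALFiles and updateBackupTableStartTimes as their javadocs suggest.
long cutoff = determineWALCleanupCutoffTime(sysTable); // 0 => no full backup exists
if (cutoff == 0) {
  // No base full backup means no WAL can serve PITR: stop WAL shipping first,
  // then drop the tracking metadata, then reclaim WAL/bulk-load storage.
  disableContinuousBackupReplicationPeer(admin);
  removeAllTablesFromContinuousBackup(sysTable);
  deleteAllBackupWALFiles(conf, backupWalDir);
} else {
  // A full backup exists: drop WALs strictly older than the cutoff and
  // advance the tracked start times so metadata matches what remains.
  deleteOldWALFiles(conf, backupWalDir, cutoff);
  updateBackupTableStartTimes(sysTable, cutoff);
}
```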

@@ -944,6 +956,16 @@ long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException
return 0;
}

private void disableContinuousBackupReplicationPeer(Admin admin) throws IOException {
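// Scanning the peer list (rather than calling isReplicationPeerEnabled directly)
// makes a missing peer a no-op instead of an error.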
for (ReplicationPeerDescription peer : admin.listReplicationPeers()) {
if (peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled()) {
admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
System.out.println("Disabled replication peer: " + CONTINUOUS_BACKUP_REPLICATION_PEER);
break;
}
}
}

/**
* Updates the start time for continuous backups if older than cutoff timestamp.
* @param sysTable Backup system table
@@ -966,6 +988,49 @@ void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp
}
}
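The body of updateBackupTableStartTimes is collapsed above. Per its javadoc, a plausible sketch, assuming only the BackupSystemTable accessors already used elsewhere in this diff (getContinuousBackupTableSet, addContinuousBackupTableSet), would be:

```java
// Hedged sketch, not the verbatim body: bump every tracked start time that is
// older than the cutoff up to the cutoff, so metadata matches the surviving WALs.
Map<TableName, Long> tracked = sysTable.getContinuousBackupTableSet();
Set<TableName> stale = new HashSet<>(); // java.util.HashSet
for (Map.Entry<TableName, Long> entry : tracked.entrySet()) {
  if (entry.getValue() < cutoffTimestamp) {
    stale.add(entry.getKey());
  }
}
if (!stale.isEmpty()) {
  // Re-register stale tables with the cutoff as their new start time.
  sysTable.addContinuousBackupTableSet(stale, cutoffTimestamp);
}
```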

private void removeAllTablesFromContinuousBackup(BackupSystemTable sysTable)
throws IOException {
Map<TableName, Long> allTables = sysTable.getContinuousBackupTableSet();
if (!allTables.isEmpty()) {
sysTable.removeContinuousBackupTableSet(allTables.keySet());
System.out.println("Removed all tables from continuous backup metadata.");
}
}

private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
throws IOException {
try {
BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

// Delete contents under WAL directory
if (fs.exists(walDir)) {
FileStatus[] walContents = fs.listStatus(walDir);
for (FileStatus item : walContents) {
fs.delete(item.getPath(), true); // recursive delete of each child
}
System.out.println("Deleted all contents under WAL directory: " + walDir);
}

// Delete contents under bulk load directory
if (fs.exists(bulkloadDir)) {
FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
for (FileStatus item : bulkContents) {
fs.delete(item.getPath(), true); // recursive delete of each child
}
System.out.println("Deleted all contents under Bulk Load directory: " + bulkloadDir);
}

} catch (IOException e) {
System.out.println("WARNING: Failed to delete contents under backup directories: "
+ backupWalDir + ". Error: " + e.getMessage());
throw e;
}
}

/**
* Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
*/
@@ -1010,7 +1075,7 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
System.out.println("WARNING: Failed to parse directory name '" + dirName
+ "'. Skipping. Error: " + e.getMessage());
} catch (IOException e) {
System.out.println("WARNING: Failed to delete directory '" + dirPath
System.err.println("WARNING: Failed to delete directory '" + dirPath
+ "'. Skipping. Error: " + e.getMessage());
}
}
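Directory names under the WAL root are day stamps in ContinuousBackupReplicationEndpoint.DATE_FORMAT, so deletability reduces to a date comparison. A runnable sketch of the boundary check, assuming DATE_FORMAT is a day-granularity pattern such as "yyyy-MM-dd" (the exact comparison in the collapsed body may differ):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class CutoffCheckSketch {
  static final long ONE_DAY_IN_MILLISECONDS = 24L * 60 * 60 * 1000;

  // A day directory is deletable only if the whole day it covers ended before
  // the cutoff; a directory overlapping the cutoff may still hold needed WALs.
  static boolean isDeletable(String dirName, long cutoffTime) throws ParseException {
    long dayStart = new SimpleDateFormat("yyyy-MM-dd").parse(dirName).getTime();
    return dayStart + ONE_DAY_IN_MILLISECONDS <= cutoffTime;
  }

  public static void main(String[] args) throws ParseException {
    System.out.println(isDeletable("2024-01-15", System.currentTimeMillis())); // true
  }
}
```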
@@ -1587,7 +1587,7 @@ private Delete createDeleteForIncrBackupTableSet(String backupRoot) {
private Delete createDeleteForContinuousBackupTableSet(Set<TableName> tables) {
Delete delete = new Delete(rowkey(CONTINUOUS_BACKUP_SET));
for (TableName tableName : tables) {
- delete.addColumns(META_FAMILY, Bytes.toBytes(tableName.getNameAsString()));
+ delete.addColumn(META_FAMILY, Bytes.toBytes(tableName.getNameAsString()));
}
return delete;
}
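The swap from addColumns to addColumn above narrows the delete: in the standard HBase client API, Delete.addColumn(family, qualifier) places a delete marker on only the most recent version of the cell, while addColumns(family, qualifier) covers all versions. A minimal contrast (names are illustrative, not the real CONTINUOUS_BACKUP_SET/META_FAMILY constants):

```java
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.util.Bytes;

Delete d = new Delete(Bytes.toBytes("continuous_backup_set"));
d.addColumn(Bytes.toBytes("meta"), Bytes.toBytes("ns1:table1"));  // newest version only
d.addColumns(Bytes.toBytes("meta"), Bytes.toBytes("ns1:table1")); // every version
```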
@@ -305,6 +305,9 @@ private void updateContinuousBackupReplicationPeer(Admin admin) throws IOException
.collect(Collectors.toMap(tableName -> tableName, tableName -> new ArrayList<>()));

try {
if (!admin.isReplicationPeerEnabled(CONTINUOUS_BACKUP_REPLICATION_PEER)) {
admin.enableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
}
admin.appendReplicationPeerTableCFs(CONTINUOUS_BACKUP_REPLICATION_PEER, tableMap);
LOG.info("Updated replication peer {} with table and column family map.",
CONTINUOUS_BACKUP_REPLICATION_PEER);
@@ -415,13 +415,12 @@ protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables
return request;
}

- protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path,
-   boolean noChecksumVerify, boolean continuousBackupEnabled) {
+ protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables,
+   String rootDir, boolean noChecksumVerify, boolean isContinuousBackupEnabled) {
BackupRequest.Builder builder = new BackupRequest.Builder();
- BackupRequest request = builder.withBackupType(type).withTableList(tables)
-   .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify)
-   .withContinuousBackupEnabled(continuousBackupEnabled).build();
- return request;
+ return builder.withBackupType(type).withTableList(tables).withTargetRootDir(rootDir)
+   .withNoChecksumVerify(noChecksumVerify).withContinuousBackupEnabled(isContinuousBackupEnabled)
+   .build();
}

protected String backupTables(BackupType type, List<TableName> tables, String path)
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
@@ -28,18 +29,25 @@

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -55,38 +63,55 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {

String backupWalDirName = "TestBackupDeleteWithCleanup";

- @Test
- public void testBackupDeleteWithCleanupLogic() throws Exception {
private FileSystem fs;
private Path backupWalDir;
private BackupSystemTable backupSystemTable;

@Before
public void setUpTest() throws Exception {
Path root = TEST_UTIL.getDataTestDirOnTestFS();
- Path backupWalDir = new Path(root, backupWalDirName);
+ backupWalDir = new Path(root, backupWalDirName);
conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
- FileSystem fs = FileSystem.get(conf1);
+ fs = FileSystem.get(conf1);
fs.mkdirs(backupWalDir);
backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection());
}

@After
public void tearDownTest() throws Exception {
if (backupSystemTable != null) {
backupSystemTable.close();
}
if (fs != null && backupWalDir != null) {
fs.delete(backupWalDir, true);
}

EnvironmentEdgeManager.reset();
}
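The tests rely on HBase's injectable clock: EnvironmentEdge has a single currentTime() method, so a lambda can backdate every timestamp the code under test reads, and the reset() in the teardown above restores the wall clock. The pattern in isolation:

```java
// Backdate all EnvironmentEdgeManager-derived timestamps by two days.
EnvironmentEdgeManager
  .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));
long backdated = EnvironmentEdgeManager.currentTime(); // ~2 days in the past
EnvironmentEdgeManager.reset(); // back to the real clock (tearDownTest does this)
```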

@Test
public void testBackupDeleteWithCleanupLogic() throws Exception {
// Step 1: Setup Backup Folders
long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
- setupBackupFolders(fs, backupWalDir, currentTime);
+ setupBackupFolders(currentTime);

// Log the directory structure before cleanup
logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:");

// Step 2: Simulate Backup Creation
- BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection());
backupSystemTable.addContinuousBackupTableSet(Set.of(table1),
currentTime - (2 * ONE_DAY_IN_MILLISECONDS));

EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));

String backupId = fullTableBackup(Lists.newArrayList(table1));
assertTrue(checkSucceeded(backupId));

String anotherBackupId = fullTableBackup(Lists.newArrayList(table1));
assertTrue(checkSucceeded(anotherBackupId));

// Step 3: Run Delete Command
- int ret =
-   ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" });
- assertEquals(0, ret);
+ deleteBackup(backupId);

// Log the directory structure after cleanup
logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:");
@@ -96,6 +121,70 @@ public void testBackupDeleteWithCleanupLogic() throws Exception {

// Step 5: Verify System Table Update
verifySystemTableUpdate(backupSystemTable, currentTime);

// Cleanup
deleteBackup(anotherBackupId);
}

@Test
public void testSingleBackupForceDelete() throws Exception {
// Step 1: Setup Backup Folders
long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
setupBackupFolders(currentTime);

// Log the directory structure before cleanup
logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:");

// Step 2: Simulate Backup Creation
backupSystemTable.addContinuousBackupTableSet(Set.of(table1),
currentTime - (2 * ONE_DAY_IN_MILLISECONDS));

EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));

String backupId = fullTableBackupWithContinuous(Lists.newArrayList(table1));
assertTrue(checkSucceeded(backupId));

assertTrue("Backup replication peer should be enabled after the backup",
continuousBackupReplicationPeerExistsAndEnabled());

// Step 3: Run Delete Command
deleteBackup(backupId);

// Log the directory structure after cleanup
logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:");

// Step 4: Verify CONTINUOUS_BACKUP_REPLICATION_PEER is disabled
assertFalse("Backup replication peer should be disabled or removed",
continuousBackupReplicationPeerExistsAndEnabled());

// Step 5: Verify that system table is updated to remove all the tables
Set<TableName> remainingTables = backupSystemTable.getContinuousBackupTableSet().keySet();
assertTrue("System table should have no tables after all full backups are clear",
remainingTables.isEmpty());

// Step 6: Verify that the backup WAL directory is empty
assertTrue("WAL backup directory should be empty after force delete",
areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString()));

// Step 7: Take new full backup with continuous backup enabled
String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1));
assertTrue(checkSucceeded(backupIdContinuous));

// Step 8: Verify CONTINUOUS_BACKUP_REPLICATION_PEER is enabled again
assertTrue("Backup replication peer should be re-enabled after new backup",
continuousBackupReplicationPeerExistsAndEnabled());

// And system table has new entry
Set<TableName> newTables = backupSystemTable.getContinuousBackupTableSet().keySet();
assertTrue("System table should contain the table after new backup",
newTables.contains(table1));

// Cleanup
deleteBackup(backupIdContinuous);
}

private void setupBackupFolders(long currentTime) throws IOException {
setupBackupFolders(fs, backupWalDir, currentTime);
}

public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
@@ -181,4 +270,45 @@ public static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException
}
}
}

private boolean continuousBackupReplicationPeerExistsAndEnabled() throws IOException {
return TEST_UTIL.getAdmin().listReplicationPeers().stream().anyMatch(
peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled());
}

private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir)
throws IOException {
BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);

FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir);
}

private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException {
if (!fs.exists(dirPath)) {
// Directory doesn't exist — treat as empty
return true;
}
FileStatus[] entries = fs.listStatus(dirPath);
return entries == null || entries.length == 0;
}

private static void deleteBackup(String backupId) throws Exception {
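// Runs "backup delete -l <backupId> -fd"; -fd is the force-delete flag whose
// cleanup side effects (WALs, metadata, replication peer) these tests verify.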
int ret =
ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" });
assertEquals(0, ret);
}

private String fullTableBackupWithContinuous(List<TableName> tables) throws IOException {
try (BackupAdmin admin = new BackupAdminImpl(TEST_UTIL.getConnection())) {
BackupRequest request =
createBackupRequest(BackupType.FULL, new ArrayList<>(tables), BACKUP_ROOT_DIR, false, true);
return admin.backupTables(request);
}
}

}