@@ -120,6 +120,8 @@ private int parseAndRun(String[] args) throws IOException {
type = BackupCommand.REPAIR;
} else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) {
type = BackupCommand.MERGE;
} else if (BackupCommand.CLEANUP.name().equalsIgnoreCase(cmd)) {
type = BackupCommand.CLEANUP;
} else {
Comment on lines +123 to 125
Contributor

I would name this something more specific, unless this command intends to clean up entries that may be left behind for full and incremental backups as well

System.out.println("Unsupported command for backup: " + cmd);
printToolUsage();
@@ -153,6 +153,7 @@ enum BackupCommand {
SET_DELETE,
SET_DESCRIBE,
SET_LIST,
REPAIR
REPAIR,
CLEANUP
}
}
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
@@ -43,14 +45,22 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;

import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -63,6 +73,7 @@
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
@@ -117,6 +128,8 @@ public final class BackupCommands {

public static final String REPAIR_CMD_USAGE = "Usage: hbase backup repair\n";

public static final String CLEANUP_CMD_USAGE = "Usage: hbase backup cleanup\n";

public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n"
+ " name Backup set name\n" + " tables Comma separated list of tables.\n"
+ "COMMAND is one of:\n" + " add add tables to a set, create a set if needed\n"
@@ -245,6 +258,9 @@ public static Command createCommand(Configuration conf, BackupCommand type, Comm
case MERGE:
cmd = new MergeCommand(conf, cmdline);
break;
case CLEANUP:
cmd = new CleanupCommand(conf, cmdline);
break;
case HELP:
default:
cmd = new HelpCommand(conf, cmdline);
@@ -853,6 +869,188 @@ protected void printUsage() {
}
}

/**
* The {@code CleanupCommand} class is responsible for removing Write-Ahead Log (WAL) and
* bulk-loaded files that are no longer needed for Point-in-Time Recovery (PITR).
* <p>
* The cleanup process follows these steps:
* <ol>
* <li>Identify the oldest full backup and its start timestamp.</li>
* <li>Delete WAL files older than this timestamp, as they are no longer usable for PITR with any
* backup.</li>
* </ol>
Comment on lines +873 to +881
Contributor

The standard approach in HBase is to delete old files via extensions of the BaseFileCleanerDelegate. For example, the BackupLogCleaner which already handles cleaning up WALs as they relate to backups.

These cleaners should be run by the HMaster's CleanerChore, which will ensure that we only delete files which live in the intersection of all cleaners' outputs. On top of that critical safety guarantee, this also has the advantage of being run periodically, automatically — for more sophisticated HBase operators, this is a critical advantage because manual operations for textbook operations like backups do not scale well.
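For reference, a minimal sketch of the cleaner-delegate pattern described above (illustrative only; the class name and the constant cutoff are placeholders, and the isFileDeletable(FileStatus) hook is assumed from BaseFileCleanerDelegate rather than taken from this PR):

```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseFileCleanerDelegate;

// Illustrative only: the HMaster's CleanerChore asks every registered delegate about each
// candidate file and deletes the file only if all delegates report it as deletable.
public class ContinuousBackupWalCleanerSketch extends BaseFileCleanerDelegate {
  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    // A real implementation would derive the cutoff from backup metadata
    // (e.g. the oldest full backup's start timestamp), not from a constant.
    long placeholderCutoffMillis = 0L;
    return fStat.getModificationTime() < placeholderCutoffMillis;
  }
}
```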

Contributor Author

Thanks, @rmdmattingly, for the review comments!

I’d like to clarify that we are specifically cleaning up WALs in the backup location (e.g., S3, where they are continuously replicated), not the cluster’s WALs. If we were dealing with cluster WALs, your point would certainly apply—does that sound correct?

Regarding keeping this command manual:

  • All other backup-related commands are currently manual.
  • This command depends on the delete command. What we are doing here is identifying the first full backup in the system and deleting all WALs before that point.
  • These WALs are needed for point-in-time recovery (PITR), where we replay WALs from the full backup up to the specified recovery point. If the full backup itself no longer exists, keeping those WALs serves no purpose.
  • Since deleting full backups is already a manual operation, there is little benefit in automating this cleanup.

That said, we could explore the possibility of running this command periodically and automatically in future iterations.
Let me know your thoughts!

Contributor

Ah okay, thanks for the clarifications here. Maybe we could bake this clarification into the JavaDocs

You make some good points here, but I don't think they fully take into account the variety of ways in which people deploy HBase.

> All other backup-related commands are currently manual.

This is true to a large extent, but the non-emergency commands have at least been exposed in the Admin interface to make programmatic backups easily achievable. Maybe wiring up through the Admin is a fair compromise?
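As a rough, non-authoritative sketch of that compromise (BackupAdminImpl and deleteBackups are existing parts of the backup API, while cleanupWals() is purely hypothetical and only marks where such a hook could sit):

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ProgrammaticCleanupSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
      BackupAdminImpl admin = new BackupAdminImpl(conn)) {
      // Existing programmatic path: delete backups by id without the CLI.
      admin.deleteBackups(new String[] { "backup_1700000000000" });
      // admin.cleanupWals(); // hypothetical hook for the WAL cleanup discussed here
    }
  }
}
```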

> This command depends on the delete command. What we are doing here is identifying the first full backup in the system and deleting all WALs before that point.

If this operation can only follow a delete, and WALs are made useless by said delete, then should this operation just be a part of the backup deletion process?

> Since deleting full backups is already a manual operation, there is little benefit in automating this cleanup.

I don't think it's true that backup deletions are necessarily manual from an operator's perspective. For example, a company backing up their data in S3 could be making use of bucket TTLs to clean up their old backups. In that case, it would be nice for unusable WALs to clean themselves up organically too.

Contributor

> If this operation can only follow a delete, and WALs are made useless by said delete, then should this operation just be a part of the backup deletion process?

That's my point as well. We could do this cleanup as part of the backup delete command, in which case we don't need to deal with whether this should be automatic or manual.

Contributor Author

Thanks, @rmdmattingly and @anmolnar.

Okay, we can incorporate this cleanup process into the delete command itself.

Currently, the delete command is used to remove both full and incremental backups. We have now introduced a new validation for PITR-Critical Backup Deletion; please review the PR here: #6848.

I will also add this cleanup logic at the end of the delete process to remove any WALs that can be deleted (which were previously retained due to this backup). How does that sound?

*/
public static class CleanupCommand extends Command {
CleanupCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
}

@Override
public void execute() throws IOException {
super.execute();

// Validate input arguments
validateArguments();

Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);

if (backupWalDir == null || backupWalDir.isEmpty()) {
System.out
.println("WAL Directory is not specified for continuous backup. Nothing to clean!");
return;
}

try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn)) {

// Retrieve tables that are part of continuous backup
Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
if (continuousBackupTables.isEmpty()) {
System.out.println("Continuous Backup is not enabled for any tables. Nothing to clean!");
return;
}

// Determine the earliest timestamp before which WAL files can be deleted
long cleanupCutoffTimestamp = determineCleanupCutoffTime(sysTable, continuousBackupTables);
if (cleanupCutoffTimestamp == 0) {
System.err.println("ERROR: No valid full backup found. Cleanup aborted.");
return;
}

// Update the continuous backup table's start time to match the cutoff time *before* actual
// cleanup. This is safe because even if the WAL cleanup fails later, we won't be accessing
// data older than the cutoff timestamp, ensuring consistency in what the system considers
// valid for recovery.
//
// If we did this the other way around—cleaning up first and updating the table afterward—
// a failure between these two steps could leave us in an inconsistent state where some WALs
// are already deleted, but the backup metadata still references them.
updateContinuousBackupTablesStartTime(sysTable, cleanupCutoffTimestamp);

// Perform WAL file cleanup
cleanupOldWALFiles(conf, backupWalDir, cleanupCutoffTimestamp);
}
}

/**
* Fetches the continuous backup tables from the system table and updates their start timestamps
* if the current start time is earlier than the given cutoff timestamp.
* @param sysTable The backup system table from which continuous backup tables are
* retrieved and updated.
* @param cleanupCutoffTimestamp The cutoff timestamp before which WAL files can be deleted.
* @throws IOException If an error occurs while accessing the system table.
*/
private void updateContinuousBackupTablesStartTime(BackupSystemTable sysTable,
long cleanupCutoffTimestamp) throws IOException {
Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();

// Identify tables that need updating
Set<TableName> tablesToUpdate = new HashSet<>();
for (Map.Entry<TableName, Long> entry : continuousBackupTables.entrySet()) {
TableName table = entry.getKey();
long startTimestamp = entry.getValue();

if (startTimestamp < cleanupCutoffTimestamp) {
tablesToUpdate.add(table);
}
}

// If no tables require updates, exit early
if (tablesToUpdate.isEmpty()) {
return;
}

// Perform the actual update in the system table
sysTable.updateContinuousBackupTableSet(tablesToUpdate, cleanupCutoffTimestamp);
}

private void validateArguments() throws IOException {
String[] args = cmdline == null ? null : cmdline.getArgs();
if (args != null && args.length > 1) {
System.err.println("ERROR: wrong number of arguments: " + args.length);
printUsage();
throw new IOException(INCORRECT_USAGE);
}
}

private long determineCleanupCutoffTime(BackupSystemTable sysTable,
Map<TableName, Long> backupTables) throws IOException {
List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
Collections.reverse(backupInfos); // Process from oldest to latest

for (BackupInfo backupInfo : backupInfos) {
if (BackupType.FULL.equals(backupInfo.getType())) {
return backupInfo.getStartTs();
}
}
return 0;
}

/**
* Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
*/
private void cleanupOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
throws IOException {
System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
Contributor

@vinayakphegde, are you using System.out.println() in this file so output can be displayed to the user when they use hbase command line commands?

+ " with cutoff time: " + cutoffTime);

BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

System.out.println("Listing directories under: " + walDir);

FileStatus[] directories = fs.listStatus(walDir);

for (FileStatus dirStatus : directories) {
if (!dirStatus.isDirectory()) {
continue; // Skip files, we only want directories
}

Path dirPath = dirStatus.getPath();
String dirName = dirPath.getName();

try {
long dayStart = parseDayDirectory(dirName, dateFormat);
System.out
.println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");

// If WAL files of that day are older than cutoff time, delete them
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
Contributor

@vinayakphegde, just curious, what is the purpose of adding ONE_DAY_IN_MILLISECONDS - 1? Is dayStart always at the beginning of the day and ONE_DAY_IN_MILLISECONDS - 1 moves the timestamp to the end of the day?
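A small standalone illustration of how that condition evaluates, assuming ONE_DAY_IN_MILLISECONDS is 86,400,000 and dayStart is the first millisecond of the parsed "yyyy-MM-dd" day (timestamps below are made up):

```java
import java.time.Instant;

// Made-up timestamps only, mirroring the check above: dayStart + ONE_DAY_IN_MILLISECONDS - 1
// is the last millisecond of the directory's day, so a directory is deleted only when the
// entire day lies before the cutoff.
public class DayCutoffExample {
  public static void main(String[] args) {
    long oneDayMillis = 24L * 60 * 60 * 1000; // assumed value of ONE_DAY_IN_MILLISECONDS
    long cutoff = Instant.parse("2024-03-10T00:00:00Z").toEpochMilli(); // example cutoff

    long olderDay = Instant.parse("2024-03-08T00:00:00Z").toEpochMilli();
    System.out.println(olderDay + oneDayMillis - 1 < cutoff);  // true  -> whole day is older, deletable

    long cutoffDay = Instant.parse("2024-03-10T00:00:00Z").toEpochMilli();
    System.out.println(cutoffDay + oneDayMillis - 1 < cutoff); // false -> same day as cutoff, retained
  }
}
```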

System.out.println("Deleting outdated WAL directory: " + dirPath);
fs.delete(dirPath, true);
fs.delete(new Path(bulkloadDir, dirPath.getName()), true);
}
} catch (ParseException e) {
System.out.println("WARNING: Failed to parse directory name '" + dirName
+ "'. Skipping. Error: " + e.getMessage());
} catch (IOException e) {
System.out.println("WARNING: Failed to delete directory '" + dirPath
+ "'. Skipping. Error: " + e.getMessage());
}
}

System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
}

private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
throws ParseException {
return dateFormat.parse(dayDir).getTime();
}

@Override
protected void printUsage() {
System.out.println(CLEANUP_CMD_USAGE);
}
}

public static class MergeCommand extends Command {
MergeCommand(Configuration conf, CommandLine cmdline) {
super(conf);
@@ -1050,6 +1050,27 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
}
}

/**
* Updates the system table with the new start timestamps for continuous backup tables.
* @param tablesToUpdate The set of tables that need their start timestamps updated.
* @param newStartTimestamp The new start timestamp to be set.
*/
public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
throws IOException {
try (Table table = connection.getTable(tableName)) {
Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));

for (TableName tableName : tablesToUpdate) {
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
Bytes.toBytes(newStartTimestamp));
}

table.put(put);
LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
tablesToUpdate.size());
}
}

/**
* Deletes incremental backup set for a backup destination
* @param backupRoot backup root
@@ -271,7 +271,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
walWriter.append(entry);
}
walWriter.sync(true);
uploadBulkLoadFiles(bulkLoadFiles);
uploadBulkLoadFiles(day, bulkLoadFiles);
} catch (UncheckedIOException e) {
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -281,9 +281,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
}

private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
// Convert dayInMillis to "yyyy-MM-dd" format
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
String dayDirectoryName = formatToDateString(dayInMillis);

FileSystem fs = backupFileSystemManager.getBackupFs();
Path walsDir = backupFileSystemManager.getWalsDir();
@@ -343,17 +341,21 @@ private void close() {
}
}

private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
bulkLoadFiles.size());

if (LOG.isTraceEnabled()) {
LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
}
String dayDirectoryName = formatToDateString(dayInMillis);
Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);

for (Path file : bulkLoadFiles) {
Path sourcePath = getBulkLoadFileStagingPath(file);
Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
Path destPath = new Path(bulkloadDir, file);

try {
LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -374,6 +376,14 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
}

/**
* Convert dayInMillis to "yyyy-MM-dd" format
*/
private String formatToDateString(long dayInMillis) {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
return dateFormat.format(new Date(dayInMillis));
}

private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
Path rootDir = CommonFSUtils.getRootDir(conf);