@@ -124,6 +124,11 @@ public enum BackupPhase {
*/
private long completeTs;

/**
* Committed WAL timestamp for incremental backup
*/
private long incrCommittedWalTs;

/**
* Total bytes of incremental logs copied
*/
@@ -293,6 +298,14 @@ public void setCompleteTs(long endTs) {
this.completeTs = endTs;
}

public long getIncrCommittedWalTs() {
return incrCommittedWalTs;
}

public void setIncrCommittedWalTs(long timestamp) {
this.incrCommittedWalTs = timestamp;
}

public long getTotalBytesCopied() {
return totalBytesCopied;
}
@@ -549,6 +562,7 @@ public String getShortDescription() {
sb.append("{");
sb.append("ID=" + backupId).append(",");
sb.append("Type=" + getType()).append(",");
sb.append("IsContinuous=" + isContinuousBackupEnabled()).append(",");
sb.append("Tables=" + getTableListAsString()).append(",");
sb.append("State=" + getState()).append(",");
Calendar cal = Calendar.getInstance();
@@ -859,28 +859,47 @@ public String backupTables(BackupRequest request) throws IOException {

String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
Set<TableName> incrTableSet;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
}
if (request.isContinuousBackupEnabled()) {
Set<TableName> continuousBackupTableSet;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
}
if (continuousBackupTableSet.isEmpty()) {
String msg = "Continuous backup table set contains no tables. "
+ "You need to run Continuous backup first "
+ (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
throw new IOException(msg);
}
if (!continuousBackupTableSet.containsAll(tableList)) {
String extraTables = StringUtils.join(tableList, ",");
String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. "
+ "Perform Continuous backup on " + extraTables + " first, then retry the command";
throw new IOException(msg);
}
} else {
Set<TableName> incrTableSet;
try (BackupSystemTable table = new BackupSystemTable(conn)) {
incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
}

if (incrTableSet.isEmpty()) {
String msg =
"Incremental backup table set contains no tables. " + "You need to run full backup first "
if (incrTableSet.isEmpty()) {
String msg = "Incremental backup table set contains no tables. "
+ "You need to run full backup first "
+ (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");

throw new IOException(msg);
}
if (tableList != null) {
tableList.removeAll(incrTableSet);
if (!tableList.isEmpty()) {
String extraTables = StringUtils.join(tableList, ",");
String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
+ "Perform full backup on " + extraTables + " first, " + "then retry the command";
throw new IOException(msg);
}
if (tableList != null) {
tableList.removeAll(incrTableSet);
if (!tableList.isEmpty()) {
String extraTables = StringUtils.join(tableList, ",");
String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
+ "Perform full backup on " + extraTables + " first, then retry the command";
throw new IOException(msg);
}
}
tableList = Lists.newArrayList(incrTableSet);
Contributor:
Hi @rmdmattingly, I had a quick question for clarification — this is not related to the changes in this PR, but rather about the current incremental backup process.

It seems like we're using incrTableSet to determine the tables for incremental backup. Could you help us understand why we consider all tables marked for incremental backup instead of just the user-specified tables?

Understanding the reasoning behind this will help us better integrate the continuous backup functionality with the existing logic. Appreciate your insights — thank you!

Author:
Reason: [HBASE-14038] The incremental backup is controlled by the 'incremental backup table set'.
For example, if the table set contains (table1, table2, table3), an incremental backup will back up the WALs that cover all the tables in the set.
This is to avoid copying the same set of WALs, which would likely be the case if you backed up table1 and then backed up table2.
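
To illustrate this, here is a hedged sketch only (not HBase code; the WAL names, table set, and output are made up):

import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Why incremental backup is driven by the incremental backup table set:
// a region server WAL interleaves edits for every table it hosts, so one WAL file
// "covers" table1, table2 and table3 at the same time.
public class WalCoverageSketch {
  public static void main(String[] args) {
    List<String> walsSinceLastBackup = Arrays.asList("wal.0001", "wal.0002");
    Set<String> incrementalBackupTableSet = Set.of("table1", "table2", "table3");

    // Backing up table1 and then table2 separately would copy wal.0001 and wal.0002 twice.
    // Backing up the whole table set copies each WAL once and converts it to HFiles
    // for every member table in a single pass.
    for (String wal : walsSinceLastBackup) {
      System.out.println("convert " + wal + " for tables " + incrementalBackupTableSet);
    }
  }
}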

}
tableList = Lists.newArrayList(incrTableSet);
}
if (tableList != null && !tableList.isEmpty()) {
for (TableName table : tableList) {
@@ -907,7 +926,12 @@ public String backupTables(BackupRequest request) throws IOException {
}
}
if (nonExistingTableList != null) {
if (type == BackupType.INCREMENTAL) {
// Non-continuous incremental backup is controlled by 'incremental backup table set'
// and not by user provided backup table list. This is an optimization to avoid copying
// the same set of WALs for incremental backups of different tables at different times
// HBASE-14038. Since continuous incremental backup and full backup back up the user-provided
// table list, we should inform the user about non-existent input table(s)
if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
// Update incremental backup set
tableList = excludeNonExistingTables(tableList, nonExistingTableList);
} else {
@@ -149,12 +149,20 @@ public void execute() throws IOException {
try (Admin admin = conn.getAdmin()) {
beginBackup(backupManager, backupInfo);

// Gather the bulk loads being tracked by the system, which can be deleted (since their data
// will be part of the snapshot being taken). We gather this list before taking the actual
// snapshots for the same reason as the log rolls.
List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
Contributor:
I couldn’t quite figure out why this step is needed. Could you please explain the reasoning behind it?

Author:
This is needed so that a subsequent incremental backup (taken just after this full backup) captures only bulk loads that happen after this full backup; that is why the full backup deletes the existing bulkload entries from the system table.

So an incremental backup taken right after a full backup will capture only the bulkload entries recorded since that full backup.

This change is actually part of https://github.com/apache/hbase/pull/6506/files#diff-18c753bdf4ce717d55d98646bf46723970d2bd4a470a815eb9512c7d94398274, which is merged; I added it here because it was necessary for the bulkload test I added.
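
A rough sketch of the lifecycle described above (illustrative only; the list stands in for the backup system table, and none of these names are real HBase API):

import java.util.ArrayList;
import java.util.List;

// Why the full backup deletes the bulkload rows it has read: the snapshot already
// contains that data, so only bulk loads registered afterwards matter for the next
// incremental backup.
public class BulkloadRowLifecycleSketch {
  public static void main(String[] args) {
    List<String> systemTableBulkloadRows = new ArrayList<>(List.of("bulkload-A", "bulkload-B"));

    // Full backup: read the registered rows before taking snapshots ...
    List<String> readBeforeSnapshot = new ArrayList<>(systemTableBulkloadRows);
    // ... take snapshots (their data already includes bulkload-A and bulkload-B) ...
    // ... then drop the rows that were read.
    systemTableBulkloadRows.removeAll(readBeforeSnapshot);

    // A bulk load that happens after the full backup registers a new row.
    systemTableBulkloadRows.add("bulkload-C");

    // The next incremental backup therefore picks up only bulkload-C.
    System.out.println("incremental backup picks up: " + systemTableBulkloadRows);
  }
}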

Contributor:
Okay, I have a few questions:

  1. Why is this needed? Can’t we get the bulk loaded files from the backup location, just like we do with WAL files? Why not handle bulkload files the same way?
  2. Also, one thing I noticed — by default, when backup is enabled in the cluster, we start accumulating all HFiles (both store files and bulkloaded files) using BackupObserver. These files are registered under bulkLoadTableName to prevent cleaner chores from deleting them during incremental backups, etc. But in our case, we don’t want this behavior, right? The whole idea is to back up files into the backup location, not retain them on the source cluster. I think we should enable BackupObserver only for the traditional (non-continuous) backup path. Otherwise, we’re unnecessarily accumulating files in the source cluster.
  3. I also noticed this:
  • In this code, we read all WAL files since the last backup, convert them to HFiles, and store them in the backup location.
  • Again, in this line, we’re reading bulkloaded files from the table and backing them up.
  • However, the list of bulkloaded files also includes regular store files — as seen here — which means we’re potentially processing normal HFiles twice?
  4. Regarding “I added here because it was necessary for the bulkload test I added”: ideally, we shouldn’t modify core logic only to support a specific test case. It might be better to adapt the test to the intended behavior.

@anmolnar @kgeisz @taklwu — please share your thoughts as well.

Author (@ankitsol, Jun 16, 2025):
For points 1) and 2), I see two advantages to the approach coded in this PR. First, the behaviour would be the same for both continuous and non-continuous incremental backup (i.e. using bulkload files from the source cluster). Second, using source-cluster HFiles for the bulkload operation instead of backed-up HFiles reduces processing time during backup and the storage cost of the backup area. Backing up the bulkload files would also delay the backup of WALs, since WALs and bulkload files are backed up serially.

Backing up bulkload files was necessary when we were planning to use WALPlayer with bulkload-restore capability. Now I don't see any advantage to backing up bulkload files.

  3. BackupObserver.preCommitStoreFile() is invoked only for the bulkload operation, so bulkloaded HFiles are copied only once.

  4. This code actually resolves a bug in the handling of the bulkload operation; it is not a modification of core logic, although it might seem like one.

Contributor:
I think the question is what happens if the source cluster/storage is gone and users need a DR?
We could still get back data from snapshots and WALs from the external storage. Is there a way to restore the bulkloads in such a situation?
Also, what happens if those bulkload hfiles are compacted? Do we track it somewhere?

Author (@ankitsol, Jun 17, 2025):
@Kota-SH After a bulkload operation, the user is advised to take a backup, as part of the continuous backup and PITR design (I think this is not yet documented; I will add a comment in the design doc to cover this part).

Earlier we implemented WALPlayer with bulkload-restore functionality, but that could have resulted in timeouts or performance issues, so we dropped that WALPlayer modification and decided to ask the user to take a backup (ideally an incremental backup) after a bulkload operation so restore is fast.

Author:
@Kota-SH Regarding compaction, the preCommitStoreFile() hook registers these bulkloaded files in the backup system table, which prevents them from being deleted during compaction.
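
Conceptually, the retention check looks something like this (a hedged sketch with made-up names, not the actual cleaner plugin):

import java.util.Set;

// A cleaner-style check that retains any HFile still registered in the backup
// system table, so a bulk-loaded file is not removed before an incremental backup
// has copied it.
public class BulkloadRetentionSketch {
  static boolean isDeletable(String hfileName, Set<String> filesRegisteredForBackup) {
    return !filesRegisteredForBackup.contains(hfileName);
  }

  public static void main(String[] args) {
    Set<String> registered = Set.of("bulkloaded-hfile-1");
    System.out.println(isDeletable("bulkloaded-hfile-1", registered)); // false -> retained
    System.out.println(isDeletable("some-other-hfile", registered));   // true  -> can be cleaned
  }
}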

Contributor:
So, maybe asking differently: is this a one-way approach for continuous backup if we couple it with the HBASE-29003 optimization that reduces the additional bulkload HFiles on the source cluster?

And without this change, or, as @Kota-SH pointed out, if the source cluster/directory is not accessible, which backup can we use? In particular, can we still have incremental recovery?

My two cents on this approach, building on top of HBASE-29003: it seems reasonable; at least incremental backup already has this code change that uses the source cluster/storage. I was wondering whether the feedback on the design docs is also suggesting that we work closer to the logic of incremental backup, so that we could avoid introducing similar logic that in fact serves the same purpose.

Meanwhile, it's worth thinking about:
a. The original plan that copies all the bulkloaded HFiles between incremental backups was too slow; other than this approach, do we have any alternative?
b. Are we 100% against continuous backup reading HFiles/bulkload HFiles from the source storage? HBASE-29003 made a very good point about storage usage, especially for HDFS use cases with 3 replicas.


For points 3 and 4, I assume @ankitsol already answered, so I don't have comments.

Contributor:
Makes sense — thanks for the discussion, folks! 👍


if (backupInfo.isContinuousBackupEnabled()) {
handleContinuousBackup(admin);
} else {
handleNonContinuousBackup(admin);
}

backupManager
.deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());

completeBackup(conn, backupInfo, BackupType.FULL, conf);
} catch (Exception e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
@@ -17,18 +17,28 @@
*/
package org.apache.hadoop.hbase.backup.impl;

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -48,6 +58,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -60,6 +71,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@@ -262,9 +274,19 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
// case PREPARE_INCREMENTAL:
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
LOG.debug("For incremental backup, current table set is "
+ backupManager.getIncrementalBackupTableSet());
newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
// Non-continuous incremental backup is controlled by the 'incremental backup table set'
// and not by the user-provided backup table list. This is an optimization to avoid copying
// the same set of WALs for incremental backups of different tables at different times
// (HBASE-14038).
// Continuous incremental backup backs up the user-provided table list/set.
Set<TableName> currentTableSet;
if (backupInfo.isContinuousBackupEnabled()) {
currentTableSet = backupInfo.getTables();
} else {
currentTableSet = backupManager.getIncrementalBackupTableSet();
newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
}
LOG.debug("For incremental backup, the current table set is {}", currentTableSet);
} catch (Exception e) {
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -291,21 +313,24 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
try {
// Set the previousTimestampMap which is before this current log roll to the manifest.
Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
backupInfo.setIncrTimestampMap(previousTimestampMap);

// The table list in backupInfo is good for both full backup and incremental backup.
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

Map<TableName, Map<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();

backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
if (!backupInfo.isContinuousBackupEnabled()) {
// Set the previousTimestampMap which is before this current log roll to the manifest.
Map<TableName, Map<String, Long>> previousTimestampMap =
backupManager.readLogTimestampMap();
backupInfo.setIncrTimestampMap(previousTimestampMap);

// The table list in backupInfo is good for both full backup and incremental backup.
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);

Map<TableName, Map<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();

backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
}

List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

@@ -362,23 +387,88 @@ protected void deleteBulkLoadDirectory() throws IOException {
}

protected void convertWALsToHFiles() throws IOException {
// get incremental backup file list and prepare parameters for DistCp
List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
// Get list of tables in incremental backup set
Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
// filter missing files out (they have been copied by previous backups)
incrBackupFileList = filterMissingFiles(incrBackupFileList);
List<String> tableList = new ArrayList<String>();
for (TableName table : tableSet) {
// Check if table exists
if (tableExists(table, conn)) {
tableList.add(table.getNameAsString());
} else {
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
long previousBackupTs = 0L;
if (backupInfo.isContinuousBackupEnabled()) {
Set<TableName> tableSet = backupInfo.getTables();
List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
for (TableName table : tableSet) {
for (BackupInfo backup : backupInfos) {
// find previous backup for this table
if (backup.getTables().contains(table)) {
LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(),
backup.getBackupId(), table.getNameAsString());
List<String> walBackupFileList;
if (backup.getType() == BackupType.FULL) {
previousBackupTs = backup.getStartTs();
} else {
previousBackupTs = backup.getIncrCommittedWalTs();
}
walBackupFileList = getBackupLogs(previousBackupTs);
walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
previousBackupTs);
break;
}
}
}
} else {
// get incremental backup file list and prepare parameters for DistCp
List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
// Get list of tables in incremental backup set
Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
// filter missing files out (they have been copied by previous backups)
incrBackupFileList = filterMissingFiles(incrBackupFileList);
List<String> tableList = new ArrayList<String>();
for (TableName table : tableSet) {
// Check if table exists
if (tableExists(table, conn)) {
tableList.add(table.getNameAsString());
} else {
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
}
}
walToHFiles(incrBackupFileList, tableList, previousBackupTs);
}
walToHFiles(incrBackupFileList, tableList);
}

private List<String> getBackupLogs(long startTs) throws IOException {
// get log files from backup dir
String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
if (Strings.isNullOrEmpty(walBackupDir)) {
throw new IOException(
"Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
}
List<String> resultLogFiles = new ArrayList<>();
Path walBackupPath = new Path(walBackupDir);
FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

for (FileStatus dayDir : dayDirs) {
if (!dayDir.isDirectory()) {
continue; // Skip files, only process directories
}

String dirName = dayDir.getPath().getName();
try {
Date dirDate = dateFormat.parse(dirName);
long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)

if (dirEndTime >= startTs) {
Path dirPath = dayDir.getPath();
FileStatus[] logs = backupFs.listStatus(dirPath);
for (FileStatus log : logs) {
String filepath = log.getPath().toString();
LOG.debug("Found WAL file: {}", filepath);
resultLogFiles.add(filepath);
}
}
} catch (ParseException e) {
LOG.warn("Skipping invalid directory name: " + dirName, e);
}
}
return resultLogFiles;
}

protected boolean tableExists(TableName table, Connection conn) throws IOException {
Expand All @@ -387,7 +477,8 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti
}
}

protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
protected void walToHFiles(List<String> dirPaths, List<String> tableList, long previousBackupTs)
throws IOException {
Tool player = new WALPlayer();

// Player reads all files in arbitrary directory structure and creates
@@ -401,6 +492,14 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
if (backupInfo.isContinuousBackupEnabled()) {
conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
// committedWALsTs is needed only for Incremental backups with continuous backup
// since these do not depend on log roll ts
long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
backupInfo.setIncrCommittedWalTs(committedWALsTs);
conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
}
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

try {