diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
new file mode 100644
index 000000000000..b2edce6b0fd1
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for handling Point-In-Time Restore (PITR).
+ *
+ * Defines the common PITR algorithm using the Template Method Pattern. Subclasses provide the
+ * metadata source (e.g., backup system table or a custom backup location).
+ *
+ * The PITR flow includes:
+ *
+ * - Validating recovery time within the PITR window
+ * - Checking for continuous backup and valid backup availability
+ * - Restoring the backup
+ * - Replaying WALs to bring tables to the target state
+ *
+ *
+ * Subclasses must implement {@link #getBackupMetadata(PointInTimeRestoreRequest)} to supply the
+ * list of completed backups.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractPitrRestoreHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractPitrRestoreHandler.class);
+
+ protected final Connection conn;
+ protected final BackupAdminImpl backupAdmin;
+ protected final PointInTimeRestoreRequest request;
+
+ AbstractPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) {
+ this.conn = conn;
+ this.backupAdmin = new BackupAdminImpl(conn);
+ this.request = request;
+ }
+
+ /**
+ * Validates the PITR request and performs the restore if valid. This is the main entry point for
+ * the PITR process and should be called by clients.
+ */
+ public final void validateAndRestore() throws IOException {
+ long endTime = request.getToDateTime();
+ validateRequestToTime(endTime);
+
+ TableName[] sourceTableArray = request.getFromTables();
+ TableName[] targetTableArray = resolveTargetTables(sourceTableArray, request.getToTables());
+
+ // Validate PITR requirements
+ validatePitr(endTime, sourceTableArray, targetTableArray);
+
+ // If only validation is required, log and return
+ if (request.isCheck()) {
+ LOG.info("PITR can be successfully executed");
+ return;
+ }
+
+ // Execute PITR process
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ Map continuousBackupTables = table.getContinuousBackupTableSet();
+ List backupMetadataList = getBackupMetadata(request);
+
+ for (int i = 0; i < sourceTableArray.length; i++) {
+ restoreTableWithWalReplay(sourceTableArray[i], targetTableArray[i], endTime,
+ continuousBackupTables, backupMetadataList, request);
+ }
+ }
+ }
+
+ /**
+ * Validates whether the requested end time falls within the allowed PITR recovery window.
+ * @param endTime The target recovery time.
+ * @throws IOException If the requested recovery time is outside the allowed window.
+ */
+ private void validateRequestToTime(long endTime) throws IOException {
+ long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+ DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+ long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+ long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+ if (endTime < pitrMaxStartTime) {
+ String errorMsg = String.format(
+ "Requested recovery time (%d) is out of the allowed PITR window (last %d days).", endTime,
+ pitrWindowDays);
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+
+ if (endTime > currentTime) {
+ String errorMsg = String.format(
+ "Requested recovery time (%d) is in the future. Current time: %d.", endTime, currentTime);
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ /**
+ * Resolves the target table array. If null or empty, defaults to the source table array.
+ */
+ private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
+ return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
+ }
+
+ /**
+ * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
+ * specified time.
+ *
+ * PITR requires:
+ *
+ * - Continuous backup to be enabled for the source tables.
+ * - A valid backup image and corresponding WALs to be available.
+ *
+ * @param endTime The target recovery time.
+ * @param sTableArray The source tables to restore.
+ * @param tTableArray The target tables where the restore will be performed.
+ * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
+ */
+ private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
+ throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ // Retrieve the set of tables with continuous backup enabled
+ Map continuousBackupTables = table.getContinuousBackupTableSet();
+
+ // Ensure all source tables have continuous backup enabled
+ validateContinuousBackup(sTableArray, continuousBackupTables);
+
+ // Fetch completed backup information
+ List backupMetadataList = getBackupMetadata(request);
+
+ // Ensure a valid backup and WALs exist for PITR
+ validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
+ backupMetadataList);
+ }
+ }
+
+ /**
+ * Ensures that all source tables have continuous backup enabled.
+ */
+ private void validateContinuousBackup(TableName[] tables,
+ Map continuousBackupTables) throws IOException {
+ List missingTables =
+ Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
+
+ if (!missingTables.isEmpty()) {
+ String errorMsg = "Continuous Backup is not enabled for the following tables: "
+ + missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(", "));
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+
+ /**
+ * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
+ * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
+ * the remaining duration up to the end time.
+ */
+ private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
+ long endTime, Map continuousBackupTables, List backups)
+ throws IOException {
+ for (int i = 0; i < sTableArray.length; i++) {
+ if (
+ !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables, backups)
+ ) {
+ String errorMsg = String.format(
+ "PITR failed: No valid backup/WALs found for source table %s (target: %s) before time %d",
+ sTableArray[i].getNameAsString(), tTableArray[i].getNameAsString(), endTime);
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+ }
+ }
+
+ /**
+ * Checks whether PITR can be performed for a given source-target table pair.
+ */
+ private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
+ Map continuousBackupTables, List backups) {
+ return getValidBackup(stableName, tTableName, endTime, continuousBackupTables, backups) != null;
+ }
+
+ /**
+ * Finds and returns the first valid backup metadata entry that can be used to restore the given
+ * source table up to the specified end time. A backup is considered valid if:
+ *
+ * - It contains the source table
+ * - It was completed before the requested end time
+ * - Its start time is after the table's continuous backup start time
+ * - It passes the restore request validation
+ *
+ */
+ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTablename,
+ long endTime, Map continuousBackupTables, List backups) {
+ for (PitrBackupMetadata backup : backups) {
+ if (isValidBackupForPitr(backup, sTableName, endTime, continuousBackupTables)) {
+
+ RestoreRequest restoreRequest =
+ BackupUtils.createRestoreRequest(backup.getRootDir(), backup.getBackupId(), true,
+ new TableName[] { sTableName }, new TableName[] { tTablename }, false);
+
+ try {
+ if (backupAdmin.validateRequest(restoreRequest)) {
+ return backup;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception occurred while testing the backup : {} for restore ",
+ backup.getBackupId(), e);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Determines if the given backup is valid for PITR.
+ *
+ * A backup is valid if:
+ *
+ * - It contains the source table.
+ * - It was completed before the end time.
+ * - The start timestamp of the backup is after the continuous backup start time for the
+ * table.
+ *
+ * @param backupMetadata Backup information object.
+ * @param tableName Table to check.
+ * @param endTime The target recovery time.
+ * @param continuousBackupTables Map of tables with continuous backup enabled.
+ * @return true if the backup is valid for PITR, false otherwise.
+ */
+ private boolean isValidBackupForPitr(PitrBackupMetadata backupMetadata, TableName tableName,
+ long endTime, Map continuousBackupTables) {
+ return backupMetadata.getTableNames().contains(tableName)
+ && backupMetadata.getCompleteTs() <= endTime
+ && continuousBackupTables.getOrDefault(tableName, 0L) <= backupMetadata.getStartTs();
+ }
+
+ /**
+ * Restores the table using the selected backup and replays WALs from the backup start time to the
+ * requested end time.
+ * @throws IOException if no valid backup is found or WAL replay fails
+ */
+ private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
+ Map continuousBackupTables, List backupMetadataList,
+ PointInTimeRestoreRequest request) throws IOException {
+ PitrBackupMetadata backupMetadata =
+ getValidBackup(sourceTable, targetTable, endTime, continuousBackupTables, backupMetadataList);
+ if (backupMetadata == null) {
+ String errorMsg = "Could not find a valid backup and WALs for PITR for table: "
+ + sourceTable.getNameAsString();
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+
+ RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupMetadata.getRootDir(),
+ backupMetadata.getBackupId(), false, new TableName[] { sourceTable },
+ new TableName[] { targetTable }, request.isOverwrite());
+
+ backupAdmin.restore(restoreRequest);
+ replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime);
+ }
+
+ /**
+ * Replays WALs to bring the table to the desired state.
+ */
+ private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
+ throws IOException {
+ String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ Path walDirPath = new Path(walBackupDir);
+ LOG.info(
+ "Starting WAL replay for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}",
+ sourceTable, targetTable, startTime, endTime, walDirPath);
+
+ List validDirs =
+ getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+ if (validDirs.isEmpty()) {
+ LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
+ endTime);
+ return;
+ }
+
+ executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+ }
+
+ /**
+ * Fetches valid WAL directories based on the given time range.
+ */
+ private List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+ long endTime) throws IOException {
+ FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+ FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+ List validDirs = new ArrayList<>();
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+ for (FileStatus dayDir : dayDirs) {
+ if (!dayDir.isDirectory()) {
+ continue; // Skip files, only process directories
+ }
+
+ String dirName = dayDir.getPath().getName();
+ try {
+ Date dirDate = dateFormat.parse(dirName);
+ long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+ long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+ // Check if this day's WAL files overlap with the required time range
+ if (dirEndTime >= startTime && dirStartTime <= endTime) {
+ validDirs.add(dayDir.getPath().toString());
+ }
+ } catch (ParseException e) {
+ LOG.warn("Skipping invalid directory name: " + dirName, e);
+ }
+ }
+ return validDirs;
+ }
+
+ /**
+ * Executes WAL replay using WALPlayer.
+ */
+ private void executeWalReplay(List walDirs, TableName sourceTable, TableName targetTable,
+ long startTime, long endTime) throws IOException {
+ Tool walPlayer = initializeWalPlayer(startTime, endTime);
+ String[] args =
+ { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() };
+
+ try {
+ LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
+ int exitCode = walPlayer.run(args);
+ if (exitCode == 0) {
+ LOG.info("WAL replay completed successfully for {}", targetTable);
+ } else {
+ throw new IOException("WAL replay failed with exit code: " + exitCode);
+ }
+ } catch (Exception e) {
+ LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e);
+ throw new IOException("Exception during WAL replay", e);
+ }
+ }
+
+ /**
+ * Initializes and configures WALPlayer.
+ */
+ private Tool initializeWalPlayer(long startTime, long endTime) {
+ Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+ conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+ conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+ conf.setBoolean(IGNORE_EMPTY_FILES, true);
+ Tool walPlayer = new WALPlayer();
+ walPlayer.setConf(conf);
+ return walPlayer;
+ }
+
+ protected abstract List getBackupMetadata(PointInTimeRestoreRequest request)
+ throws IOException;
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index e82d9804f9dc..75a2a6343a58 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -17,33 +17,18 @@
*/
package org.apache.hadoop.hbase.backup.impl;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
-
import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupClientFactory;
@@ -61,10 +46,7 @@
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -521,7 +503,7 @@ public void restore(RestoreRequest request) throws IOException {
new RestoreTablesClient(conn, request).execute();
}
- private boolean validateRequest(RestoreRequest request) throws IOException {
+ public boolean validateRequest(RestoreRequest request) throws IOException {
// check and load backup image manifest for the tables
Path rootPath = new Path(request.getBackupRootDir());
String backupId = request.getBackupId();
@@ -533,324 +515,28 @@ private boolean validateRequest(RestoreRequest request) throws IOException {
return BackupUtils.validate(Arrays.asList(sTableArray), manifest, conn.getConfiguration());
}
- @Override
- public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
- if (request.getBackupRootDir() == null) {
- defaultPointInTimeRestore(request);
- } else {
- // TODO: special case, not supported at the moment
- throw new IOException("Custom backup location for Point-In-Time Recovery Not supported!");
- }
- LOG.info("Successfully completed Point In Time Restore for all tables.");
- }
-
- /**
- * Performs a default Point-In-Time Restore (PITR) by restoring the latest valid backup and
- * replaying the WALs to bring the table to the desired state. PITR requires: 1. A valid backup
- * available before the end time. 2. Write-Ahead Logs (WALs) covering the remaining duration up to
- * the end time.
- * @param request PointInTimeRestoreRequest containing restore parameters.
- * @throws IOException If no valid backup or WALs are found, or if an error occurs during
- * restoration.
- */
- private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
- long endTime = request.getToDateTime();
- validateRequestToTime(endTime);
-
- TableName[] sTableArray = request.getFromTables();
- TableName[] tTableArray = resolveTargetTables(sTableArray, request.getToTables());
-
- // Validate PITR requirements
- validatePitr(endTime, sTableArray, tTableArray);
-
- // If only validation is required, log and return
- if (request.isCheck()) {
- LOG.info("PITR can be successfully executed");
- return;
- }
-
- // Execute PITR process
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- Map continuousBackupTables = table.getContinuousBackupTableSet();
- List backupInfos = table.getBackupInfos(BackupState.COMPLETE);
-
- for (int i = 0; i < sTableArray.length; i++) {
- restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
- backupInfos, request);
- }
- }
- }
-
- /**
- * Validates whether the requested end time falls within the allowed PITR recovery window.
- * @param endTime The target recovery time.
- * @throws IOException If the requested recovery time is outside the allowed window.
- */
- private void validateRequestToTime(long endTime) throws IOException {
- long pitrWindowDays = conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
- DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
- long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
- long pitrMaxStartTime = currentTime - TimeUnit.DAYS.toMillis(pitrWindowDays);
-
- if (endTime < pitrMaxStartTime) {
- String errorMsg = String.format(
- "Requested recovery time (%d) is out of the allowed PITR window (last %d days).", endTime,
- pitrWindowDays);
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
-
- if (endTime > currentTime) {
- String errorMsg = String.format(
- "Requested recovery time (%d) is in the future. Current time: %d.", endTime, currentTime);
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
- }
-
- /**
- * Resolves the target table array. If null or empty, defaults to the source table array.
- */
- private TableName[] resolveTargetTables(TableName[] sourceTables, TableName[] targetTables) {
- return (targetTables == null || targetTables.length == 0) ? sourceTables : targetTables;
- }
-
/**
- * Validates whether Point-In-Time Recovery (PITR) is possible for the given tables at the
- * specified time.
+ * Initiates Point-In-Time Restore (PITR) for the given request.
*
- * PITR requires:
- *
- * - Continuous backup to be enabled for the source tables.
- * - A valid backup image and corresponding WALs to be available.
- *
- * @param endTime The target recovery time.
- * @param sTableArray The source tables to restore.
- * @param tTableArray The target tables where the restore will be performed.
- * @throws IOException If PITR is not possible due to missing continuous backup or backup images.
- */
- private void validatePitr(long endTime, TableName[] sTableArray, TableName[] tTableArray)
- throws IOException {
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- // Retrieve the set of tables with continuous backup enabled
- Map continuousBackupTables = table.getContinuousBackupTableSet();
-
- // Ensure all source tables have continuous backup enabled
- validateContinuousBackup(sTableArray, continuousBackupTables);
-
- // Fetch completed backup information
- List backupInfos = table.getBackupInfos(BackupState.COMPLETE);
-
- // Ensure a valid backup and WALs exist for PITR
- validateBackupAvailability(sTableArray, tTableArray, endTime, continuousBackupTables,
- backupInfos);
- }
- }
-
- /**
- * Ensures that all source tables have continuous backup enabled.
- */
- private void validateContinuousBackup(TableName[] tables,
- Map continuousBackupTables) throws IOException {
- List missingTables =
- Arrays.stream(tables).filter(table -> !continuousBackupTables.containsKey(table)).toList();
-
- if (!missingTables.isEmpty()) {
- String errorMsg = "Continuous Backup is not enabled for the following tables: "
- + missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(", "));
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
- }
-
- /**
- * Ensures that a valid backup and corresponding WALs exist for PITR for each source table. PITR
- * requires: 1. A valid backup available before the end time. 2. Write-Ahead Logs (WALs) covering
- * the remaining duration up to the end time.
+ * If {@code backupRootDir} is specified in the request, performs PITR using metadata from the
+ * provided custom backup location. Otherwise, defaults to using metadata from the backup system
+ * table.
+ * @param request PointInTimeRestoreRequest containing PITR parameters.
+ * @throws IOException if validation fails or restore cannot be completed.
*/
- private void validateBackupAvailability(TableName[] sTableArray, TableName[] tTableArray,
- long endTime, Map continuousBackupTables, List backupInfos)
- throws IOException {
- for (int i = 0; i < sTableArray.length; i++) {
- if (
- !canPerformPitr(sTableArray[i], tTableArray[i], endTime, continuousBackupTables,
- backupInfos)
- ) {
- String errorMsg = "Could not find a valid backup and WALs for PITR for table: "
- + sTableArray[i].getNameAsString();
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
- }
- }
-
- /**
- * Checks whether PITR can be performed for a given source-target table pair.
- */
- private boolean canPerformPitr(TableName stableName, TableName tTableName, long endTime,
- Map continuousBackupTables, List backupInfos) {
- return getValidBackupInfo(stableName, tTableName, endTime, continuousBackupTables, backupInfos)
- != null;
- }
-
- /**
- * Finds a valid backup for PITR that meets the required conditions.
- */
- private BackupInfo getValidBackupInfo(TableName sTableName, TableName tTablename, long endTime,
- Map continuousBackupTables, List backupInfos) {
- for (BackupInfo info : backupInfos) {
- if (isValidBackupForPitr(info, sTableName, endTime, continuousBackupTables)) {
-
- RestoreRequest restoreRequest =
- BackupUtils.createRestoreRequest(info.getBackupRootDir(), info.getBackupId(), true,
- new TableName[] { sTableName }, new TableName[] { tTablename }, false);
-
- try {
- if (validateRequest(restoreRequest)) {
- return info;
- }
- } catch (IOException e) {
- LOG.warn("Exception occurred while testing the backup : {} for restore ", info, e);
- }
- }
- }
- return null;
- }
-
- /**
- * Determines if the given backup is valid for PITR.
- *
- * A backup is valid if:
- *
- * - It contains the source table.
- * - It was completed before the end time.
- * - The start timestamp of the backup is after the continuous backup start time for the
- * table.
- *
- * @param info Backup information object.
- * @param tableName Table to check.
- * @param endTime The target recovery time.
- * @param continuousBackupTables Map of tables with continuous backup enabled.
- * @return true if the backup is valid for PITR, false otherwise.
- */
- private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, long endTime,
- Map continuousBackupTables) {
- return info.getTableNames().contains(tableName) && info.getCompleteTs() <= endTime
- && continuousBackupTables.getOrDefault(tableName, 0L) <= info.getStartTs();
- }
-
- /**
- * Restores a table from a valid backup and replays WALs to reach the desired PITR state.
- */
- private void restoreTableWithWalReplay(TableName sourceTable, TableName targetTable, long endTime,
- Map continuousBackupTables, List backupInfos,
- PointInTimeRestoreRequest request) throws IOException {
- BackupInfo backupInfo =
- getValidBackupInfo(sourceTable, targetTable, endTime, continuousBackupTables, backupInfos);
- if (backupInfo == null) {
- String errorMsg = "Could not find a valid backup and WALs for PITR for table: "
- + sourceTable.getNameAsString();
- LOG.error(errorMsg);
- throw new IOException(errorMsg);
- }
-
- RestoreRequest restoreRequest = BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(),
- backupInfo.getBackupId(), false, new TableName[] { sourceTable },
- new TableName[] { targetTable }, request.isOverwrite());
-
- restore(restoreRequest);
- replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime);
- }
-
- /**
- * Replays WALs to bring the table to the desired state.
- */
- private void replayWal(TableName sourceTable, TableName targetTable, long startTime, long endTime)
- throws IOException {
- String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
- Path walDirPath = new Path(walBackupDir);
- LOG.info(
- "Starting WAL replay for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}",
- sourceTable, targetTable, startTime, endTime, walDirPath);
-
- List validDirs =
- getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
- if (validDirs.isEmpty()) {
- LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
- endTime);
- return;
- }
-
- executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
- }
-
- /**
- * Fetches valid WAL directories based on the given time range.
- */
- private List getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
- long endTime) throws IOException {
- FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
- FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-
- List validDirs = new ArrayList<>();
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-
- for (FileStatus dayDir : dayDirs) {
- if (!dayDir.isDirectory()) {
- continue; // Skip files, only process directories
- }
-
- String dirName = dayDir.getPath().getName();
- try {
- Date dirDate = dateFormat.parse(dirName);
- long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
- long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
- // Check if this day's WAL files overlap with the required time range
- if (dirEndTime >= startTime && dirStartTime <= endTime) {
- validDirs.add(dayDir.getPath().toString());
- }
- } catch (ParseException e) {
- LOG.warn("Skipping invalid directory name: " + dirName, e);
- }
- }
- return validDirs;
- }
-
- /**
- * Executes WAL replay using WALPlayer.
- */
- private void executeWalReplay(List walDirs, TableName sourceTable, TableName targetTable,
- long startTime, long endTime) throws IOException {
- Tool walPlayer = initializeWalPlayer(startTime, endTime);
- String[] args =
- { String.join(",", walDirs), sourceTable.getNameAsString(), targetTable.getNameAsString() };
+ @Override
+ public void pointInTimeRestore(PointInTimeRestoreRequest request) throws IOException {
+ AbstractPitrRestoreHandler handler;
- try {
- LOG.info("Executing WALPlayer with args: {}", Arrays.toString(args));
- int exitCode = walPlayer.run(args);
- if (exitCode == 0) {
- LOG.info("WAL replay completed successfully for {}", targetTable);
- } else {
- throw new IOException("WAL replay failed with exit code: " + exitCode);
- }
- } catch (Exception e) {
- LOG.error("Error during WAL replay for {}: {}", targetTable, e.getMessage(), e);
- throw new IOException("Exception during WAL replay", e);
+ // Choose the appropriate handler based on whether a custom backup location is provided
+ if (request.getBackupRootDir() == null) {
+ handler = new DefaultPitrRestoreHandler(conn, request);
+ } else {
+ handler = new CustomBackupLocationPitrRestoreHandler(conn, request);
}
- }
+ handler.validateAndRestore();
- /**
- * Initializes and configures WALPlayer.
- */
- private Tool initializeWalPlayer(long startTime, long endTime) {
- Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
- conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
- conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
- conf.setBoolean(IGNORE_EMPTY_FILES, true);
- Tool walPlayer = new WALPlayer();
- walPlayer.setConf(conf);
- return walPlayer;
+ LOG.info("Successfully completed Point In Time Restore for all tables.");
}
@Override
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
new file mode 100644
index 000000000000..8b785a0f0504
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adapter that wraps a {@link BackupImage} to expose it as {@link PitrBackupMetadata}.
+ */
+@InterfaceAudience.Private
+public class BackupImageAdapter implements PitrBackupMetadata {
+ private final BackupImage image;
+
+ public BackupImageAdapter(BackupImage image) {
+ this.image = image;
+ }
+
+ @Override
+ public List getTableNames() {
+ return image.getTableNames();
+ }
+
+ @Override
+ public long getStartTs() {
+ return image.getStartTs();
+ }
+
+ @Override
+ public long getCompleteTs() {
+ return image.getCompleteTs();
+ }
+
+ @Override
+ public String getBackupId() {
+ return image.getBackupId();
+ }
+
+ @Override
+ public String getRootDir() {
+ return image.getRootDir();
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
new file mode 100644
index 000000000000..967fae551cb5
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Adapter that wraps a {@link BackupInfo} to expose it as {@link PitrBackupMetadata}.
+ */
+@InterfaceAudience.Private
+public class BackupInfoAdapter implements PitrBackupMetadata {
+ private final BackupInfo info;
+
+ public BackupInfoAdapter(BackupInfo info) {
+ this.info = info;
+ }
+
+ @Override
+ public List getTableNames() {
+ return info.getTableNames();
+ }
+
+ @Override
+ public long getStartTs() {
+ return info.getStartTs();
+ }
+
+ @Override
+ public long getCompleteTs() {
+ return info.getCompleteTs();
+ }
+
+ @Override
+ public String getBackupId() {
+ return info.getBackupId();
+ }
+
+ @Override
+ public String getRootDir() {
+ return info.getBackupRootDir();
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java
new file mode 100644
index 000000000000..1657b68d0234
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/CustomBackupLocationPitrRestoreHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * PITR restore handler that retrieves backup metadata from a custom backup root directory.
+ *
+ * This implementation is used when the PITR request specifies a custom backup location via
+ * {@code backupRootDir}.
+ */
+@InterfaceAudience.Private
+public class CustomBackupLocationPitrRestoreHandler extends AbstractPitrRestoreHandler {
+
+ public CustomBackupLocationPitrRestoreHandler(Connection conn,
+ PointInTimeRestoreRequest request) {
+ super(conn, request);
+ }
+
+ /**
+ * Retrieves completed backup entries from the given custom backup root directory and converts
+ * them into {@link PitrBackupMetadata} using {@link BackupImageAdapter}.
+ * @param request the PITR request
+ * @return list of completed backup metadata entries from the custom location
+ * @throws IOException if reading from the custom backup directory fails
+ */
+ @Override
+ protected List getBackupMetadata(PointInTimeRestoreRequest request)
+ throws IOException {
+ return HBackupFileSystem
+ .getAllBackupImages(conn.getConfiguration(), new Path(request.getBackupRootDir())).stream()
+ .map(BackupImageAdapter::new).collect(Collectors.toList());
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java
new file mode 100644
index 000000000000..c6844ba96bd3
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Default PITR restore handler that retrieves backup metadata from the system table.
+ *
+ * This implementation is used when no custom backup root directory is specified in the request.
+ */
+@InterfaceAudience.Private
+public class DefaultPitrRestoreHandler extends AbstractPitrRestoreHandler {
+
+ public DefaultPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest request) {
+ super(conn, request);
+ }
+
+ /**
+ * Retrieves completed backup entries from the BackupSystemTable and converts them into
+ * {@link PitrBackupMetadata} using {@link BackupInfoAdapter}.
+ * @param request the PITR request
+ * @return list of completed backup metadata entries
+ * @throws IOException if reading from the backup system table fails
+ */
+ @Override
+ protected List getBackupMetadata(PointInTimeRestoreRequest request)
+ throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ return table.getBackupInfos(BackupInfo.BackupState.COMPLETE).stream()
+ .map(BackupInfoAdapter::new).collect(Collectors.toList());
+ }
+ }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
new file mode 100644
index 000000000000..dc135ce79c08
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A unified abstraction over backup metadata used during Point-In-Time Restore (PITR).
+ *
+ * This interface allows the PITR algorithm to operate uniformly over different types of backup
+ * metadata sources, such as {@link BackupInfo} (system table) and {@link BackupImage} (custom
+ * backup location), without knowing their specific implementations.
+ */
+@InterfaceAudience.Private
+public interface PitrBackupMetadata {
+
+ /** Returns List of table names included in the backup */
+ List getTableNames();
+
+ /** Returns Start timestamp of the backup */
+ long getStartTs();
+
+ /** Returns Completion timestamp of the backup */
+ long getCompleteTs();
+
+ /** Returns Unique identifier for the backup */
+ String getBackupId();
+
+ /** Returns Root directory where the backup is stored */
+ String getRootDir();
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
new file mode 100644
index 000000000000..ae26cf960501
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public final class PITRTestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(PITRTestUtil.class);
+ private static final int DEFAULT_WAIT_FOR_REPLICATION_MS = 30_000;
+
+ private PITRTestUtil() {
+ // Utility class
+ }
+
+ public static String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables,
+ long endTime, String backupRootDir) {
+ String sourceTableNames =
+ Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+ String targetTableNames =
+ Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ List args = new ArrayList<>();
+ args.add("-" + OPTION_TABLE);
+ args.add(sourceTableNames);
+ args.add("-" + OPTION_TABLE_MAPPING);
+ args.add(targetTableNames);
+ args.add("-" + OPTION_TO_DATETIME);
+ args.add(String.valueOf(endTime));
+
+ if (backupRootDir != null) {
+ args.add("-" + OPTION_PITR_BACKUP_PATH);
+ args.add(backupRootDir);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ public static String[] buildBackupArgs(String backupType, TableName[] tables,
+ boolean continuousEnabled, String backupRootDir) {
+ String tableNames =
+ Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ List args = new ArrayList<>(
+ Arrays.asList("create", backupType, backupRootDir, "-" + OPTION_TABLE, tableNames));
+
+ if (continuousEnabled) {
+ args.add("-" + OPTION_ENABLE_CONTINUOUS_BACKUP);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ public static void loadRandomData(HBaseTestingUtil testUtil, TableName tableName, byte[] family,
+ int totalRows) throws IOException {
+ try (Table table = testUtil.getConnection().getTable(tableName)) {
+ testUtil.loadRandomRows(table, family, 32, totalRows);
+ }
+ }
+
+ public static void waitForReplication() {
+ try {
+ LOG.info("Waiting for replication to complete for {} ms", DEFAULT_WAIT_FOR_REPLICATION_MS);
+ Thread.sleep(DEFAULT_WAIT_FOR_REPLICATION_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for replication", e);
+ }
+ }
+
+ public static int getRowCount(HBaseTestingUtil testUtil, TableName tableName) throws IOException {
+ try (Table table = testUtil.getConnection().getTable(tableName)) {
+ return HBaseTestingUtil.countRows(table);
+ }
+ }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index fb37977c4eee..a1ce9c97a687 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -18,23 +18,15 @@
package org.apache.hadoop.hbase.backup;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
@@ -43,8 +35,6 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Category(LargeTests.class)
public class TestPointInTimeRestore extends TestBackupBase {
@@ -52,10 +42,7 @@ public class TestPointInTimeRestore extends TestBackupBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestPointInTimeRestore.class);
-
private static final String backupWalDirName = "TestPointInTimeRestoreWalDir";
- private static final int WAIT_FOR_REPLICATION_MS = 30_000;
static Path backupWalDir;
static FileSystem fs;
@@ -80,38 +67,41 @@ private static void setUpBackups() throws Exception {
// Simulate a backup taken 20 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
- loadRandomData(table1, 1000); // Insert initial data into table1
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
+ // table1
// Perform a full backup for table1 with continuous backup enabled
- String[] args = buildBackupArgs("full", new TableName[] { table1 }, true);
+ String[] args =
+ PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, BACKUP_ROOT_DIR);
int ret = ToolRunner.run(conf1, new BackupDriver(), args);
assertEquals("Backup should succeed", 0, ret);
// Move time forward to simulate 15 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS);
- loadRandomData(table1, 1000); // Add more data to table1
- loadRandomData(table2, 500); // Insert data into table2
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1
+ PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2
- waitForReplication(); // Ensure replication is complete
+ PITRTestUtil.waitForReplication(); // Ensure replication is complete
// Perform a full backup for table2 with continuous backup enabled
- args = buildBackupArgs("full", new TableName[] { table2 }, true);
+ args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table2 }, true, BACKUP_ROOT_DIR);
ret = ToolRunner.run(conf1, new BackupDriver(), args);
assertEquals("Backup should succeed", 0, ret);
// Move time forward to simulate 10 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS);
- loadRandomData(table2, 500); // Add more data to table2
- loadRandomData(table3, 500); // Insert data into table3
+ PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Add more data to table2
+ PITRTestUtil.loadRandomData(TEST_UTIL, table3, famName, 500); // Insert data into table3
// Perform a full backup for table3 and table4 (without continuous backup)
- args = buildBackupArgs("full", new TableName[] { table3, table4 }, false);
+ args = PITRTestUtil.buildBackupArgs("full", new TableName[] { table3, table4 }, false,
+ BACKUP_ROOT_DIR);
ret = ToolRunner.run(conf1, new BackupDriver(), args);
assertEquals("Backup should succeed", 0, ret);
- waitForReplication(); // Ensure replication is complete before concluding setup
+ PITRTestUtil.waitForReplication(); // Ensure replication is complete before concluding setup
// Reset time mocking to avoid affecting other tests
EnvironmentEdgeManager.reset();
@@ -137,18 +127,18 @@ public static void setupAfterClass() throws IOException {
@Test
public void testPITR_FailsOutsideWindow() throws Exception {
// Case 1: Requested restore time is in the future (should fail)
- String[] args = buildPITRArgs(new TableName[] { table1 },
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
new TableName[] { TableName.valueOf("restoredTable1") },
- EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS);
+ EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertNotEquals("Restore should fail since the requested restore time is in the future", 0,
ret);
// Case 2: Requested restore time is too old (beyond the retention window, should fail)
- args = buildPITRArgs(new TableName[] { table1 },
+ args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
new TableName[] { TableName.valueOf("restoredTable1") },
- EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS);
+ EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS, null);
ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertNotEquals(
@@ -162,9 +152,9 @@ public void testPITR_FailsOutsideWindow() throws Exception {
*/
@Test
public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exception {
- String[] args = buildPITRArgs(new TableName[] { table3 },
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table3 },
new TableName[] { TableName.valueOf("restoredTable1") },
- EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS);
+ EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertNotEquals("Restore should fail since continuous backup is not enabled for the table", 0,
@@ -176,9 +166,9 @@ public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws Exc
*/
@Test
public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws Exception {
- String[] args = buildPITRArgs(new TableName[] { table2 },
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table2 },
new TableName[] { TableName.valueOf("restoredTable1") },
- EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS);
+ EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertNotEquals(
@@ -194,15 +184,17 @@ public void testPointInTimeRestore_SuccessfulRestoreForOneTable() throws Excepti
TableName restoredTable = TableName.valueOf("restoredTable");
// Perform restore operation
- String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable },
- EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+ String[] args =
+ PITRTestUtil.buildPITRArgs(new TableName[] { table1 }, new TableName[] { restoredTable },
+ EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertEquals("Restore should succeed", 0, ret);
// Validate that the restored table contains the same number of rows as the original table
assertEquals("Restored table should have the same row count as the original",
- getRowCount(table1), getRowCount(restoredTable));
+ PITRTestUtil.getRowCount(TEST_UTIL, table1),
+ PITRTestUtil.getRowCount(TEST_UTIL, restoredTable));
}
/**
@@ -214,64 +206,19 @@ public void testPointInTimeRestore_SuccessfulRestoreForMultipleTables() throws E
TableName restoredTable2 = TableName.valueOf("restoredTable2");
// Perform restore operation for multiple tables
- String[] args = buildPITRArgs(new TableName[] { table1, table2 },
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1, table2 },
new TableName[] { restoredTable1, restoredTable2 },
- EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+ EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertEquals("Restore should succeed", 0, ret);
// Validate that the restored tables contain the same number of rows as the originals
assertEquals("Restored table1 should have the same row count as the original",
- getRowCount(table1), getRowCount(restoredTable1));
+ PITRTestUtil.getRowCount(TEST_UTIL, table1),
+ PITRTestUtil.getRowCount(TEST_UTIL, restoredTable1));
assertEquals("Restored table2 should have the same row count as the original",
- getRowCount(table2), getRowCount(restoredTable2));
- }
-
- private String[] buildPITRArgs(TableName[] sourceTables, TableName[] targetTables, long endTime) {
- String sourceTableNames =
- Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
- String targetTableNames =
- Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
- return new String[] { "-" + OPTION_TABLE, sourceTableNames, "-" + OPTION_TABLE_MAPPING,
- targetTableNames, "-" + OPTION_TO_DATETIME, String.valueOf(endTime) };
- }
-
- private static String[] buildBackupArgs(String backupType, TableName[] tables,
- boolean continuousEnabled) {
- String tableNames =
- Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
-
- if (continuousEnabled) {
- return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames,
- "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
- } else {
- return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-" + OPTION_TABLE, tableNames };
- }
- }
-
- private static void loadRandomData(TableName tableName, int totalRows) throws IOException {
- int rowSize = 32;
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- TEST_UTIL.loadRandomRows(table, famName, rowSize, totalRows);
- }
- }
-
- private static void waitForReplication() {
- LOG.info("Waiting for replication to complete for {} ms", WAIT_FOR_REPLICATION_MS);
- try {
- Thread.sleep(WAIT_FOR_REPLICATION_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Thread was interrupted while waiting", e);
- }
- }
-
- private int getRowCount(TableName tableName) throws IOException {
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- return HBaseTestingUtil.countRows(table);
- }
+ PITRTestUtil.getRowCount(TEST_UTIL, table2),
+ PITRTestUtil.getRowCount(TEST_UTIL, restoredTable2));
}
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java
new file mode 100644
index 000000000000..78c5ac94ba00
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestoreWithCustomBackupPath.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestoreWithCustomBackupPath extends TestBackupBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPointInTimeRestoreWithCustomBackupPath.class);
+
+ private static final String backupWalDirName = "TestCustomBackupWalDir";
+ private static final String customBackupDirName = "CustomBackupRoot";
+
+ private static Path backupWalDir;
+ private static Path customBackupDir;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ backupWalDir = new Path(root, backupWalDirName);
+ customBackupDir = new Path(root, customBackupDirName);
+
+ fs = FileSystem.get(conf1);
+ fs.mkdirs(backupWalDir);
+ fs.mkdirs(customBackupDir);
+
+ conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+ createAndCopyBackupData();
+ }
+
+ private static void createAndCopyBackupData() throws Exception {
+ // Simulate time 10 days ago
+ EnvironmentEdgeManager
+ .injectEdge(() -> System.currentTimeMillis() - 10 * ONE_DAY_IN_MILLISECONDS);
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
+
+ // Perform backup with continuous backup enabled
+ String[] args =
+ PITRTestUtil.buildBackupArgs("full", new TableName[] { table1 }, true, BACKUP_ROOT_DIR);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ PITRTestUtil.waitForReplication();
+
+ // Copy the contents of BACKUP_ROOT_DIR to the new customBackupDir
+ Path defaultBackupDir = new Path(BACKUP_ROOT_DIR);
+ for (FileStatus status : fs.listStatus(defaultBackupDir)) {
+ Path dst = new Path(customBackupDir, status.getPath().getName());
+ FileUtil.copy(fs, status, fs, dst, true, false, conf1);
+ }
+
+ EnvironmentEdgeManager.reset();
+ }
+
+ @AfterClass
+ public static void cleanupAfterClass() throws IOException {
+ if (fs.exists(backupWalDir)) {
+ fs.delete(backupWalDir, true);
+ }
+ if (fs.exists(customBackupDir)) {
+ fs.delete(customBackupDir, true);
+ }
+
+ conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+
+ @Test
+ public void testPITR_FromCustomBackupRootDir() throws Exception {
+ TableName restoredTable = TableName.valueOf("restoredTableCustomPath");
+
+ long restoreTime = EnvironmentEdgeManager.currentTime() - 2 * ONE_DAY_IN_MILLISECONDS;
+
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { table1 },
+ new TableName[] { restoredTable }, restoreTime, customBackupDir.toString());
+
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertEquals("PITR should succeed with custom backup root dir", 0, ret);
+
+ // Validate that the restored table has same row count
+ assertEquals("Restored table should match row count",
+ PITRTestUtil.getRowCount(TEST_UTIL, table1),
+ PITRTestUtil.getRowCount(TEST_UTIL, restoredTable));
+ }
+}