@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -248,6 +249,8 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable

try {
if (backupAdmin.validateRequest(restoreRequest)) {
// Check whether any bulk load entry exists after this backup's time and before endTime
checkBulkLoadAfterBackup(conn, sTableName, backup, endTime);
return backup;
}
} catch (IOException e) {
@@ -259,6 +262,31 @@ private PitrBackupMetadata getValidBackup(TableName sTableName, TableName tTable
return null;
}

/**
 * Checks whether any bulk load operation occurred for the specified table after the last
 * successful backup and before the restore time.
 * @param conn Active HBase connection
 * @param sTableName Table for which to check bulk load history
 * @param backup Last successful backup before the target recovery time
 * @param endTime Target recovery time
 * @throws IOException if a bulk load entry is found between the backup time and the end time
 */
private void checkBulkLoadAfterBackup(Connection conn, TableName sTableName,
PitrBackupMetadata backup, long endTime) throws IOException {
try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(List.of(sTableName));
for (BulkLoad load : bulkLoads) {
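// The coverage cutoff depends on the backup type: a FULL backup covers data as of
// its start time, an INCREMENTAL backup covers WAL entries up to its committed WAL timestamp.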
long lastBackupTs = (backup.getType() == BackupType.FULL)
? backup.getStartTs()
: backup.getIncrCommittedWalTs();
if (lastBackupTs < load.getTimestamp() && load.getTimestamp() < endTime) {
throw new IOException("Bulk load operation detected after last successful backup for "
+ "table: " + sTableName);

Comment on lines +283 to +284 (Contributor):

Do you think this error message should be a little more descriptive? It says why there is an error, but it doesn't tell the user what they should be doing to prevent it.

My understanding is that the user is supposed to perform a full or incremental backup after doing a bulk load. This function correctly detects that a bulk load has occurred since the last backup, and it is correctly throwing an error. However, the message doesn't tell the user that they should do another backup after bulk loading in order to get around this error.

}
}
}
}
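
Picking up the review thread above, a more actionable message would state the remedy as well as the cause. A rough sketch of such a rewording (illustrative only; not the wording this PR ships):

throw new IOException("Bulk load detected for table " + sTableName
  + " after the last successful backup and before the requested restore time."
  + " Take a full or incremental backup of the table after bulk loading, then"
  + " retry the point-in-time restore.");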

/**
* Determines if the given backup is valid for PITR.
* <p>
@@ -19,6 +19,7 @@

import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.yetus.audience.InterfaceAudience;

@@ -57,4 +58,14 @@ public String getBackupId() {
public String getRootDir() {
return image.getRootDir();
}

@Override
public BackupType getType() {
return image.getType();
}

@Override
public long getIncrCommittedWalTs() {
return image.getIncrCommittedWalTs();
}
}
@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.yetus.audience.InterfaceAudience;

/**
@@ -57,4 +58,14 @@ public String getBackupId() {
public String getRootDir() {
return info.getBackupRootDir();
}

@Override
public BackupType getType() {
return info.getType();
}

@Override
public long getIncrCommittedWalTs() {
return info.getIncrCommittedWalTs();
}
}
BackupManifest.java
@@ -101,6 +101,11 @@ Builder withCompleteTime(long completeTime) {
return this;
}

Builder withIncrCommittedWalTs(long incrCommittedWalTs) {
image.setIncrCommittedWalTs(incrCommittedWalTs);
return this;
}

BackupImage build() {
return image;
}
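
For orientation, a hypothetical call site for the new builder hook; the other with* setters are assumed from the existing Builder API, and backupInfo stands in for a BackupInfo instance:

BackupImage image = BackupImage.newBuilder()
  .withBackupId(backupInfo.getBackupId())
  .withType(backupInfo.getType())
  .withRootDir(backupInfo.getBackupRootDir())
  .withStartTime(backupInfo.getStartTs())
  .withCompleteTime(backupInfo.getCompleteTs())
  .withIncrCommittedWalTs(backupInfo.getIncrCommittedWalTs()) // new in this PR
  .build();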
@@ -115,6 +120,7 @@ BackupImage build() {
private long completeTs;
private ArrayList<BackupImage> ancestors;
private Map<TableName, Map<String, Long>> incrTimeRanges;
private long incrCommittedWalTs;

static Builder newBuilder() {
return new Builder();
@@ -125,20 +131,22 @@ public BackupImage() {
}

private BackupImage(String backupId, BackupType type, String rootDir, List<TableName> tableList,
-    long startTs, long completeTs) {
+    long startTs, long completeTs, long incrCommittedWalTs) {
this.backupId = backupId;
this.type = type;
this.rootDir = rootDir;
this.tableList = tableList;
this.startTs = startTs;
this.completeTs = completeTs;
this.incrCommittedWalTs = incrCommittedWalTs;
}

static BackupImage fromProto(BackupProtos.BackupImage im) {
String backupId = im.getBackupId();
String rootDir = im.getBackupRootDir();
long startTs = im.getStartTs();
long completeTs = im.getCompleteTs();
long incrCommittedWalTs = im.getIncrCommittedWalTs();
List<HBaseProtos.TableName> tableListList = im.getTableListList();
List<TableName> tableList = new ArrayList<>();
for (HBaseProtos.TableName tn : tableListList) {
@@ -151,7 +159,8 @@ static BackupImage fromProto(BackupProtos.BackupImage im) {
? BackupType.FULL
: BackupType.INCREMENTAL;

-    BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+    BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs,
+      incrCommittedWalTs);
for (BackupProtos.BackupImage img : ancestorList) {
image.addAncestor(fromProto(img));
}
@@ -170,6 +179,7 @@ BackupProtos.BackupImage toProto() {
builder.setBackupId(backupId);
builder.setCompleteTs(completeTs);
builder.setStartTs(startTs);
builder.setIncrCommittedWalTs(incrCommittedWalTs);
if (type == BackupType.FULL) {
builder.setBackupType(BackupProtos.BackupType.FULL);
} else {
@@ -287,6 +297,14 @@ public long getCompleteTs() {
return completeTs;
}

public long getIncrCommittedWalTs() {
return incrCommittedWalTs;
}

public void setIncrCommittedWalTs(long incrCommittedWalTs) {
this.incrCommittedWalTs = incrCommittedWalTs;
}

private void setCompleteTs(long completeTs) {
this.completeTs = completeTs;
}
BackupSystemTable.java
@@ -485,8 +485,8 @@ public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
LOG.debug("found orig {} for {} of table {} with timestamp {}", path, fam, region,
timestamp);
LOG.debug("Found orig path {} for family {} of table {} and region {} with timestamp {}",
path, fam, table, region, timestamp);
if (timestamp <= endTimestamp) {
result.add(new BulkLoad(table, region, fam, path, row, timestamp));
}
PitrBackupMetadata.java
@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.yetus.audience.InterfaceAudience;

@@ -47,4 +48,10 @@ public interface PitrBackupMetadata {

/** Returns Root directory where the backup is stored */
String getRootDir();

/** Returns backup type */
BackupType getType();

/** Returns the committed WAL timestamp up to which an incremental backup covers data */
long getIncrCommittedWalTs();
}
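
Together with the existing accessors, the two new methods let a caller derive a single coverage cutoff per backup, mirroring the logic in checkBulkLoadAfterBackup above. A minimal helper sketch (the helper name is illustrative; getStartTs() is assumed from its use earlier in this PR):

static long coverageCutoffTs(PitrBackupMetadata backup) {
  // FULL backups cover table data as of their start time; INCREMENTAL backups
  // cover WAL entries up to the committed WAL timestamp recorded for them.
  return backup.getType() == BackupType.FULL
    ? backup.getStartTs()
    : backup.getIncrCommittedWalTs();
}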
TestIncrementalBackupWithContinuous.java
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
@@ -33,16 +34,13 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -68,8 +66,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
private static final Logger LOG =
LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);

-  private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] COLUMN = Bytes.toBytes("col");
private static final int ROWS_IN_BULK_LOAD = 100;

@Test
@@ -186,11 +182,55 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws
}
}

-  private void verifyTable(Table t1) throws IOException {
-    Get g = new Get(ROW);
-    Result r = t1.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
-  }
@Test
public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
TableName tableName1 = TableName.valueOf("table_" + methodName);
TEST_UTIL.createTable(tableName1, famName);
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {

// The test starts with no data, and no bulk loaded rows.
int expectedRowCount = 0;
assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());

// Create continuous backup, bulk loads are now being tracked
String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup1));

loadTable(TEST_UTIL.getConnection().getTable(tableName1));
expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
performBulkLoad("bulkPreIncr", methodName, tableName1);
expectedRowCount += ROWS_IN_BULK_LOAD;
assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());

loadTable(TEST_UTIL.getConnection().getTable(tableName1));
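// Pause, presumably to let continuous-backup WAL replication catch up before the
// incremental backup.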
Thread.sleep(5000);

// Incremental backup
String backup2 =
backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup2));
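// A successful incremental backup clears the tracked bulk load entries.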
assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size());

performBulkLoad("bulkPostIncr", methodName, tableName1);
assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());

loadTable(TEST_UTIL.getConnection().getTable(tableName1));
Thread.sleep(10000);
long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection());

// Expect the restore to fail because no backup covers the bulkPostIncr bulk load
TableName restoredTable = TableName.valueOf("restoredTable");
String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 },
new TableName[] { restoredTable }, restoreTs, null);
int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret);
} finally {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
}
}

private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
@@ -67,8 +67,8 @@ private static void setUpBackups() throws Exception {
// Simulate a backup taken 20 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
-    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
-                                                                   // table1
+    // Insert initial data into table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);

// Perform a full backup for table1 with continuous backup enabled
String[] args =
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -65,6 +65,7 @@ message BackupImage {
optional uint64 complete_ts = 6;
repeated BackupImage ancestors = 7;
repeated TableServerTimestamp tst_map = 8;
optional uint64 incr_committed_wal_ts = 9;

}
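
Because the new field is optional (proto2-style), manifests written before this change simply lack field 9. A short sketch of how a reader can distinguish "absent" from a real value through the standard generated accessors (helper name illustrative):

static long incrCommittedWalTsOrZero(BackupProtos.BackupImage im) {
  // Generated proto2 code exposes hasX() for presence; the getter falls back to
  // the field default (0) when the field was never written.
  return im.hasIncrCommittedWalTs() ? im.getIncrCommittedWalTs() : 0L;
}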

BulkLoadHFilesTool.java
@@ -1195,6 +1195,11 @@ public int run(String[] args) throws Exception {
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
if (ret == 0) {
System.out.println("Bulk load completed successfully.");
System.out.println("IMPORTANT: Please take a backup of the table immediately if this table "

Comment on lines +1199 to +1200 (Copilot AI, Jul 11, 2025):

Use the framework's logger (e.g., LOG.info) instead of System.out.println for consistency with the rest of HBase's logging.

Suggested change:
-    System.out.println("Bulk load completed successfully.");
-    System.out.println("IMPORTANT: Please take a backup of the table immediately if this table "
+    LOG.info("Bulk load completed successfully.");
+    LOG.info("IMPORTANT: Please take a backup of the table immediately if this table "

Reply (Contributor): this is not related, you can ignore it.

+ "is part of continuous backup");
}
System.exit(ret);
}
