HBASE-28705 BackupLogCleaner cleans required WALs when using multiple backuproots (#6040)

The BackupLogCleaner is responsible for preventing the deletion of WALs
that still need to be included in a future backup.

The logic that decides which files can be deleted did not work correctly when
multiple backup roots are used. Each backup root has its own chain of
backups (full, incremental1, incremental2, ...), so a WAL must be preserved
as long as any chain still requires it. This was not the case.

The result was that WALs could be incorrectly deleted, resulting in data loss
in backups.
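
As a rough illustration of the corrected rule, the following sketch (plain Java, not the
HBase API; the Backup record and its field names are invented for the example) computes a
per-server preservation boundary: take the newest backup per root, then the minimum log
timestamp across those roots, and only treat WALs at or below that boundary as deletable.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch (not HBase API): computes, per region server, the timestamp
 * boundary up to which every backup root has already captured the WAL.
 * WALs with a timestamp <= boundary are safe to delete.
 */
public class WalBoundarySketch {

  /** Hypothetical stand-in for one backup: root dir, start time, per-server log timestamps. */
  record Backup(String rootDir, long startTs, Map<String, Long> serverToLogTs) {}

  static Map<String, Long> preservationBoundaries(List<Backup> history) {
    // Step 1: per backup root, keep only the most recent backup.
    Map<String, Backup> newestPerRoot = new HashMap<>();
    for (Backup b : history) {
      Backup existing = newestPerRoot.get(b.rootDir());
      if (existing == null || existing.startTs() < b.startTs()) {
        newestPerRoot.put(b.rootDir(), b);
      }
    }
    // Step 2: per region server, take the minimum timestamp across all roots,
    // i.e. the point up to which every chain has already included the WAL.
    Map<String, Long> boundaries = new HashMap<>();
    for (Backup b : newestPerRoot.values()) {
      b.serverToLogTs().forEach((server, ts) -> boundaries.merge(server, ts, Math::min));
    }
    return boundaries;
  }

  public static void main(String[] args) {
    // Two roots: rootA has backed up server rs1 up to ts=200, rootB only up to ts=120.
    List<Backup> history = List.of(
      new Backup("rootA", 10, Map.of("rs1", 100L)),
      new Backup("rootA", 20, Map.of("rs1", 200L)),
      new Backup("rootB", 15, Map.of("rs1", 120L)));
    // A WAL for rs1 with timestamp 150 must be kept: rootB still needs it (boundary = 120).
    System.out.println(preservationBoundaries(history)); // {rs1=120}
  }
}
```

The buggy behavior corresponds to keying the boundary on the newest backup overall (200 in
the example), which would let the WAL at timestamp 150 be deleted before rootB had backed it up.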

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Ray Mattingly <rmattingly@apache.org>
Co-authored-by: DieterDP <90392398+DieterDP-ng@users.noreply.github.com>
ndimiduk and DieterDP-ng authored Sep 26, 2024
1 parent 350bdec commit 4913c4f
Showing 3 changed files with 192 additions and 90 deletions.
@@ -81,39 +81,64 @@ public void init(Map<String, Object> params) {
}
}

private Map<Address, Long> getServerToNewestBackupTs(List<BackupInfo> backups)
/**
* Calculates the timestamp boundary up to which all backup roots have already included the WAL.
* I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
* backups.
*/
private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Cleaning WALs if they are older than the newest backups. "
"Cleaning WALs if they are older than the WAL cleanup time-boundary. "
+ "Checking WALs against {} backups: {}",
backups.size(),
backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
}
Map<Address, Long> serverAddressToNewestBackupMap = new HashMap<>();

Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
for (BackupInfo backupInfo : backups) {
for (TableName table : backupInfo.getTables()) {
tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
.entrySet()) {
serverAddressToNewestBackupMap.put(Address.fromString(entry.getKey()),
entry.getValue());

// This map tracks, for every backup root, the most recent created backup (= highest timestamp)
Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
for (BackupInfo backup : backups) {
BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("WAL cleanup time-boundary using info from: {}. ",
newestBackupPerRootDir.entrySet().stream()
.map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
.collect(Collectors.joining(", ")));
}

// This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
// inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
// roots have included the WAL in their backup.
Map<Address, Long> boundaries = new HashMap<>();
for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
// Iterate over all tables in the timestamp map, which contains all tables covered in the
// backup root, not just the tables included in that specific backup (which could be a subset)
for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
.entrySet()) {
Address address = Address.fromString(entry.getKey());
Long storedTs = boundaries.get(address);
if (storedTs == null || entry.getValue() < storedTs) {
boundaries.put(address, entry.getValue());
}
}
}
}

if (LOG.isDebugEnabled()) {
for (Map.Entry<Address, Long> entry : serverAddressToNewestBackupMap.entrySet()) {
LOG.debug("Server: {}, Newest Backup: {}", entry.getKey().getHostName(), entry.getValue());
for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
entry.getValue());
}
}

return serverAddressToNewestBackupMap;
return boundaries;
}

@Override
@@ -128,18 +153,19 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
return files;
}

Map<Address, Long> addressToNewestBackupMap;
Map<Address, Long> serverToPreservationBoundaryTs;
try {
try (BackupManager backupManager = new BackupManager(conn, getConf())) {
addressToNewestBackupMap = getServerToNewestBackupTs(backupManager.getBackupHistory(true));
serverToPreservationBoundaryTs =
serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
}
} catch (IOException ex) {
LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
ex.getMessage(), ex);
return Collections.emptyList();
}
for (FileStatus file : files) {
if (canDeleteFile(addressToNewestBackupMap, file.getPath())) {
if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
filteredFiles.add(file);
}
}
@@ -174,7 +200,7 @@ public boolean isStopped() {
return this.stopped;
}

protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupMap, Path path) {
protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
if (isHMasterWAL(path)) {
return true;
}
@@ -190,28 +216,27 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupM
Address walServerAddress = Address.fromString(hostname);
long walTimestamp = WAL.getTimestamp(path.getName());

if (!addressToNewestBackupMap.containsKey(walServerAddress)) {
if (!addressToBoundaryTs.containsKey(walServerAddress)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No backup found for server: {}. Deleting file: {}",
LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
walServerAddress.getHostName(), path);
}
return true;
}

Long lastBackupTs = addressToNewestBackupMap.get(walServerAddress);
if (lastBackupTs >= walTimestamp) {
Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
if (backupBoundary >= walTimestamp) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Backup found for server: {}. Backup from {} is newer than file, so deleting: {}",
walServerAddress.getHostName(), lastBackupTs, path);
"WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
return true;
}

if (LOG.isDebugEnabled()) {
LOG.debug(
"Backup found for server: {}. Backup from {} is older than the file, so keeping: {}",
walServerAddress.getHostName(), lastBackupTs, path);
LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
} catch (Exception ex) {
LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
@@ -414,7 +414,7 @@ protected String backupTables(BackupType type, List<TableName> tables, String pa
try {
conn = ConnectionFactory.createConnection(conf1);
badmin = new BackupAdminImpl(conn);
BackupRequest request = createBackupRequest(type, tables, path);
BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
backupId = badmin.backupTables(request);
} finally {
if (badmin != null) {
