diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index 1e09a6c4aba9..1c6bc4077d7f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -81,39 +81,64 @@ public void init(Map<String, Object> params) {
     }
   }
 
-  private Map<Address, Long> getServerToNewestBackupTs(List<BackupInfo> backups)
+  /**
+   * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
+   * That is, WALs with a lower (= older) or equal timestamp are no longer needed for future
+   * incremental backups.
+   */
+  private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
     throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug(
-        "Cleaning WALs if they are older than the newest backups. "
+        "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
          + "Checking WALs against {} backups: {}",
         backups.size(),
         backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
     }
-    Map<Address, Long> serverAddressToNewestBackupMap = new HashMap<>();
-
-    Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
-    for (BackupInfo backupInfo : backups) {
-      for (TableName table : backupInfo.getTables()) {
-        tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
-        if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
-          tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
-          for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
-            .entrySet()) {
-            serverAddressToNewestBackupMap.put(Address.fromString(entry.getKey()),
-              entry.getValue());
+
+    // This map tracks, for every backup root, the most recent backup (= highest timestamp)
+    Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
+    for (BackupInfo backup : backups) {
+      BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
+      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
+        newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("WAL cleanup time-boundary using info from: {}. ",
+        newestBackupPerRootDir.entrySet().stream()
+          .map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
+          .collect(Collectors.joining(", ")));
+    }
+
+    // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
+    // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
+    // roots have included the WAL in their backup.
+    Map<Address, Long> boundaries = new HashMap<>();
+    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
+      // Iterate over all tables in the timestamp map, which contains all tables covered in the
+      // backup root, not just the tables included in that specific backup (which could be a subset)
+      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
+        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
+          .entrySet()) {
+          Address address = Address.fromString(entry.getKey());
+          Long storedTs = boundaries.get(address);
+          if (storedTs == null || entry.getValue() < storedTs) {
+            boundaries.put(address, entry.getValue());
           }
         }
       }
     }
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<Address, Long> entry : serverAddressToNewestBackupMap.entrySet()) {
-        LOG.debug("Server: {}, Newest Backup: {}", entry.getKey().getHostName(), entry.getValue());
+      for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
+        LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
+          entry.getValue());
       }
     }
-    return serverAddressToNewestBackupMap;
+    return boundaries;
   }
   @Override
@@ -128,10 +153,11 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
       return files;
     }
-    Map<Address, Long> addressToNewestBackupMap;
+    Map<Address, Long> serverToPreservationBoundaryTs;
     try {
       try (BackupManager backupManager = new BackupManager(conn, getConf())) {
-        addressToNewestBackupMap = getServerToNewestBackupTs(backupManager.getBackupHistory(true));
+        serverToPreservationBoundaryTs =
+          serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
       }
     } catch (IOException ex) {
       LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
@@ -139,7 +165,7 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
       return Collections.emptyList();
     }
     for (FileStatus file : files) {
-      if (canDeleteFile(addressToNewestBackupMap, file.getPath())) {
+      if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
        filteredFiles.add(file);
       }
     }
@@ -174,7 +200,7 @@ public boolean isStopped() {
     return this.stopped;
   }
-  protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupMap, Path path) {
+  protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
     if (isHMasterWAL(path)) {
       return true;
     }
@@ -190,28 +216,27 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToNewestBackupM
       Address walServerAddress = Address.fromString(hostname);
       long walTimestamp = AbstractFSWALProvider.getTimestamp(path.getName());
-      if (!addressToNewestBackupMap.containsKey(walServerAddress)) {
+      if (!addressToBoundaryTs.containsKey(walServerAddress)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("No backup found for server: {}. Deleting file: {}",
+          LOG.debug("No WAL cleanup time-boundary found for server: {}. Ok to delete file: {}",
            walServerAddress.getHostName(), path);
         }
         return true;
       }
-      Long lastBackupTs = addressToNewestBackupMap.get(walServerAddress);
-      if (lastBackupTs >= walTimestamp) {
+      Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
+      if (backupBoundary >= walTimestamp) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(
-            "Backup found for server: {}. Backup from {} is newer than file, so deleting: {}",
-            walServerAddress.getHostName(), lastBackupTs, path);
+            "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
+            walServerAddress.getHostName(), backupBoundary, path);
         }
         return true;
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          "Backup found for server: {}. Backup from {} is older than the file, so keeping: {}",
-          walServerAddress.getHostName(), lastBackupTs, path);
+        LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
+          walServerAddress.getHostName(), backupBoundary, path);
       }
     } catch (Exception ex) {
       LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 86aa0f8bd923..b9a76347440e 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -414,7 +414,7 @@ protected String backupTables(BackupType type, List<TableName> tables, String pa
     try {
       conn = ConnectionFactory.createConnection(conf1);
       badmin = new BackupAdminImpl(conn);
-      BackupRequest request = createBackupRequest(type, tables, path);
+      BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
       backupId = badmin.backupTables(request);
     } finally {
       if (badmin != null) {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index e372c6ad1533..56bb25837810 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -17,13 +17,16 @@
  */
 package org.apache.hadoop.hbase.backup.master;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -32,7 +35,6 @@
 import org.apache.hadoop.hbase.backup.TestBackupBase;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -44,9 +46,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category(LargeTests.class)
 public class TestBackupLogCleaner extends TestBackupBase {
@@ -61,78 +60,152 @@ public class TestBackupLogCleaner extends TestBackupBase {
   @Test
   public void testBackupLogCleaner() throws Exception {
+    Path backupRoot1 = new Path(BACKUP_ROOT_DIR, "root1");
+    Path backupRoot2 = new Path(BACKUP_ROOT_DIR, "root2");
-    // #1 - create full backup for all tables
-    LOG.info("create full backup image for all tables");
-
-    List<TableName> tableSetFullList = Lists.newArrayList(table1, table2, table3, table4);
+    List<TableName> tableSetFull = List.of(table1, table2, table3, table4);
+    List<TableName> tableSet14 = List.of(table1, table4);
+    List<TableName> tableSet23 = List.of(table2, table3);
     try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
       // Verify that we have no backup sessions yet
       assertFalse(systemTable.hasBackupSessions());
-      List<FileStatus> walFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
       BackupLogCleaner cleaner = new BackupLogCleaner();
       cleaner.setConf(TEST_UTIL.getConfiguration());
-      Map<String, Object> params = new HashMap<>();
-      params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
-      cleaner.init(params);
-      cleaner.setConf(TEST_UTIL.getConfiguration());
-
-      Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFiles);
-      int size = Iterables.size(deletable);
-
-      // We can delete all files because we do not have yet recorded backup sessions
-      assertTrue(size == walFiles.size());
-
-      String backupIdFull = fullTableBackup(tableSetFullList);
-      assertTrue(checkSucceeded(backupIdFull));
-      // Check one more time
-      deletable = cleaner.getDeletableFiles(walFiles);
-      // We can delete wal files because they were saved into backup system table table
-      size = Iterables.size(deletable);
-      assertTrue(size == walFiles.size());
-
-      List<FileStatus> newWalFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
-      LOG.debug("WAL list after full backup");
-
-      // New list of wal files is greater than the previous one,
-      // because new wal per RS have been opened after full backup
-      assertTrue(walFiles.size() < newWalFiles.size());
-      Connection conn = ConnectionFactory.createConnection(conf1);
-      // #2 - insert some data to table
-      Table t1 = conn.getTable(table1);
-      Put p1;
-      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-        p1 = new Put(Bytes.toBytes("row-t1" + i));
-        p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t1.put(p1);
+      cleaner.init(Map.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));
+
+      // All WAL files can be deleted because we do not have backups
+      List<FileStatus> walFilesBeforeBackup = getListOfWALFiles(TEST_UTIL.getConfiguration());
+      Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFilesBeforeBackup);
+      assertEquals(walFilesBeforeBackup, deletable);
+
+      // Create a FULL backup B1 in backupRoot R1, containing all tables
+      String backupIdB1 = backupTables(BackupType.FULL, tableSetFull, backupRoot1.toString());
+      assertTrue(checkSucceeded(backupIdB1));
+
+      // As part of a backup, WALs are rolled, so we expect a new WAL file
+      Set<FileStatus> walFilesAfterB1 =
+        mergeAsSet(walFilesBeforeBackup, getListOfWALFiles(TEST_UTIL.getConfiguration()));
+      assertTrue(walFilesBeforeBackup.size() < walFilesAfterB1.size());
+
+      // Currently, we only have backup B1, so we can delete any WAL preceding B1
+      deletable = cleaner.getDeletableFiles(walFilesAfterB1);
+      assertEquals(toSet(walFilesBeforeBackup), toSet(deletable));
+
+      // Insert some data
+      Connection conn = TEST_UTIL.getConnection();
+      try (Table t1 = conn.getTable(table1)) {
+        Put p1;
+        for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+          p1 = new Put(Bytes.toBytes("row-t1" + i));
+          p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+          t1.put(p1);
+        }
       }
-      t1.close();
-
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
+      try (Table t2 = conn.getTable(table2)) {
+        Put p2;
+        for (int i = 0; i < 5; i++) {
+          p2 = new Put(Bytes.toBytes("row-t2" + i));
+          p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+          t2.put(p2);
+        }
       }
-      t2.close();
-
-      // #3 - incremental backup for multiple tables
-
-      List<TableName> tableSetIncList = Lists.newArrayList(table1, table2, table3);
-      String backupIdIncMultiple =
-        backupTables(BackupType.INCREMENTAL, tableSetIncList, BACKUP_ROOT_DIR);
-      assertTrue(checkSucceeded(backupIdIncMultiple));
-      deletable = cleaner.getDeletableFiles(newWalFiles);
+      // Create an INCREMENTAL backup B2 in backupRoot R1, requesting tables 1 & 4.
+      // Note that incremental backups always include all tables already included in the backup root,
+      // i.e. the backup will contain all tables (1, 2, 3, 4), ignoring what we specify here.
+      LOG.debug("Creating B2");
+      String backupIdB2 = backupTables(BackupType.INCREMENTAL, tableSet14, backupRoot1.toString());
+      assertTrue(checkSucceeded(backupIdB2));
+
+      // As part of a backup, WALs are rolled, so we expect a new WAL file
+      Set<FileStatus> walFilesAfterB2 =
+        mergeAsSet(walFilesAfterB1, getListOfWALFiles(TEST_UTIL.getConfiguration()));
+      assertTrue(walFilesAfterB1.size() < walFilesAfterB2.size());
+
+      // At this point, we have backups in root R1: B1 and B2.
+      // We only consider the most recent backup (B2) to determine which WALs can be deleted:
+      // all WALs preceding B2
+      deletable = cleaner.getDeletableFiles(walFilesAfterB2);
+      assertEquals(toSet(walFilesAfterB1), toSet(deletable));
+
+      // Create a FULL backup B3 in backupRoot R2, containing tables 1 & 4
+      LOG.debug("Creating B3");
+      String backupIdB3 = backupTables(BackupType.FULL, tableSet14, backupRoot2.toString());
+      assertTrue(checkSucceeded(backupIdB3));
+
+      // As part of a backup, WALs are rolled, so we expect a new WAL file
+      Set<FileStatus> walFilesAfterB3 =
+        mergeAsSet(walFilesAfterB2, getListOfWALFiles(TEST_UTIL.getConfiguration()));
+      assertTrue(walFilesAfterB2.size() < walFilesAfterB3.size());
+
+      // At this point, we have backups in:
+      // root R1: B1 (timestamp=0, all tables), B2 (TS=1, all tables)
+      // root R2: B3 (TS=2, [T1, T4])
+      //
+      // To determine the WAL-deletion boundary, we only consider the most recent backup per root,
+      // so [B2, B3]. From these, we take the least recent as the WAL-deletion boundary: B2. Since
+      // it contains all tables, only WALs preceding B2 are deletable.
+      deletable = cleaner.getDeletableFiles(walFilesAfterB3);
+      assertEquals(toSet(walFilesAfterB1), toSet(deletable));
+
+      // Create a FULL backup B4 in backupRoot R1, with a subset of tables
+      LOG.debug("Creating B4");
+      String backupIdB4 = backupTables(BackupType.FULL, tableSet14, backupRoot1.toString());
+      assertTrue(checkSucceeded(backupIdB4));
+
+      // As part of a backup, WALs are rolled, so we expect a new WAL file
+      Set<FileStatus> walFilesAfterB4 =
+        mergeAsSet(walFilesAfterB3, getListOfWALFiles(TEST_UTIL.getConfiguration()));
+      assertTrue(walFilesAfterB3.size() < walFilesAfterB4.size());
+
+      // At this point, we have backups in:
+      // root R1: B1 (timestamp=0, all tables), B2 (TS=1, all tables), B4 (TS=3, [T1, T4])
+      // root R2: B3 (TS=2, [T1, T4])
+      //
+      // To determine the WAL-deletion boundary, we only consider the most recent backup per root,
+      // so [B4, B3]. They contain the following timestamp boundaries per table:
+      // B4: { T1: 3, T2: 1, T3: 1, T4: 3 }
+      // B3: { T1: 2, T4: 2 }
+      // Taking the minimum timestamp (= 1), this means all WALs preceding B2 can be deleted.
+      deletable = cleaner.getDeletableFiles(walFilesAfterB4);
+      assertEquals(toSet(walFilesAfterB1), toSet(deletable));
+
+      // Create a FULL backup B5 in backupRoot R1, for tables 2 & 3
+      String backupIdB5 = backupTables(BackupType.FULL, tableSet23, backupRoot1.toString());
+      assertTrue(checkSucceeded(backupIdB5));
+
+      // As part of a backup, WALs are rolled, so we expect a new WAL file
+      Set<FileStatus> walFilesAfterB5 =
+        mergeAsSet(walFilesAfterB4, getListOfWALFiles(TEST_UTIL.getConfiguration()));
+      assertTrue(walFilesAfterB4.size() < walFilesAfterB5.size());
+
+      // At this point, we have backups in:
+      // root R1: ..., B2 (TS=1, all tables), B4 (TS=3, [T1, T4]), B5 (TS=4, [T2, T3])
+      // root R2: B3 (TS=2, [T1, T4])
+      //
+      // To determine the WAL-deletion boundary, we only consider the most recent backup per root,
+      // so [B5, B3]. They contain the following timestamp boundaries per table:
+      // B5: { T1: 3, T2: 4, T3: 4, T4: 3 }
+      // B3: { T1: 2, T4: 2 }
+      // Taking the minimum timestamp (= 2), this means all WALs preceding B3 can be deleted.
+      deletable = cleaner.getDeletableFiles(walFilesAfterB5);
+      assertEquals(toSet(walFilesAfterB2), toSet(deletable));
+    }
+  }
-      assertTrue(Iterables.size(deletable) == newWalFiles.size());
+  private Set<FileStatus> mergeAsSet(Collection<FileStatus> toCopy, Collection<FileStatus> toAdd) {
+    Set<FileStatus> result = new LinkedHashSet<>(toCopy);
+    result.addAll(toAdd);
+    return result;
+  }
-      conn.close();
-    }
+  private Set<FileStatus> toSet(Iterable<FileStatus> iterable) {
+    Set<FileStatus> result = new LinkedHashSet<>();
+    iterable.forEach(result::add);
+    return result;
   }
   @Test
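For readers skimming the patch, the core of the new cleanup rule is: keep only the newest backup per backup root, then take, per RegionServer, the lowest timestamp across those backups; WALs at or below that boundary are deletable. Below is a minimal, self-contained sketch of that arithmetic. SimpleBackup, serverToBoundaryTs, WalBoundarySketch and the server name are hypothetical stand-ins for HBase's BackupInfo and BackupLogCleaner#serverToPreservationBoundaryTs, and the patch's per-table timestamp map is collapsed into a per-server map for brevity.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not part of the patch. Types are simplified stand-ins.
public class WalBoundarySketch {

  // Stand-in for BackupInfo: backup root, start timestamp, per-server WAL timestamps.
  record SimpleBackup(String rootDir, long startTs, Map<String, Long> serverTs) {}

  // For each server, the timestamp up to which every backup root has already covered the WAL.
  static Map<String, Long> serverToBoundaryTs(List<SimpleBackup> backups) {
    // Step 1: keep only the most recent backup per backup root.
    Map<String, SimpleBackup> newestPerRoot = new HashMap<>();
    for (SimpleBackup backup : backups) {
      SimpleBackup existing = newestPerRoot.get(backup.rootDir());
      if (existing == null || existing.startTs() < backup.startTs()) {
        newestPerRoot.put(backup.rootDir(), backup);
      }
    }
    // Step 2: across those backups, take the lowest timestamp recorded for each server.
    Map<String, Long> boundaries = new HashMap<>();
    for (SimpleBackup backup : newestPerRoot.values()) {
      backup.serverTs().forEach((server, ts) -> boundaries.merge(server, ts, Math::min));
    }
    return boundaries;
  }

  public static void main(String[] args) {
    // Mirrors the shape of the test scenario: root1's newest backup covers the server up to
    // ts=3, but root2's newest backup only up to ts=2, so only WALs with ts <= 2 are deletable.
    List<SimpleBackup> backups = List.of(
      new SimpleBackup("root1", 1, Map.of("rs1:16020", 1L)),
      new SimpleBackup("root1", 3, Map.of("rs1:16020", 3L)),
      new SimpleBackup("root2", 2, Map.of("rs1:16020", 2L)));
    System.out.println(serverToBoundaryTs(backups)); // prints {rs1:16020=2}
  }
}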