Process review comments, bugfix
DieterDP-ng committed Sep 24, 2024
1 parent 7d76842 commit a31a19c
Showing 3 changed files with 139 additions and 89 deletions.
BackupLogCleaner.java
@@ -90,7 +90,7 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backu
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Cleaning WALs if they are older than the newest backups (for all roots). "
"Cleaning WALs if they are older than the WAL cleanup time-boundary. "
+ "Checking WALs against {} backups: {}",
backups.size(),
backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
@@ -105,12 +105,21 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backu
}
}

// This map tracks, for every address, the least recent (= oldest / lowest timestamp) inclusion
// in any backup. In other words, it is the timestamp boundary up to which all backups roots
// have included the WAL in their backup.
if (LOG.isDebugEnabled()) {
LOG.debug("WAL cleanup time-boundary using info from: {}. ",
newestBackupPerRootDir.entrySet().stream()
.map(e -> "Backup root " + e.getKey() + ": " + e.getValue().getBackupId()).sorted()
.collect(Collectors.joining(", ")));
}

// This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
// inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
// roots have included the WAL in their backup.
Map<Address, Long> boundaries = new HashMap<>();
for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
for (TableName table : backupInfo.getTables()) {
// Iterate over all tables in the timestamp map, which contains all tables covered in the
// backup root, not just the tables included in that specific backup (which could be a subset)
for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
.entrySet()) {
Address address = Address.fromString(entry.getKey());
@@ -209,7 +218,7 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, P

if (!addressToBoundaryTs.containsKey(walServerAddress)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No backup found for server: {}. Deleting file: {}",
LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
walServerAddress.getHostName(), path);
}
return true;
@@ -219,15 +228,14 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, P
if (backupBoundary >= walTimestamp) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Backup found for server: {}. All backups from {} are newer than file, so deleting: {}",
"WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
return true;
}

if (LOG.isDebugEnabled()) {
LOG.debug(
"Backup found for server: {}. Backup from {} is older than the file, so keeping: {}",
LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
} catch (Exception ex) {
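
Note: the preservation-boundary logic described by the updated comments and log messages above can be sketched in a few lines. The following is a simplified, self-contained illustration, not the committed implementation: it uses plain String server names instead of the HBase Address type, and it assumes the per-server timestamps of each root's newest backup have already been flattened across tables.

import java.util.HashMap;
import java.util.Map;

public class PreservationBoundarySketch {

  /**
   * For each backup root, take the per-server WAL timestamps covered by that
   * root's newest backup, and keep the minimum per server: a WAL may only be
   * deleted once every backup root has included it in a backup.
   */
  static Map<String, Long> serverToBoundaryTs(Map<String, Map<String, Long>> newestBackupPerRoot) {
    Map<String, Long> boundaries = new HashMap<>();
    for (Map<String, Long> serverTs : newestBackupPerRoot.values()) {
      for (Map.Entry<String, Long> e : serverTs.entrySet()) {
        boundaries.merge(e.getKey(), e.getValue(), Math::min);
      }
    }
    return boundaries;
  }

  /**
   * Mirrors the decision in canDeleteFile: a WAL is deletable when no boundary
   * exists for its server, or when it is not younger than the boundary.
   */
  static boolean canDelete(Map<String, Long> boundaries, String server, long walTimestamp) {
    Long boundary = boundaries.get(server);
    return boundary == null || boundary >= walTimestamp;
  }
}
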
TestBackupBase.java
@@ -414,7 +414,7 @@ protected String backupTables(BackupType type, List<TableName> tables, String pa
try {
conn = ConnectionFactory.createConnection(conf1);
badmin = new BackupAdminImpl(conn);
BackupRequest request = createBackupRequest(type, tables, path);
BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
backupId = badmin.backupTables(request);
} finally {
if (badmin != null) {
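
Note: the switch to new ArrayList<>(tables) pairs with the rewritten test (below) passing immutable List.of(...) lists: a defensive mutable copy protects any downstream code that sorts or otherwise modifies the request's table list. A minimal demonstration of the failure mode (a hypothetical standalone example, not the HBase code path):

import java.util.ArrayList;
import java.util.List;

public class ImmutableListDemo {
  public static void main(String[] args) {
    List<String> tables = List.of("table2", "table1");
    // tables.sort(null); // would throw UnsupportedOperationException
    List<String> copy = new ArrayList<>(tables); // defensive mutable copy
    copy.sort(null); // fine: sorts the copy in natural order
    System.out.println(copy); // [table1, table2]
  }
}
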
TestBackupLogCleaner.java
@@ -23,8 +23,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,7 +35,6 @@
import org.apache.hadoop.hbase.backup.TestBackupBase;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
@@ -48,9 +46,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestBackupLogCleaner extends TestBackupBase {

@@ -68,104 +63,151 @@ public void testBackupLogCleaner() throws Exception {
Path backupRoot1 = new Path(BACKUP_ROOT_DIR, "root1");
Path backupRoot2 = new Path(BACKUP_ROOT_DIR, "root2");

// Create full backup for all tables
LOG.info("create full backup image for all tables");

List<TableName> tableSetFullList = Lists.newArrayList(table1, table2, table3, table4);
List<TableName> tableSetFull = List.of(table1, table2, table3, table4);
List<TableName> tableSet14 = List.of(table1, table4);
List<TableName> tableSet23 = List.of(table2, table3);

try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
// Verify that we have no backup sessions yet
assertFalse(systemTable.hasBackupSessions());

List<FileStatus> walFilesBeforeBackup = getListOfWALFiles(TEST_UTIL.getConfiguration());
BackupLogCleaner cleaner = new BackupLogCleaner();
cleaner.setConf(TEST_UTIL.getConfiguration());
Map<String, Object> params = new HashMap<>();
params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
cleaner.init(params);
cleaner.setConf(TEST_UTIL.getConfiguration());
cleaner.init(Map.of(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster()));

// We can delete all files because we do not have yet recorded backup sessions
// All WAL files can be deleted because we do not have backups
List<FileStatus> walFilesBeforeBackup = getListOfWALFiles(TEST_UTIL.getConfiguration());
Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFilesBeforeBackup);
int size = Iterables.size(deletable);
assertEquals(walFilesBeforeBackup.size(), size);
assertEquals(walFilesBeforeBackup, deletable);

// Create a FULL backup (backupRoot 1)
String backupIdFull = backupTables(BackupType.FULL, tableSetFullList, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdFull));
// Create a FULL backup B1 in backupRoot R1, containing all tables
String backupIdB1 = backupTables(BackupType.FULL, tableSetFull, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdB1));

// New list of WAL files is greater than the previous one,
// because new WAL per RS have been opened after full backup
Set<FileStatus> walFilesAfterFullBackup =
// As part of a backup, WALs are rolled, so we expect a new WAL file
Set<FileStatus> walFilesAfterB1 =
mergeAsSet(walFilesBeforeBackup, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesBeforeBackup.size() < walFilesAfterFullBackup.size());
assertTrue(walFilesBeforeBackup.size() < walFilesAfterB1.size());

// We can only delete the WALs preceding the FULL backup
deletable = cleaner.getDeletableFiles(walFilesAfterFullBackup);
size = Iterables.size(deletable);
assertEquals(walFilesBeforeBackup.size(), size);
// Currently, we only have backup B1, so we can delete any WAL preceding B1
deletable = cleaner.getDeletableFiles(walFilesAfterB1);
assertEquals(toSet(walFilesBeforeBackup), toSet(deletable));

// Insert some data
Connection conn = ConnectionFactory.createConnection(conf1);
Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
t1.put(p1);
Connection conn = TEST_UTIL.getConnection();
try (Table t1 = conn.getTable(table1)) {
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
t1.put(p1);
}
}
t1.close();

Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
t2.put(p2);

try (Table t2 = conn.getTable(table2)) {
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
t2.put(p2);
}
}
t2.close();

// Create an INCREMENTAL backup (backupRoot 1)
List<TableName> tableSetIncList = Lists.newArrayList(table1, table2, table3);
String backupIdIncMultiple =
backupTables(BackupType.INCREMENTAL, tableSetIncList, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdIncMultiple));

// There should be more WALs due to the rolling of Region Servers
Set<FileStatus> walFilesAfterIncBackup =
mergeAsSet(walFilesAfterFullBackup, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterFullBackup.size() < walFilesAfterIncBackup.size());

// We can only delete the WALs preceding the INCREMENTAL backup
deletable = cleaner.getDeletableFiles(walFilesAfterIncBackup);
size = Iterables.size(deletable);
assertEquals(walFilesAfterFullBackup.size(), size);

// Create a FULL backup (backupRoot 2)
String backupIdFull2 = backupTables(BackupType.FULL, tableSetIncList, backupRoot2.toString());
assertTrue(checkSucceeded(backupIdFull2));

// There should be more WALs due to the rolling of Region Servers
Set<FileStatus> walFilesAfterFullBackup2 =
mergeAsSet(walFilesAfterFullBackup, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterIncBackup.size() < walFilesAfterFullBackup2.size());

// We created a backup in a different root, so the WAL dependencies of the first root did not
// change. I.e. the same files should be deletable as after the incremental backup.
deletable = cleaner.getDeletableFiles(walFilesAfterFullBackup2);
size = Iterables.size(deletable);
assertEquals(walFilesAfterFullBackup.size(), size);

conn.close();

// Create an INCREMENTAL backup B2 in backupRoot R1, requesting tables 1 & 4.
// Note that incremental backups always include all tables already included in the backup root,
// i.e. the backup will contain all tables (1, 2, 3, 4), ignoring what we specify here.
LOG.debug("Creating B2");
String backupIdB2 = backupTables(BackupType.INCREMENTAL, tableSet14, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdB2));

// As part of a backup, WALs are rolled, so we expect a new WAL file
Set<FileStatus> walFilesAfterB2 =
mergeAsSet(walFilesAfterB1, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterB1.size() < walFilesAfterB2.size());

// At this point, we have backups in root R1: B1 and B2.
// We only consider the most recent backup (B2) to determine which WALs can be deleted:
// all WALs preceding B2
deletable = cleaner.getDeletableFiles(walFilesAfterB2);
assertEquals(toSet(walFilesAfterB1), toSet(deletable));

// Create a FULL backup B3 in backupRoot R2, containing tables 1 & 4
LOG.debug("Creating B3");
String backupIdB3 = backupTables(BackupType.FULL, tableSet14, backupRoot2.toString());
assertTrue(checkSucceeded(backupIdB3));

// As part of a backup, WALs are rolled, so we expect a new WAL file
Set<FileStatus> walFilesAfterB3 =
mergeAsSet(walFilesAfterB2, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterB2.size() < walFilesAfterB3.size());

// At this point, we have backups in:
// root R1: B1 (timestamp=0, all tables), B2 (TS=1, all tables)
// root R2: B3 (TS=2, [T1, T4])
//
// To determine the WAL-deletion boundary, we only consider the most recent backup per root,
// so [B2, B3]. From these, we take the least recent as the WAL-deletion boundary: B2. Since it
// contains all tables, it acts as the deletion boundary, i.e. only WALs preceding B2 are deletable.
deletable = cleaner.getDeletableFiles(walFilesAfterB3);
assertEquals(toSet(walFilesAfterB1), toSet(deletable));

// Create a FULL backup B4 in backupRoot R1, with a subset of tables
LOG.debug("Creating B4");
String backupIdB4 = backupTables(BackupType.FULL, tableSet14, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdB4));

// As part of a backup, WALs are rolled, so we expect a new WAL file
Set<FileStatus> walFilesAfterB4 =
mergeAsSet(walFilesAfterB3, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterB3.size() < walFilesAfterB4.size());

// At this point, we have backups in:
// root R1: B1 (timestamp=0, all tables), B2 (TS=1, all tables), B4 (TS=3, [T1, T4])
// root R2: B3 (TS=2, [T1, T4])
//
// To determine the WAL-deletion boundary, we only consider the most recent backup per root,
// so [B4, B3]. They contain the following timestamp boundaries per table:
// B4: { T1: 3, T2: 1, T3: 1, T4: 3 }
// B3: { T1: 2, T4: 2 }
// Taking the minimum timestamp (= 1), this means all WALs preceding B2 can be deleted.
deletable = cleaner.getDeletableFiles(walFilesAfterB4);
assertEquals(toSet(walFilesAfterB1), toSet(deletable));

// Create a FULL backup B5 in backupRoot R1, for tables 2 & 3
String backupIdB5 = backupTables(BackupType.FULL, tableSet23, backupRoot1.toString());
assertTrue(checkSucceeded(backupIdB5));

// As part of a backup, WALs are rolled, so we expect a new WAL file
Set<FileStatus> walFilesAfterB5 =
mergeAsSet(walFilesAfterB4, getListOfWALFiles(TEST_UTIL.getConfiguration()));
assertTrue(walFilesAfterB4.size() < walFilesAfterB5.size());

// At this point, we have backups in:
// root R1: ..., B2 (TS=1, all tables), B4 (TS=3, [T1, T4]), B5 (TS=4, [T2, T3])
// root R2: B3 (TS=2, [T1, T4])
//
// To determine the WAL-deletion boundary, we only consider the most recent backup per root,
// so [B5, B3]. They contain the following timestamp boundaries per table:
// B5: { T1: 3, T2: 4, T3: 4, T4: 3 }
// B3: { T1: 2, T4: 2 }
// Taking the minimum timestamp (= 2), this means all WALs preceding B3 can be deleted.
deletable = cleaner.getDeletableFiles(walFilesAfterB5);
assertEquals(toSet(walFilesAfterB2), toSet(deletable));
}
}

private Set<FileStatus> mergeAsSet(Collection<FileStatus> toCopy, Collection<FileStatus> toAdd) {
Set<FileStatus> result = new HashSet<>(toCopy);
Set<FileStatus> result = new LinkedHashSet<>(toCopy);
result.addAll(toAdd);
return result;
}

private <T> Set<T> toSet(Iterable<T> iterable) {
Set<T> result = new LinkedHashSet<>();
iterable.forEach(result::add);
return result;
}

@Test
public void testCleansUpHMasterWal() {
Path path = new Path("/hbase/MasterData/WALs/hmaster,60000,1718808578163");
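
Note: the boundary arithmetic in the test comments above can be double-checked with a few lines. This is a minimal sketch under the same assumptions the comments make (per-table timestamps taken from the newest backup in each root; with a single region server, the per-server boundary collapses to one global minimum):

import java.util.Map;

public class BoundaryArithmeticExample {
  public static void main(String[] args) {
    // Newest backup per root after B5, per the test comments:
    // R1 -> B5: { T1: 3, T2: 4, T3: 4, T4: 3 }
    // R2 -> B3: { T1: 2, T4: 2 }
    Map<String, Map<String, Long>> newestPerRoot = Map.of(
      "root1", Map.of("T1", 3L, "T2", 4L, "T3", 4L, "T4", 3L),
      "root2", Map.of("T1", 2L, "T4", 2L));

    // The deletion boundary is the minimum timestamp across all roots and tables.
    long boundary = newestPerRoot.values().stream()
      .flatMap(m -> m.values().stream())
      .mapToLong(Long::longValue)
      .min()
      .getAsLong();

    System.out.println(boundary); // 2: WALs preceding B3 are deletable
  }
}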