diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index 5b49496d626a..2b26aba03fe6 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.backup.impl; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY; @@ -190,6 +191,15 @@ private void handleContinuousBackup(Admin admin) throws IOException { // set overall backup status: complete. Here we make sure to complete the backup. // After this checkpoint, even if entering cancel process, will let the backup finished backupInfo.setState(BackupState.COMPLETE); + + if (!conf.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false)) { + System.out.println("WARNING: Bulkload replication is not enabled. " + + "Since continuous backup is using HBase replication, bulk loaded files won't be backed up as part of continuous backup. " + + "To ensure bulk-loaded files are backed up, enable bulkload replication " + + "(hbase.replication.bulkload.enabled=true) and configure a unique cluster ID using " + + "hbase.replication.cluster.id. This cluster ID is required by the replication framework " + + "to uniquely identify clusters, even if continuous backup itself does not directly rely on it."); + } } private void handleNonContinuousBackup(Admin admin) throws IOException { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java index 9d1d818c207e..225d32172766 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java @@ -26,18 +26,20 @@ import org.slf4j.LoggerFactory; /** - * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) files within - * the specified backup root directory. + * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and + * bulk-loaded files within the specified backup root directory. 
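 * <p>
 * A minimal sketch of the resulting layout, assuming only the directory names defined by the
 * constants below ({@code WALS_DIR} and {@code BULKLOAD_FILES_DIR}):
 * <pre>{@code
 * <backupRootDir>/WALs/              // continuous WAL backup files
 * <backupRootDir>/bulk-load-files/   // backed-up bulk-loaded HFiles
 * }</pre>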
*/ @InterfaceAudience.Private public class BackupFileSystemManager { private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class); public static final String WALS_DIR = "WALs"; + public static final String BULKLOAD_FILES_DIR = "bulk-load-files"; private final String peerId; private final FileSystem backupFs; private final Path backupRootDir; private final Path walsDir; + private final Path bulkLoadFilesDir; public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr) throws IOException { @@ -45,6 +47,7 @@ public BackupFileSystemManager(String peerId, Configuration conf, String backupR this.backupRootDir = new Path(backupRootDirStr); this.backupFs = FileSystem.get(backupRootDir.toUri(), conf); this.walsDir = createDirectory(WALS_DIR); + this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR); } private Path createDirectory(String dirName) throws IOException { @@ -58,6 +61,10 @@ public Path getWalsDir() { return walsDir; } + public Path getBulkLoadFilesDir() { + return bulkLoadFilesDir; + } + public FileSystem getBackupFs() { return backupFs; } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java new file mode 100644 index 000000000000..6e1271313bcd --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication. + *

+ * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL + * entries. It processes bulk load descriptors and their associated store descriptors to generate + * the paths for each bulk-loaded file. + *

+ * The class is designed for scenarios where replicable bulk load operations must be parsed and + * their file paths determined programmatically. + *
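 * <p>
 * A minimal usage sketch based on {@code processBulkLoadFiles} below (the source of the WAL
 * entries is illustrative):
 * <pre>{@code
 * List<WAL.Entry> walEntries = replicationBatch();   // hypothetical supplier of WAL entries
 * List<Path> relativePaths = BulkLoadProcessor.processBulkLoadFiles(walEntries);
 * // each returned path is relative: namespace/table/encodedRegion/columnFamily/storeFile
 * }</pre>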

+ */ +@InterfaceAudience.Private +public final class BulkLoadProcessor { + private BulkLoadProcessor() { + } + + public static List processBulkLoadFiles(List walEntries) throws IOException { + List bulkLoadFilePaths = new ArrayList<>(); + + for (WAL.Entry entry : walEntries) { + WALEdit edit = entry.getEdit(); + for (Cell cell : edit.getCells()) { + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + TableName tableName = entry.getKey().getTableName(); + String namespace = tableName.getNamespaceAsString(); + String table = tableName.getQualifierAsString(); + bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table)); + } + } + } + return bulkLoadFilePaths; + } + + private static List processBulkLoadDescriptor(Cell cell, String namespace, String table) + throws IOException { + List bulkLoadFilePaths = new ArrayList<>(); + WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + + if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) { + return bulkLoadFilePaths; // Skip if not replicable + } + + String regionName = bld.getEncodedRegionName().toStringUtf8(); + for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) { + bulkLoadFilePaths + .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName)); + } + + return bulkLoadFilePaths; + } + + private static List processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor, + String namespace, String table, String regionName) { + List paths = new ArrayList<>(); + String columnFamily = storeDescriptor.getFamilyName().toStringUtf8(); + + for (String storeFile : storeDescriptor.getStoreFileList()) { + paths.add(new Path(namespace, + new Path(table, new Path(regionName, new Path(columnFamily, storeFile))))); + } + + return paths; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java new file mode 100644 index 000000000000..91a46c77e319 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class BulkLoadUploadException extends IOException { + public BulkLoadUploadException(String message) { + super(message); + } + + public BulkLoadUploadException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 2442e0789a8d..69c445c484d8 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.backup.replication; +import com.google.errorprone.annotations.RestrictedApi; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; import java.text.SimpleDateFormat; @@ -32,9 +34,12 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -54,8 +60,8 @@ /** * ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup * storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL - * files do not exceed the configured size. The class includes mechanisms for handling the WAL files - * and ensuring that the replication process is safe. + * files do not exceed the configured size. The class includes mechanisms for handling the WAL + * files, performing bulk load backups, and ensuring that the replication process is safe. 
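 * <p>
 * Bulk load backup requires bulkload replication to be enabled on the source cluster; a minimal
 * configuration sketch (the cluster id value is illustrative):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * // hbase.replication.bulkload.enabled
 * conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
 * // hbase.replication.cluster.id, used by the replication framework to identify clusters
 * conf.set(HConstants.REPLICATION_CLUSTER_ID, "clusterId1");
 * }</pre>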
*/ @InterfaceAudience.Private public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint { @@ -302,6 +308,7 @@ private void backupWalEntries(long day, List walEntries) throws IOExc for (WAL.Entry entry : walEntries) { walWriter.append(entry); } + walWriter.sync(true); } catch (UncheckedIOException e) { String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day; @@ -309,6 +316,17 @@ private void backupWalEntries(long day, List walEntries) throws IOExc e.getMessage(), e); throw new IOException(errorMsg, e); } + + List bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } + + uploadBulkLoadFiles(day, bulkLoadFiles); } private FSHLogProvider.Writer createWalWriter(long dayInMillis) { @@ -372,15 +390,159 @@ private void close() { } } + @RestrictedApi( + explanation = "Package-private for test visibility only. Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)") + void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) + throws BulkLoadUploadException { + if (bulkLoadFiles.isEmpty()) { + LOG.debug("{} No bulk load files to upload for {}", Utils.logPeerId(peerId), dayInMillis); + return; + } + + LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } + String dayDirectoryName = formatToDateString(dayInMillis); + Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName); + try { + backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir); + } catch (IOException e) { + throw new BulkLoadUploadException( + String.format("%s Failed to create bulkload directory in backupFS: %s", + Utils.logPeerId(peerId), bulkloadDir), + e); + } + + for (Path file : bulkLoadFiles) { + Path sourcePath; + try { + sourcePath = getBulkLoadFileStagingPath(file); + } catch (FileNotFoundException fnfe) { + throw new BulkLoadUploadException( + String.format("%s Bulk load file not found: %s", Utils.logPeerId(peerId), file), fnfe); + } catch (IOException ioe) { + throw new BulkLoadUploadException( + String.format("%s Failed to resolve source path for: %s", Utils.logPeerId(peerId), file), + ioe); + } + + Path destPath = new Path(bulkloadDir, file); + + try { + LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, + destPath); + + copyWithCleanup(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, + backupFileSystemManager.getBackupFs(), destPath, conf); + + LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file, + destPath); + } catch (IOException e) { + throw new BulkLoadUploadException( + String.format("%s Failed to copy bulk load file %s to %s on day %s", + Utils.logPeerId(peerId), file, destPath, formatToDateString(dayInMillis)), + e); + } + } + + LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId)); + } + + /** + * Copy a file with cleanup logic in case of failure. 
Always overwrite destination to avoid + * leaving corrupt partial files. + */ + @RestrictedApi( + explanation = "Package-private for test visibility only. Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)") + static void copyWithCleanup(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, + Configuration conf) throws IOException { + try { + if (dstFS.exists(dst)) { + FileStatus srcStatus = srcFS.getFileStatus(src); + FileStatus dstStatus = dstFS.getFileStatus(dst); + + if (srcStatus.getLen() == dstStatus.getLen()) { + LOG.info("Destination file {} already exists with same length ({}). Skipping copy.", dst, + dstStatus.getLen()); + return; // Skip upload + } else { + LOG.warn( + "Destination file {} exists but length differs (src={}, dst={}). " + "Overwriting now.", + dst, srcStatus.getLen(), dstStatus.getLen()); + } + } + + // Always overwrite in case previous copy left partial data + FileUtil.copy(srcFS, src, dstFS, dst, false, true, conf); + } catch (IOException e) { + try { + if (dstFS.exists(dst)) { + dstFS.delete(dst, true); + LOG.warn("Deleted partial/corrupt destination file {} after copy failure", dst); + } + } catch (IOException cleanupEx) { + LOG.warn("Failed to cleanup destination file {} after copy failure", dst, cleanupEx); + } + throw e; + } + } + /** * Convert dayInMillis to "yyyy-MM-dd" format */ - private String formatToDateString(long dayInMillis) { + @RestrictedApi( + explanation = "Package-private for test visibility only. Do not use outside tests.", + link = "", + allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)") + String formatToDateString(long dayInMillis) { SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); return dateFormat.format(new Date(dayInMillis)); } + private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException { + FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); + Path rootDir = CommonFSUtils.getRootDir(conf); + Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR); + Path baseNamespaceDir = new Path(rootDir, baseNSDir); + Path hFileArchiveDir = + new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir)); + + LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId), + relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir); + + Path result = + findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace); + LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result); + return result; + } + + private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir, + Path hFileArchiveDir, Path filePath) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath), + new Path(hFileArchiveDir, filePath)); + } + + for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath), + new Path(hFileArchiveDir, filePath) }) { + if (rootFs.exists(candidate)) { + return candidate; + } + } + + throw new FileNotFoundException("Bulk load file not found at either: " + + new Path(baseNamespaceDir, filePath) + " or " + new Path(hFileArchiveDir, filePath)); + } + private void shutdownFlushExecutor() { if (flushExecutor != null) { LOG.info("{} Initiating WAL flush executor shutdown.", 
Utils.logPeerId(peerId)); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java new file mode 100644 index 000000000000..9837f9e926d2 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Unit tests for {@link BulkLoadProcessor}. + *

+ * These tests validate the extraction of bulk-loaded file paths from WAL entries under different + * scenarios, including: + *

+ * <ul>
+ * <li>Valid replicable bulk load entries</li>
+ * <li>Non-replicable bulk load entries</li>
+ * <li>Entries with no bulk load qualifier</li>
+ * <li>Entries containing multiple column families</li>
+ * </ul>
+ */ +@Category({ SmallTests.class }) +public class TestBulkLoadProcessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBulkLoadProcessor.class); + + /** + * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} with the given + * parameters. + * @param tableName The table name + * @param regionName The encoded region name + * @param replicate Whether the bulk load is marked for replication + * @param family Column family name + * @param storeFiles One or more store file names to include + * @return A WAL.Entry representing the bulk load event + */ + private WAL.Entry createBulkLoadWalEntry(TableName tableName, String regionName, + boolean replicate, String family, String... storeFiles) { + + // Build StoreDescriptor + WALProtos.StoreDescriptor.Builder storeDescBuilder = + WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family)) + .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles)); + + // Build BulkLoadDescriptor + WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = WALProtos.BulkLoadDescriptor.newBuilder() + .setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) // Random + .addStores(storeDescBuilder); + + byte[] value = bulkDescBuilder.build().toByteArray(); + + // Build Cell with BULK_LOAD qualifier + Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put) + .setRow(new byte[] { 1 }).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD) + .setValue(value).build(); + + WALEdit edit = new WALEdit(); + edit.add(cell); + + WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region + tableName, 0L, 0L, null); + + return new WAL.Entry(key, edit); + } + + /** + * Verifies that a valid replicable bulk load WAL entry produces the correct number and structure + * of file paths. + */ + @Test + public void testProcessBulkLoadFiles_validEntry() throws IOException { + WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", true, + "cf1", "file1", "file2"); + + List paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + + assertEquals(2, paths.size()); + assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1")); + assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2")); + } + + /** + * Verifies that a non-replicable bulk load entry is ignored. + */ + @Test + public void testProcessBulkLoadFiles_nonReplicableSkipped() throws IOException { + WAL.Entry entry = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", false, "cf1", "file1"); + + List paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + + assertTrue(paths.isEmpty()); + } + + /** + * Verifies that entries without the BULK_LOAD qualifier are ignored. + */ + @Test + public void testProcessBulkLoadFiles_noBulkLoadQualifier() throws IOException { + WALEdit edit = new WALEdit(); + WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", "tbl"), 0L, 0L, null); + WAL.Entry entry = new WAL.Entry(key, edit); + + List paths = BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry)); + + assertTrue(paths.isEmpty()); + } + + /** + * Verifies that multiple WAL entries with different column families produce the correct set of + * file paths. 
+ */ + @Test + public void testProcessBulkLoadFiles_multipleFamilies() throws IOException { + WAL.Entry entry = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf1", "file1"); + WAL.Entry entry2 = + createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", true, "cf2", "fileA"); + + List paths = BulkLoadProcessor.processBulkLoadFiles(Arrays.asList(entry, entry2)); + + assertEquals(2, paths.size()); + assertTrue(paths.stream().anyMatch(p -> p.toString().contains("cf1/file1"))); + assertTrue(paths.stream().anyMatch(p -> p.toString().contains("cf2/fileA"))); + } +} diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java index 3919746d3b7b..8f8e83dbda6b 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.backup.replication; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; @@ -27,11 +29,18 @@ import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.text.SimpleDateFormat; @@ -46,8 +55,10 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -64,8 +75,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; 
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; @@ -73,6 +87,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.MockedStatic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,12 +106,14 @@ public class TestContinuousBackupReplicationEndpoint { private final String replicationEndpoint = ContinuousBackupReplicationEndpoint.class.getName(); private static final String CF_NAME = "cf"; + private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier"); static FileSystem fs = null; static Path root; @BeforeClass public static void setUpBeforeClass() throws Exception { // Set the configuration properties as required + conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true); conf.set(REPLICATION_CLUSTER_ID, "clusterId1"); TEST_UTIL.startMiniZKCluster(); @@ -115,7 +132,7 @@ public static void tearDownAfterClass() throws Exception { } @Test - public void testWALBackup() throws IOException { + public void testWALAndBulkLoadFileBackup() throws IOException { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName = TableName.valueOf("table_" + methodName); String peerId = "peerId"; @@ -133,10 +150,15 @@ public void testWALBackup() throws IOException { loadRandomData(tableName, 100); assertEquals(100, getRowCount(tableName)); + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily"); + generateHFiles(dir); + bulkLoadHFiles(tableName, dir); + assertEquals(1100, getRowCount(tableName)); + waitForReplication(15000); deleteReplicationPeer(peerId); - verifyBackup(backupRootDir.toString(), Map.of(tableName, 100)); + verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100)); deleteTable(tableName); } @@ -184,7 +206,7 @@ public void testMultiTableWALBackup() throws IOException { waitForReplication(15000); deleteReplicationPeer(peerId); - verifyBackup(backupRootDir.toString(), Map.of(table1, 100, table2, 100, table3, 50)); + verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50)); for (TableName table : List.of(table1, table2, table3)) { deleteTable(table); @@ -242,7 +264,7 @@ public void testWALBackupWithPeerRestart() throws IOException, InterruptedExcept waitForReplication(20000); deleteReplicationPeer(peerId); - verifyBackup(backupRootDir.toString(), Map.of(tableName, getRowCount(tableName))); + verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName))); deleteTable(tableName); } @@ -289,7 +311,7 @@ public void testDayWiseWALBackup() throws IOException { waitForReplication(15000); deleteReplicationPeer(peerId); - verifyBackup(backupRootDir.toString(), Map.of(tableName, 200)); + verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200)); // Verify that WALs are stored in two directories, one for each day Path walDir = new Path(backupRootDir, WALS_DIR); @@ -312,6 +334,204 @@ public void testDayWiseWALBackup() throws IOException { deleteTable(tableName); } + /** + * Simulates a one-time failure during bulk load file upload. This validates that the retry logic + * in the replication endpoint works as expected. 
+ */ + @Test + public void testBulkLoadFileUploadRetry() throws IOException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; + + // Reset static failure flag before test + FailingOnceContinuousBackupReplicationEndpoint.reset(); + + createTable(tableName); + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + + Map> tableMap = new HashMap<>(); + tableMap.put(tableName, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, tableMap, + FailingOnceContinuousBackupReplicationEndpoint.class.getName()); + + loadRandomData(tableName, 100); + assertEquals(100, getRowCount(tableName)); + + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily"); + generateHFiles(dir); + bulkLoadHFiles(tableName, dir); + assertEquals(1100, getRowCount(tableName)); + + // Replication: first attempt fails, second attempt succeeds + waitForReplication(15000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100)); + + deleteTable(tableName); + } + + /** + * Replication endpoint that fails only once on first upload attempt, then succeeds on retry. + */ + public static class FailingOnceContinuousBackupReplicationEndpoint + extends ContinuousBackupReplicationEndpoint { + + private static boolean failedOnce = false; + + @Override + protected void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) + throws BulkLoadUploadException { + if (!failedOnce) { + failedOnce = true; + throw new BulkLoadUploadException("Simulated upload failure on first attempt"); + } + super.uploadBulkLoadFiles(dayInMillis, bulkLoadFiles); + } + + /** Reset failure state for new tests */ + public static void reset() { + failedOnce = false; + } + } + + /** + * Unit test for verifying cleanup of partial files. Simulates a failure during + * {@link FileUtil#copy(FileSystem, Path, FileSystem, Path, boolean, boolean, Configuration)} and + * checks that the destination file is deleted. + */ + @Test + public void testCopyWithCleanupDeletesPartialFile() throws Exception { + FileSystem srcFS = mock(FileSystem.class); + FileSystem dstFS = mock(FileSystem.class); + Path src = new Path("/src/file"); + Path dst = new Path("/dst/file"); + Configuration conf = new Configuration(); + + FileStatus srcStatus = mock(FileStatus.class); + FileStatus dstStatus = mock(FileStatus.class); + + when(srcFS.getFileStatus(src)).thenReturn(srcStatus); + when(dstFS.getFileStatus(dst)).thenReturn(dstStatus); + + // lengths differ -> should attempt to overwrite and then cleanup + when(srcStatus.getLen()).thenReturn(200L); + when(dstStatus.getLen()).thenReturn(100L); + + // Simulate FileUtil.copy failing + try (MockedStatic mockedFileUtil = mockStatic(FileUtil.class)) { + mockedFileUtil.when( + () -> FileUtil.copy(eq(srcFS), eq(src), eq(dstFS), eq(dst), eq(false), eq(true), eq(conf))) + .thenThrow(new IOException("simulated copy failure")); + + // Pretend partial file exists in destination + when(dstFS.exists(dst)).thenReturn(true); + + // Run the method under test + assertThrows(IOException.class, () -> copyWithCleanup(srcFS, src, dstFS, dst, conf)); + + // Verify cleanup happened + verify(dstFS).delete(dst, true); + } + } + + /** + * Simulates a stale/partial file left behind after a failed bulk load. On retry, the stale file + * should be overwritten and replication succeeds. 
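 * <p>
 * A condensed sketch of the overwrite decision in {@code copyWithCleanup} that this test
 * exercises (behaviour taken from the implementation above; file length is the only equality
 * check used):
 * <pre>{@code
 * if (dstFS.exists(dst)
 *   && srcFS.getFileStatus(src).getLen() == dstFS.getFileStatus(dst).getLen()) {
 *   return; // previous copy assumed complete, skip
 * }
 * FileUtil.copy(srcFS, src, dstFS, dst, false, true, conf); // overwrite = true
 * // on IOException: best-effort delete of dst, then rethrow
 * }</pre>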
+ */ + @Test + public void testBulkLoadFileUploadWithStaleFileRetry() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName = TableName.valueOf("table_" + methodName); + String peerId = "peerId"; + + // Reset static failure flag before test + PartiallyUploadedBulkloadFileEndpoint.reset(); + + createTable(tableName); + + Path backupRootDir = new Path(root, methodName); + fs.mkdirs(backupRootDir); + conf.set(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); + + Map> tableMap = new HashMap<>(); + tableMap.put(tableName, new ArrayList<>()); + + addReplicationPeer(peerId, backupRootDir, tableMap, + PartiallyUploadedBulkloadFileEndpoint.class.getName()); + + loadRandomData(tableName, 100); + assertEquals(100, getRowCount(tableName)); + + Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily"); + generateHFiles(dir); + bulkLoadHFiles(tableName, dir); + assertEquals(1100, getRowCount(tableName)); + + // first attempt will fail leaving stale file, second attempt should overwrite and succeed + waitForReplication(15000); + deleteReplicationPeer(peerId); + + verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100)); + + deleteTable(tableName); + } + + /** + * Replication endpoint that simulates leaving a partial file behind on first attempt, then + * succeeds on second attempt by overwriting it. + */ + public static class PartiallyUploadedBulkloadFileEndpoint + extends ContinuousBackupReplicationEndpoint { + + private static boolean firstAttempt = true; + + @Override + protected void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) + throws BulkLoadUploadException { + if (firstAttempt) { + firstAttempt = false; + try { + // Construct destination path and create a partial file + String dayDirectoryName = formatToDateString(dayInMillis); + BackupFileSystemManager backupFileSystemManager = + new BackupFileSystemManager("peer1", conf, conf.get(CONF_BACKUP_ROOT_DIR)); + Path bulkloadDir = + new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName); + + FileSystem dstFs = backupFileSystemManager.getBackupFs(); + if (!dstFs.exists(bulkloadDir)) { + dstFs.mkdirs(bulkloadDir); + } + + for (Path file : bulkLoadFiles) { + Path destPath = new Path(bulkloadDir, file); + try (FSDataOutputStream out = dstFs.create(destPath, true)) { + out.writeBytes("partial-data"); // simulate incomplete upload + } + } + } catch (IOException e) { + throw new BulkLoadUploadException("Simulated failure while creating partial file", e); + } + + // Fail after leaving partial files behind + throw new BulkLoadUploadException("Simulated upload failure on first attempt"); + } + + // Retry succeeds, overwriting stale files + super.uploadBulkLoadFiles(dayInMillis, bulkLoadFiles); + } + + /** Reset for new tests */ + public static void reset() { + firstAttempt = true; + } + } + private void createTable(TableName tableName) throws IOException { ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF_NAME)).setScope(1).build(); @@ -332,6 +552,12 @@ private void deleteTable(TableName tableName) throws IOException { private void addReplicationPeer(String peerId, Path backupRootDir, Map> tableMap) throws IOException { + addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint); + } + + private void addReplicationPeer(String peerId, Path backupRootDir, + Map> tableMap, String customReplicationEndpointImpl) + throws IOException { Map additionalArgs = new HashMap<>(); 
additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); @@ -340,7 +566,7 @@ private void addReplicationPeer(String peerId, Path backupRootDir, additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() - .setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false) + .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); admin.addReplicationPeer(peerId, peerConfig); @@ -358,6 +584,42 @@ private void loadRandomData(TableName tableName, int totalRows) throws IOExcepti } } + private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration()); + loader.bulkLoad(table.getName(), inputDir); + } finally { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); + } + } + + private void bulkLoadHFiles(TableName tableName, Map> family2Files) + throws IOException { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); + + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration()); + loader.bulkLoad(table.getName(), family2Files); + } finally { + TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); + } + } + + private void generateHFiles(Path outputDir) throws IOException { + String hFileName = "MyHFile"; + int numRows = 1000; + outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + byte[] from = Bytes.toBytes(CF_NAME + "begin"); + byte[] to = Bytes.toBytes(CF_NAME + "end"); + + Path familyDir = new Path(outputDir, CF_NAME); + HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName), + Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows); + } + private void waitForReplication(int durationInMillis) { LOG.info("Waiting for replication to complete for {} ms", durationInMillis); try { @@ -370,12 +632,17 @@ private void waitForReplication(int durationInMillis) { /** * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were - * generated in the backup directory. 2. Replaying the WAL files to restore data and check - * consistency by verifying that the restored data matches the expected row count for each table. + * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in + * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data + * and check consistency by verifying that the restored data matches the expected row count for + * each table. 
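 * <p>
 * Directory layout assumed by these checks (day directories are named via
 * {@code formatToDateString}, i.e. "yyyy-MM-dd"):
 * <pre>{@code
 * <backupRootDir>/WALs/<yyyy-MM-dd>/...
 * <backupRootDir>/bulk-load-files/<yyyy-MM-dd>/<namespace>/<table>/<encodedRegion>/<cf>/<storeFile>
 * }</pre>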
*/ - private void verifyBackup(String backupRootDir, Map tablesWithExpectedRows) - throws IOException { + private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles, + Map tablesWithExpectedRows) throws IOException { verifyWALBackup(backupRootDir); + if (hasBulkLoadFiles) { + verifyBulkLoadBackup(backupRootDir); + } for (Map.Entry entry : tablesWithExpectedRows.entrySet()) { TableName tableName = entry.getKey(); @@ -387,6 +654,21 @@ private void verifyBackup(String backupRootDir, Map tablesWi replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName); + // replay Bulk loaded HFiles if Present + try { + Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR); + if (fs.exists(bulkloadDir)) { + FileStatus[] directories = fs.listStatus(bulkloadDir); + for (FileStatus dirStatus : directories) { + if (dirStatus.isDirectory()) { + replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName); + } + } + } + } catch (Exception e) { + fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage()); + } + assertEquals(expectedRows, getRowCount(tableName)); } } @@ -412,6 +694,15 @@ private void verifyWALBackup(String backupRootDir) throws IOException { assertFalse("Expected some WAL files but found none!", walFiles.isEmpty()); } + private void verifyBulkLoadBackup(String backupRootDir) throws IOException { + Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR); + assertTrue("BulkLoad Files directory does not exist!", fs.exists(bulkLoadFilesDir)); + + FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir); + assertNotNull("No Bulk load files found!", bulkLoadFiles); + assertTrue("Expected some Bulk load files but found none!", bulkLoadFiles.length > 0); + } + private void replayWALs(String walDir, TableName tableName) { WALPlayer player = new WALPlayer(); try { @@ -422,6 +713,28 @@ private void replayWALs(String walDir, TableName tableName) { } } + private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) { + try { + Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName); + if (fs.exists(tableBulkLoadDir)) { + RemoteIterator fileStatusIterator = fs.listFiles(tableBulkLoadDir, true); + List bulkLoadFiles = new ArrayList<>(); + + while (fileStatusIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIterator.next(); + Path filePath = fileStatus.getPath(); + + if (!fileStatus.isDirectory()) { + bulkLoadFiles.add(filePath); + } + } + bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles)); + } + } catch (Exception e) { + fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage()); + } + } + private int getRowCount(TableName tableName) throws IOException { try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { return HBaseTestingUtil.countRows(table);