From 6510facb151aaa9790177ed2bd7a4b7cce6791f0 Mon Sep 17 00:00:00 2001
From: Thomas Sarens
Date: Sat, 6 Apr 2024 19:22:17 +0200
Subject: [PATCH] HBASE-28483 backup merge fails on bulkloaded hfiles (#5795)

Signed-off-by: Bryan Beaudreault
---
 ...estIncrementalBackupMergeWithBulkLoad.java | 250 ++++++++++++++++++
 .../hbase/mapreduce/HFileInputFormat.java     |  25 +-
 hbase-replication/pom.xml                     |   5 -
 3 files changed, 266 insertions(+), 14 deletions(-)
 create mode 100644 hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java
new file mode 100644
index 000000000000..058413fa1d15
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
+import static org.apache.hadoop.hbase.backup.BackupType.FULL;
+import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
+import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestIncrementalBackupMergeWithBulkLoad {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestIncrementalBackupMergeWithBulkLoad.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithBulkLoad.class);
+
+  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
+  public static Iterable<Object[]> data() {
+    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useBulkLoad;
+
+  private TableName sourceTable;
+  private TableName targetTable;
+
+  private List<TableName> allTables;
+  private static TestingHBaseCluster cluster;
+  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    enableBackup(conf);
+    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.stop();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    sourceTable = TableName.valueOf("table-" + useBulkLoad);
+    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
+    allTables = Arrays.asList(sourceTable, targetTable);
+    createTable(sourceTable);
+    createTable(targetTable);
+  }
+
+  @Test
+  public void testMergeContainingBulkloadedHfiles() throws Exception {
+    Instant timestamp = Instant.now();
+
+    String backupId = backup(FULL, allTables);
+    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    // load some data
+    load(sourceTable, timestamp, "data");
+
+    String backupId1 = backup(INCREMENTAL, allTables);
+    backupInfo = verifyBackup(cluster.getConf(), backupId1, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    String backupId2 = backup(INCREMENTAL, allTables);
+    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    merge(new String[] { backupId1, backupId2 });
+    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+    validateDataEquals(sourceTable, "data");
+  }
+
+  private void createTable(TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Admin admin = connection.getAdmin()) {
+      admin.createTable(builder.build());
+    }
+  }
+
+  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
+    if (useBulkLoad) {
+      hFileBulkLoad(tableName, timestamp, data);
+    } else {
+      putLoad(tableName, timestamp, data);
+    }
+  }
+
+  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
+    LOG.info("Writing new data to HBase using normal Puts: {}", data);
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
+      Table table = connection.getTable(tableName);
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
+        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
+        puts.add(put);
+
+        if (i % 100 == 0) {
+          table.put(puts);
+          puts.clear();
+        }
+      }
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+      connection.getAdmin().flush(tableName);
+    }
+  }
+
+  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
+    throws IOException {
+    FileSystem fs = FileSystem.get(cluster.getConf());
+    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
+    // HFiles require this strict directory structure in order to be bulk loaded
+    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
+    fs.mkdirs(hFileRootPath);
+    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
+    fs.mkdirs(hFileFamilyPath);
+    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
+      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
+      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
+        .withColumnFamily(COLUMN_FAMILY).build())
+      .create()) {
+      for (int i = 0; i < 10; i++) {
+        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
+          timestamp.toEpochMilli(), Bytes.toBytes(data)));
+      }
+    }
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
+    assertFalse(result.isEmpty());
+  }
+
+  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
+    LOG.info("Creating the backup ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupRequest backupRequest =
+        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
+          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
+      return backupAdmin.backupTables(backupRequest);
+    }
+  }
+
+  private void merge(String[] backupIds) throws IOException {
+    LOG.info("Merging the backups ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      backupAdmin.mergeBackups(backupIds);
+    }
+  }
+
+  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Table table = connection.getTable(tableName)) {
+      Scan scan = new Scan();
+      scan.readAllVersions();
+      scan.setRaw(true);
+      scan.setBatch(100);
+
+      for (Result sourceResult : table.getScanner(scan)) {
+        List<Cell> sourceCells = sourceResult.listCells();
+        for (Cell cell : sourceCells) {
+          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
+            cell.getValueOffset(), cell.getValueLength()));
+        }
+      }
+    }
+  }
+
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
index 1fdcf4bcfd44..1bbbe513f738 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
@@ -152,19 +151,27 @@ protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<>();
 
     // Explode out directories that match the original FileInputFormat filters
-    // since HFiles are written to directories where the
-    // directory name is the column name
+    // Typically these are <regionname>-level dirs, only requiring 1 level of recursion to
+    // get the <columnfamily>-level dirs where the HFile are written, but in some cases
+    // <tablename>-level dirs are provided requiring 2 levels of recursion.
     for (FileStatus status : super.listStatus(job)) {
-      if (status.isDirectory()) {
-        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
-        Collections.addAll(result, fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER));
-      } else {
-        result.add(status);
-      }
+      addFilesRecursively(job, status, result);
     }
     return result;
   }
 
+  private static void addFilesRecursively(JobContext job, FileStatus status,
+    List<FileStatus> result) throws IOException {
+    if (status.isDirectory()) {
+      FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+      for (FileStatus fileStatus : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+        addFilesRecursively(job, fileStatus, result);
+      }
+    } else {
+      result.add(status);
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
     TaskAttemptContext context) throws IOException, InterruptedException {
diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml
index a4b6c38bc4b4..1857ff03a356 100644
--- a/hbase-replication/pom.xml
+++ b/hbase-replication/pom.xml
@@ -119,11 +119,6 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-library</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
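
The HFileInputFormat change above replaces the single-level directory expansion with the recursive addFilesRecursively helper, so HFiles are picked up whether the provided input paths point at family-level, region-level, or table-level directories. The following is a minimal standalone sketch of that recursion, using only the Hadoop FileSystem API against the local filesystem; it is not part of the patch, the class name RecursiveHFileListingSketch, the /tmp/listing-sketch paths, and the collectFiles helper are illustrative assumptions, and the sketch deliberately omits the MapReduce JobContext and the hidden-file filter used by the real code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RecursiveHFileListingSketch {

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Hypothetical layout: <table>/<region>/<family>/<hfile>, i.e. two directory levels
    // above the HFiles, which a single level of directory expansion would miss.
    Path tableDir = new Path("/tmp/listing-sketch/table1");
    Path familyDir = new Path(tableDir, "region1/cf");
    fs.mkdirs(familyDir);
    fs.create(new Path(familyDir, "hfile1")).close();
    fs.create(new Path(familyDir, "hfile2")).close();

    List<FileStatus> result = new ArrayList<>();
    collectFiles(fs, fs.getFileStatus(tableDir), result);
    for (FileStatus file : result) {
      // Prints .../table1/region1/cf/hfile1 and .../table1/region1/cf/hfile2
      System.out.println(file.getPath());
    }
  }

  // Same shape as the patch's addFilesRecursively: recurse into directories,
  // collect plain files at whatever depth they appear.
  private static void collectFiles(FileSystem fs, FileStatus status, List<FileStatus> result)
    throws IOException {
    if (status.isDirectory()) {
      for (FileStatus child : fs.listStatus(status.getPath())) {
        collectFiles(fs, child, result);
      }
    } else {
      result.add(status);
    }
  }
}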