From 7be58796ca7a015e15905240d99a4e9c1bd1e1af Mon Sep 17 00:00:00 2001 From: niuyulin Date: Tue, 6 Jul 2021 22:19:32 +0800 Subject: [PATCH] HBASE-26058 Add TableDescriptor attribute 'COMPACTION_OFFLOAD_ENABLED' (#3452) Signed-off-by: Duo Zhang --- .../hadoop/hbase/client/TableDescriptor.java | 8 ++++ .../hbase/client/TableDescriptorBuilder.java | 41 +++++++++++++++++++ .../hadoop/hbase/regionserver/HStore.java | 3 +- .../TestCompactionServer.java | 25 +++++++++++ hbase-shell/src/main/ruby/hbase/admin.rb | 1 + hbase-shell/src/test/ruby/hbase/admin_test.rb | 6 ++- 6 files changed, 82 insertions(+), 2 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 1440c28787d0..8cc70844ebfe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -243,6 +243,14 @@ public interface TableDescriptor { */ boolean isCompactionEnabled(); + /** + * Check if the compaction offload enable flag of the table is true. If flag is true then + * compaction will be done with offload strategy(run on CompactionServer). Otherwise, with + * embedded strategy (run on RegionServer) + * @return true if table compaction offload enabled + */ + boolean isCompactionOffloadEnabled(); + /** * Check if the split enable flag of the table is true. If flag is false * then no region split will be done. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 1c1cbaa8723d..1afdac6e37b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -90,6 +90,15 @@ public class TableDescriptorBuilder { private static final Bytes COMPACTION_ENABLED_KEY = new Bytes(Bytes.toBytes(COMPACTION_ENABLED)); + /** + * Used by HBase Shell interface to access this metadata + * attribute which denotes if the table is compaction offload enabled. + */ + @InterfaceAudience.Private + public static final String COMPACTION_OFFLOAD_ENABLED = "COMPACTION_OFFLOAD_ENABLED"; + private static final Bytes COMPACTION_OFFLOAD_ENABLED_KEY + = new Bytes(Bytes.toBytes(COMPACTION_OFFLOAD_ENABLED)); + /** * Used by HBase Shell interface to access this metadata * attribute which denotes if the table is split enabled. @@ -205,6 +214,11 @@ public class TableDescriptorBuilder { */ public static final boolean DEFAULT_COMPACTION_ENABLED = true; + /** + * Constant that denotes whether the table is compaction offload enabled by default + */ + public static final boolean DEFAULT_COMPACTION_OFFLOAD_ENABLED = false; + /** * Constant that denotes whether the table is split enabled by default */ @@ -445,6 +459,11 @@ public TableDescriptorBuilder setCompactionEnabled(final boolean isEnable) { return this; } + public TableDescriptorBuilder setCompactionOffloadEnabled(final boolean isEnable) { + desc.setCompactionOffloadEnabled(isEnable); + return this; + } + public TableDescriptorBuilder setSplitEnabled(final boolean isEnable) { desc.setSplitEnabled(isEnable); return this; @@ -803,6 +822,18 @@ public boolean isCompactionEnabled() { return getOrDefault(COMPACTION_ENABLED_KEY, Boolean::valueOf, DEFAULT_COMPACTION_ENABLED); } + /** + * Check if the compaction offload enable flag of the table is true. If flag is true then + * compaction will be done with offload strategy(run on CompactionServer). Otherwise, with + * embedded strategy (run on RegionServer) + * @return true if table compaction enabled + */ + @Override + public boolean isCompactionOffloadEnabled() { + return getOrDefault(COMPACTION_OFFLOAD_ENABLED_KEY, Boolean::valueOf, + DEFAULT_COMPACTION_OFFLOAD_ENABLED); + } + /** * Setting the table compaction enable flag. * @@ -813,6 +844,16 @@ public ModifyableTableDescriptor setCompactionEnabled(final boolean isEnable) { return setValue(COMPACTION_ENABLED_KEY, Boolean.toString(isEnable)); } + /** + * Setting the table compaction offload enable flag. + * + * @param isEnable True if enable compaction offload. + * @return the modifyable TD + */ + public ModifyableTableDescriptor setCompactionOffloadEnabled(final boolean isEnable) { + return setValue(COMPACTION_OFFLOAD_ENABLED_KEY, Boolean.toString(isEnable)); + } + /** * Check if the split enable flag of the table is true. If flag is false then no split will be * done. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7dc4c6e5d5a3..5cd005af1a60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1931,7 +1931,8 @@ public Optional requestCompaction(int priority, removeUnneededFiles(); if (region.getRegionServerServices() != null - && region.getRegionServerServices().isCompactionOffloadEnabled()) { + && region.getRegionServerServices().isCompactionOffloadEnabled() + && region.getTableDescriptor().isCompactionOffloadEnabled()) { if (!requestToCompactionManager(forceMajor, priority)) { // if request to cm error, do local compaction or retry return selectCompaction(priority, tracker, user, filesCompacting); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java index 13626409b671..dfc17e497501 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java @@ -268,4 +268,29 @@ public void testCompactionServerExpire() throws Exception { () -> initialNum == compactionOffloadManager.getOnlineServersList().size()); assertNull(compactionOffloadManager.getLoad(compactionServerName)); } + + @Test + public void testCompactionOffloadTableDescriptor() throws Exception { + CompactionOffloadManager compactionOffloadManager = MASTER.getCompactionOffloadManager(); + TEST_UTIL.waitFor(6000, () -> !compactionOffloadManager.getOnlineServers().isEmpty() + && null != compactionOffloadManager.getOnlineServers().get(COMPACTION_SERVER_NAME)); + + TableDescriptor htd = + TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) + .setCompactionOffloadEnabled(true).build(); + TEST_UTIL.getAdmin().modifyTable(htd); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); + // invoke compact + TEST_UTIL.compact(TABLENAME, false); + TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0); + long requestCount = COMPACTION_SERVER.requestCount.sum(); + + htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME)) + .setCompactionOffloadEnabled(false).build(); + TEST_UTIL.getAdmin().modifyTable(htd); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME); + // invoke compact + TEST_UTIL.compact(TABLENAME, false); + TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount); + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 152a70f770e3..ab0f7ef9e761 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1488,6 +1488,7 @@ def update_tdb_from_arg(tdb, arg) tdb.setMaxFileSize(arg.delete(TableDescriptorBuilder::MAX_FILESIZE)) if arg.include?(TableDescriptorBuilder::MAX_FILESIZE) tdb.setReadOnly(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::READONLY))) if arg.include?(TableDescriptorBuilder::READONLY) tdb.setCompactionEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::COMPACTION_ENABLED))) if arg.include?(TableDescriptorBuilder::COMPACTION_ENABLED) + tdb.setCompactionOffloadEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::COMPACTION_OFFLOAD_ENABLED))) if arg.include?(TableDescriptorBuilder::COMPACTION_OFFLOAD_ENABLED) tdb.setSplitEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::SPLIT_ENABLED))) if arg.include?(TableDescriptorBuilder::SPLIT_ENABLED) tdb.setMergeEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::MERGE_ENABLED))) if arg.include?(TableDescriptorBuilder::MERGE_ENABLED) tdb.setNormalizationEnabled(JBoolean.valueOf(arg.delete(TableDescriptorBuilder::NORMALIZATION_ENABLED))) if arg.include?(TableDescriptorBuilder::NORMALIZATION_ENABLED) diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb index 683dd39f1486..09e659b9ff06 100644 --- a/hbase-shell/src/test/ruby/hbase/admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb @@ -465,12 +465,14 @@ def teardown REGION_MEMSTORE_REPLICATION => 'TRUE', SPLIT_POLICY => 'org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy', COMPACTION_ENABLED => 'false', + COMPACTION_OFFLOAD_ENABLED => 'true', SPLIT_ENABLED => 'false', MERGE_ENABLED => 'false') assert_equal(['a:', 'b:'], table(@create_test_name).get_all_columns.sort) assert_match(/12345678/, admin.describe(@create_test_name)) assert_match(/77/, admin.describe(@create_test_name)) assert_match(/'COMPACTION_ENABLED' => 'false'/, admin.describe(@create_test_name)) + assert_match(/'COMPACTION_OFFLOAD_ENABLED' => 'true'/, admin.describe(@create_test_name)) assert_match(/'SPLIT_ENABLED' => 'false'/, admin.describe(@create_test_name)) assert_match(/'MERGE_ENABLED' => 'false'/, admin.describe(@create_test_name)) assert_match(/'REGION_MEMSTORE_REPLICATION' => 'true'/, admin.describe(@create_test_name)) @@ -493,15 +495,17 @@ def teardown assert_equal(['a:', 'b:'], table(@create_test_name).get_all_columns.sort) end - define_test "create should work when attributes value 'false' is not enclosed in single quotation marks" do + define_test "create should work when attributes value 'false' or 'true' is not enclosed in single quotation marks" do drop_test_table(@create_test_name) command(:create, @create_test_name, {NAME => 'a', BLOCKCACHE => false}, COMPACTION_ENABLED => false, + COMPACTION_OFFLOAD_ENABLED => true, SPLIT_ENABLED => false, MERGE_ENABLED => false) assert_equal(['a:'], table(@create_test_name).get_all_columns.sort) assert_match(/BLOCKCACHE => 'false'/, admin.describe(@create_test_name)) assert_match(/'COMPACTION_ENABLED' => 'false'/, admin.describe(@create_test_name)) + assert_match(/'COMPACTION_OFFLOAD_ENABLED' => 'true'/, admin.describe(@create_test_name)) assert_match(/'SPLIT_ENABLED' => 'false'/, admin.describe(@create_test_name)) assert_match(/'MERGE_ENABLED' => 'false'/, admin.describe(@create_test_name)) end