From f7cb89ef95425057ef0d62cc654ff92596cff2b0 Mon Sep 17 00:00:00 2001 From: Minwoo Kang Date: Thu, 2 Apr 2020 10:46:01 +0900 Subject: [PATCH] HBASE-24337 Backport HBASE-23968 to branch-2 --- .../hbase/mob/DefaultMobStoreCompactor.java | 26 +++--- .../hadoop/hbase/regionserver/HRegion.java | 16 ++-- .../hadoop/hbase/regionserver/HStore.java | 14 --- .../compactions/CloseChecker.java | 80 +++++++++++++++++ .../regionserver/compactions/Compactor.java | 26 +++--- .../hbase/regionserver/TestCompaction.java | 86 ++++++++++++++++++- .../compactions/TestCloseChecker.java | 80 +++++++++++++++++ 7 files changed, 274 insertions(+), 54 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index e73a7d2226d7..9ca10e617519 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; @@ -173,11 +174,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel // we have to use a do/while loop. List cells = new ArrayList<>(); // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME - int closeCheckSizeLimit = HStore.getCloseCheckInterval(); + long currentTime = EnvironmentEdgeManager.currentTime(); long lastMillis = 0; if (LOG.isDebugEnabled()) { - lastMillis = EnvironmentEdgeManager.currentTime(); + lastMillis = currentTime; } + CloseChecker closeChecker = new CloseChecker(conf, currentTime); String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); long now = 0; boolean hasMore; @@ -216,8 +218,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel } do { hasMore = scanner.next(cells, scannerContext); + currentTime = EnvironmentEdgeManager.currentTime(); if (LOG.isDebugEnabled()) { - now = EnvironmentEdgeManager.currentTime(); + now = currentTime; + } + if (closeChecker.isTimeLimit(store, currentTime)) { + progress.cancel(); + return false; } for (Cell c : cells) { if (major && CellUtil.isDelete(c)) { @@ -290,16 +297,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel bytesWrittenProgressForLog += len; } throughputController.control(compactionName, len); - // check periodically to see if a system stop is requested - if (closeCheckSizeLimit > 0) { - bytesWrittenProgressForCloseCheck += len; - if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { - bytesWrittenProgressForCloseCheck = 0; - if (!store.areWritesEnabled()) { - progress.cancel(); - return false; - } - } + if (closeChecker.isSizeLimit(store, len)) { + progress.cancel(); + return false; } if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { ((ShipperListener)writer).beforeShipped(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fb227ecf5a4d..27489ec88599 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2206,15 +2206,13 @@ public boolean compact(CompactionContext compaction, HStore store, * } * Also in compactor.performCompaction(): * check periodically to see if a system stop is requested - * if (closeCheckInterval > 0) { - * bytesWritten += len; - * if (bytesWritten > closeCheckInterval) { - * bytesWritten = 0; - * if (!store.areWritesEnabled()) { - * progress.cancel(); - * return false; - * } - * } + * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) { + * progress.cancel(); + * return false; + * } + * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) { + * progress.cancel(); + * return false; * } */ try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 610381936fe7..0b85f93b2adb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -154,8 +154,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, protected CacheConfig cacheConf; private long lastCompactSize = 0; volatile boolean forceMajor = false; - /* how many bytes to write between status checks */ - static int closeCheckInterval = 0; private AtomicLong storeSize = new AtomicLong(); private AtomicLong totalUncompressedBytes = new AtomicLong(); @@ -297,11 +295,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family, this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; } - if (HStore.closeCheckInterval == 0) { - HStore.closeCheckInterval = conf.getInt( - "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); - } - this.storeEngine = createStoreEngine(this, this.conf, this.comparator); List hStoreFiles = loadStoreFiles(warmup); // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read @@ -490,13 +483,6 @@ public static ChecksumType getChecksumType(Configuration conf) { } } - /** - * @return how many bytes to write between status checks - */ - public static int getCloseCheckInterval() { - return closeCheckInterval; - } - @Override public ColumnFamilyDescriptor getColumnFamilyDescriptor() { return this.family; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java new file mode 100644 index 000000000000..64ef7e7aa87d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Check periodically to see if a system stop is requested + */ +@InterfaceAudience.Private +public class CloseChecker { + public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval"; + public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval"; + + private final int closeCheckSizeLimit; + private final long closeCheckTimeLimit; + + private long bytesWrittenProgressForCloseCheck; + private long lastCloseCheckMillis; + + public CloseChecker(Configuration conf, long currentTime) { + this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); + this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); + this.bytesWrittenProgressForCloseCheck = 0; + this.lastCloseCheckMillis = currentTime; + } + + /** + * Check periodically to see if a system stop is requested every written bytes reach size limit. + * + * @return if true, system stop. + */ + public boolean isSizeLimit(Store store, long bytesWritten) { + if (closeCheckSizeLimit <= 0) { + return false; + } + + bytesWrittenProgressForCloseCheck += bytesWritten; + if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) { + return false; + } + + bytesWrittenProgressForCloseCheck = 0; + return !store.areWritesEnabled(); + } + + /** + * Check periodically to see if a system stop is requested every time. + * + * @return if true, system stop. + */ + public boolean isTimeLimit(Store store, long now) { + if (closeCheckTimeLimit <= 0) { + return false; + } + + final long elapsedMillis = now - lastCloseCheckMillis; + if (elapsedMillis <= closeCheckTimeLimit) { + return false; + } + + lastCloseCheckMillis = now; + return !store.areWritesEnabled(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 10fac550180b..bc8d9432da31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -368,17 +368,17 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { assert writer instanceof ShipperListener; - long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List cells = new ArrayList<>(); - long closeCheckSizeLimit = HStore.getCloseCheckInterval(); + long currentTime = EnvironmentEdgeManager.currentTime(); long lastMillis = 0; if (LOG.isDebugEnabled()) { - lastMillis = EnvironmentEdgeManager.currentTime(); + lastMillis = currentTime; } + CloseChecker closeChecker = new CloseChecker(conf, currentTime); String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); long now = 0; boolean hasMore; @@ -392,8 +392,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel try { do { hasMore = scanner.next(cells, scannerContext); + currentTime = EnvironmentEdgeManager.currentTime(); if (LOG.isDebugEnabled()) { - now = EnvironmentEdgeManager.currentTime(); + now = currentTime; + } + if (closeChecker.isTimeLimit(store, currentTime)) { + progress.cancel(); + return false; } // output to writer: Cell lastCleanCell = null; @@ -416,16 +421,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel bytesWrittenProgressForLog += len; } throughputController.control(compactionName, len); - // check periodically to see if a system stop is requested - if (closeCheckSizeLimit > 0) { - bytesWrittenProgressForCloseCheck += len; - if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { - bytesWrittenProgressForCloseCheck = 0; - if (!store.areWritesEnabled()) { - progress.cancel(); - return false; - } - } + if (closeChecker.isSizeLimit(store, len)) { + progress.cancel(); + return false; } if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { if (lastCleanCell != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 06bab0bc60f6..ee954bf1a867 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES; import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; +import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; +import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -152,12 +154,11 @@ public void tearDown() throws Exception { * @throws Exception */ @Test - public void testInterruptCompaction() throws Exception { + public void testInterruptCompactionBySize() throws Exception { assertEquals(0, count()); // lower the polling interval for this test - int origWI = HStore.closeCheckInterval; - HStore.closeCheckInterval = 10*1000; // 10 KB + conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */); try { // Create a couple store files w/ 15KB (over 10KB interval) @@ -202,7 +203,84 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } finally { // don't mess up future tests r.writestate.writesEnabled = true; - HStore.closeCheckInterval = origWI; + conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */); + + // Delete all Store information once done using + for (int i = 0; i < compactionThreshold; i++) { + Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i))); + byte [][] famAndQf = {COLUMN_FAMILY, null}; + delete.addFamily(famAndQf[0]); + r.delete(delete); + } + r.flush(true); + + // Multiple versions allowed for an entry, so the delete isn't enough + // Lower TTL and expire to ensure that all our entries have been wiped + final int ttl = 1000; + for (HStore store : this.r.stores.values()) { + ScanInfo old = store.getScanInfo(); + ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells()); + store.setScanInfo(si); + } + Thread.sleep(ttl); + + r.compact(true); + assertEquals(0, count()); + } + } + + @Test + public void testInterruptCompactionByTime() throws Exception { + assertEquals(0, count()); + + // lower the polling interval for this test + conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */); + + try { + // Create a couple store files w/ 15KB (over 10KB interval) + int jmax = (int) Math.ceil(15.0/compactionThreshold); + byte [] pad = new byte[1000]; // 1 KB chunk + for (int i = 0; i < compactionThreshold; i++) { + Table loader = new RegionAsTable(r); + Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); + p.setDurability(Durability.SKIP_WAL); + for (int j = 0; j < jmax; j++) { + p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); + } + HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); + loader.put(p); + r.flush(true); + } + + HRegion spyR = spy(r); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + r.writestate.writesEnabled = false; + return invocation.callRealMethod(); + } + }).when(spyR).doRegionCompactionPrep(); + + // force a minor compaction, but not before requesting a stop + spyR.compactStores(); + + // ensure that the compaction stopped, all old files are intact, + HStore s = r.getStore(COLUMN_FAMILY); + assertEquals(compactionThreshold, s.getStorefilesCount()); + assertTrue(s.getStorefilesSize() > 15*1000); + // and no new store files persisted past compactStores() + // only one empty dir exists in temp dir + FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir()); + assertEquals(1, ls.length); + Path storeTempDir = + new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY)); + assertTrue(r.getFilesystem().exists(storeTempDir)); + ls = r.getFilesystem().listStatus(storeTempDir); + assertEquals(0, ls.length); + } finally { + // don't mess up future tests + r.writestate.writesEnabled = true; + conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */); // Delete all Store information once done using for (int i = 0; i < compactionThreshold; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java new file mode 100644 index 000000000000..ef42c19a2a69 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY; +import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestCloseChecker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCloseChecker.class); + + @Test + public void testIsClosed() { + Store enableWrite = mock(Store.class); + when(enableWrite.areWritesEnabled()).thenReturn(true); + + Store disableWrite = mock(Store.class); + when(disableWrite.areWritesEnabled()).thenReturn(false); + + Configuration conf = new Configuration(); + + long currentTime = System.currentTimeMillis(); + + conf.setInt(SIZE_LIMIT_KEY, 10); + conf.setLong(TIME_LIMIT_KEY, 10); + + CloseChecker closeChecker = new CloseChecker(conf, currentTime); + assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime)); + assertFalse(closeChecker.isSizeLimit(enableWrite, 10L)); + + closeChecker = new CloseChecker(conf, currentTime); + assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + 11)); + assertFalse(closeChecker.isSizeLimit(enableWrite, 11L)); + + closeChecker = new CloseChecker(conf, currentTime); + assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 11)); + assertTrue(closeChecker.isSizeLimit(disableWrite, 11L)); + + for (int i = 0; i < 10; i++) { + int plusTime = 5 * i; + assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + plusTime)); + assertFalse(closeChecker.isSizeLimit(enableWrite, 5L)); + } + + closeChecker = new CloseChecker(conf, currentTime); + assertFalse(closeChecker.isTimeLimit(disableWrite, currentTime + 6)); + assertFalse(closeChecker.isSizeLimit(disableWrite, 6)); + assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 12)); + assertTrue(closeChecker.isSizeLimit(disableWrite, 6)); + } +} \ No newline at end of file