HBASE-24337 Backport HBASE-23968 to branch-2
Minwoo Kang committed May 6, 2020
1 parent 89ae3c5 commit f7cb89e
Showing 7 changed files with 274 additions and 54 deletions.
[File 1 of 7]
@@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
+ import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -173,11 +174,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
- int closeCheckSizeLimit = HStore.getCloseCheckInterval();
+ long currentTime = EnvironmentEdgeManager.currentTime();
long lastMillis = 0;
if (LOG.isDebugEnabled()) {
- lastMillis = EnvironmentEdgeManager.currentTime();
+ lastMillis = currentTime;
}
+ CloseChecker closeChecker = new CloseChecker(conf, currentTime);
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
@@ -216,8 +218,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
}
do {
hasMore = scanner.next(cells, scannerContext);
+ currentTime = EnvironmentEdgeManager.currentTime();
if (LOG.isDebugEnabled()) {
- now = EnvironmentEdgeManager.currentTime();
+ now = currentTime;
}
+ if (closeChecker.isTimeLimit(store, currentTime)) {
+ progress.cancel();
+ return false;
+ }
for (Cell c : cells) {
if (major && CellUtil.isDelete(c)) {
@@ -290,16 +297,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
bytesWrittenProgressForLog += len;
}
throughputController.control(compactionName, len);
- // check periodically to see if a system stop is requested
- if (closeCheckSizeLimit > 0) {
- bytesWrittenProgressForCloseCheck += len;
- if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
- bytesWrittenProgressForCloseCheck = 0;
- if (!store.areWritesEnabled()) {
- progress.cancel();
- return false;
- }
- }
+ if (closeChecker.isSizeLimit(store, len)) {
+ progress.cancel();
+ return false;
}
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
((ShipperListener)writer).beforeShipped();
[File 2 of 7]
@@ -2206,15 +2206,13 @@ public boolean compact(CompactionContext compaction, HStore store,
* }
* Also in compactor.performCompaction():
* check periodically to see if a system stop is requested
- * if (closeCheckInterval > 0) {
- * bytesWritten += len;
- * if (bytesWritten > closeCheckInterval) {
- * bytesWritten = 0;
- * if (!store.areWritesEnabled()) {
- * progress.cancel();
- * return false;
- * }
- * }
+ * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+ * progress.cancel();
+ * return false;
+ * }
+ * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+ * progress.cancel();
+ * return false;
+ * }
*/
try {
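Both limits are read from the configuration. A minimal tuning sketch, not part of this diff: the CloseCheckTuning class and the 5 MB / 5 s values are illustrative, while the key constants and defaults come from the new CloseChecker class shown below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;

public final class CloseCheckTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Re-check for a requested close after every ~5 MB written
    // (default 10 MB; the key is "hbase.hstore.close.check.interval").
    conf.setInt(CloseChecker.SIZE_LIMIT_KEY, 5 * 1000 * 1000);
    // Re-check after every 5 s of elapsed compaction time
    // (default 10 s; the key is "hbase.hstore.close.check.time.interval").
    conf.setLong(CloseChecker.TIME_LIMIT_KEY, 5 * 1000L);
    // A value <= 0 disables the corresponding check entirely.
  }
}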
[File 3 of 7]
@@ -154,8 +154,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
protected CacheConfig cacheConf;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
- /* how many bytes to write between status checks */
- static int closeCheckInterval = 0;
private AtomicLong storeSize = new AtomicLong();
private AtomicLong totalUncompressedBytes = new AtomicLong();

@@ -297,11 +295,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
}

- if (HStore.closeCheckInterval == 0) {
- HStore.closeCheckInterval = conf.getInt(
- "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
- }

this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
// Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
@@ -490,13 +483,6 @@ public static ChecksumType getChecksumType(Configuration conf) {
}
}

- /**
- * @return how many bytes to write between status checks
- */
- public static int getCloseCheckInterval() {
- return closeCheckInterval;
- }

@Override
public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
return this.family;
[File 4 of 7]
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Periodically checks whether a system stop (e.g. a store close) has been
 * requested during compaction.
 */
@InterfaceAudience.Private
public class CloseChecker {
public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval";
public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval";

private final int closeCheckSizeLimit;
private final long closeCheckTimeLimit;

private long bytesWrittenProgressForCloseCheck;
private long lastCloseCheckMillis;

public CloseChecker(Configuration conf, long currentTime) {
this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
this.bytesWrittenProgressForCloseCheck = 0;
this.lastCloseCheckMillis = currentTime;
}

/**
 * Checks whether a system stop has been requested once the bytes written since
 * the last check exceed the configured size limit.
 *
 * @return true if a system stop was requested and the caller should abort.
 */
public boolean isSizeLimit(Store store, long bytesWritten) {
if (closeCheckSizeLimit <= 0) {
return false;
}

bytesWrittenProgressForCloseCheck += bytesWritten;
if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) {
return false;
}

bytesWrittenProgressForCloseCheck = 0;
return !store.areWritesEnabled();
}

/**
 * Checks whether a system stop has been requested once the configured time
 * limit has elapsed since the last check.
 *
 * @return true if a system stop was requested and the caller should abort.
 */
public boolean isTimeLimit(Store store, long now) {
if (closeCheckTimeLimit <= 0) {
return false;
}

final long elapsedMillis = now - lastCloseCheckMillis;
if (elapsedMillis <= closeCheckTimeLimit) {
return false;
}

lastCloseCheckMillis = now;
return !store.areWritesEnabled();
}
}
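The call pattern intended for this class is the one the Compactor diff below adopts: check the time limit once per scanner batch and the size limit once per cell written. As a standalone illustration, a minimal sketch; the CloseCheckerExample class, its compactLoop method, and the use of KeyValueUtil.length as a stand-in for bytes actually written are hypothetical, not part of this commit.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

public final class CloseCheckerExample {
  static boolean compactLoop(Configuration conf, InternalScanner scanner, Store store)
      throws IOException {
    CloseChecker closeChecker = new CloseChecker(conf, EnvironmentEdgeManager.currentTime());
    List<Cell> cells = new ArrayList<>();
    boolean hasMore;
    do {
      hasMore = scanner.next(cells);
      // Once per batch: give up if the close-check time limit has elapsed
      // and writes have been disabled on the store.
      if (closeChecker.isTimeLimit(store, EnvironmentEdgeManager.currentTime())) {
        return false;
      }
      for (Cell c : cells) {
        long len = KeyValueUtil.length(c); // stand-in for bytes actually written
        // Once per cell: give up if enough bytes accumulated since the last
        // check and writes have been disabled on the store.
        if (closeChecker.isSizeLimit(store, len)) {
          return false;
        }
      }
      cells.clear();
    } while (hasMore);
    return true;
  }
}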
[File 5 of 7]
@@ -368,17 +368,17 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
assert writer instanceof ShipperListener;
- long bytesWrittenProgressForCloseCheck = 0;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<>();
- long closeCheckSizeLimit = HStore.getCloseCheckInterval();
+ long currentTime = EnvironmentEdgeManager.currentTime();
long lastMillis = 0;
if (LOG.isDebugEnabled()) {
- lastMillis = EnvironmentEdgeManager.currentTime();
+ lastMillis = currentTime;
}
+ CloseChecker closeChecker = new CloseChecker(conf, currentTime);
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
@@ -392,8 +392,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
try {
do {
hasMore = scanner.next(cells, scannerContext);
+ currentTime = EnvironmentEdgeManager.currentTime();
if (LOG.isDebugEnabled()) {
- now = EnvironmentEdgeManager.currentTime();
+ now = currentTime;
}
+ if (closeChecker.isTimeLimit(store, currentTime)) {
+ progress.cancel();
+ return false;
+ }
// output to writer:
Cell lastCleanCell = null;
@@ -416,16 +421,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
bytesWrittenProgressForLog += len;
}
throughputController.control(compactionName, len);
- // check periodically to see if a system stop is requested
- if (closeCheckSizeLimit > 0) {
- bytesWrittenProgressForCloseCheck += len;
- if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
- bytesWrittenProgressForCloseCheck = 0;
- if (!store.areWritesEnabled()) {
- progress.cancel();
- return false;
- }
- }
+ if (closeChecker.isSizeLimit(store, len)) {
+ progress.cancel();
+ return false;
}
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
if (lastCleanCell != null) {
[File 6 of 7]
@@ -21,6 +21,8 @@
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
+ import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
+ import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -152,8 +154,11 @@ public void tearDown() throws Exception {
* @throws Exception
*/
@Test
- public void testInterruptCompaction() throws Exception {
+ public void testInterruptCompactionBySize() throws Exception {
assertEquals(0, count());

// lower the polling interval for this test
- int origWI = HStore.closeCheckInterval;
- HStore.closeCheckInterval = 10*1000; // 10 KB
+ conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);

try {
// Create a couple store files w/ 15KB (over 10KB interval)
Expand Down Expand Up @@ -202,7 +203,84 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
} finally {
// don't mess up future tests
r.writestate.writesEnabled = true;
- HStore.closeCheckInterval = origWI;
+ conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);

// Delete all Store information once done using
for (int i = 0; i < compactionThreshold; i++) {
Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
byte [][] famAndQf = {COLUMN_FAMILY, null};
delete.addFamily(famAndQf[0]);
r.delete(delete);
}
r.flush(true);

// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttl = 1000;
for (HStore store : this.r.stores.values()) {
ScanInfo old = store.getScanInfo();
ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
store.setScanInfo(si);
}
Thread.sleep(ttl);

r.compact(true);
assertEquals(0, count());
}
}

@Test
public void testInterruptCompactionByTime() throws Exception {
assertEquals(0, count());

// lower the polling interval for this test
conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);

try {
// Create a couple store files w/ 15KB (over 10KB interval)
int jmax = (int) Math.ceil(15.0/compactionThreshold);
byte [] pad = new byte[1000]; // 1 KB chunk
for (int i = 0; i < compactionThreshold; i++) {
Table loader = new RegionAsTable(r);
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
p.setDurability(Durability.SKIP_WAL);
for (int j = 0; j < jmax; j++) {
p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
}
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
loader.put(p);
r.flush(true);
}

HRegion spyR = spy(r);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
return invocation.callRealMethod();
}
}).when(spyR).doRegionCompactionPrep();

// force a minor compaction, but not before requesting a stop
spyR.compactStores();

// ensure that the compaction stopped, all old files are intact,
HStore s = r.getStore(COLUMN_FAMILY);
assertEquals(compactionThreshold, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
// and no new store files persisted past compactStores()
// only one empty dir exists in temp dir
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
assertEquals(1, ls.length);
Path storeTempDir =
new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
assertTrue(r.getFilesystem().exists(storeTempDir));
ls = r.getFilesystem().listStatus(storeTempDir);
assertEquals(0, ls.length);
} finally {
// don't mess up future tests
r.writestate.writesEnabled = true;
conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);

// Delete all Store information once done using
for (int i = 0; i < compactionThreshold; i++) {
[Remaining diff not loaded]