Skip to content

Commit

Permalink
HBASE-22072 High read/write intensive regions may cause long crash (#214
Browse files Browse the repository at this point in the history
)

* HBASE-22072 High read/write intensive regions may cause long crash
recovery

* Make the 'closing' variable as volatile and move the test case to
standlone class
  • Loading branch information
ramkrish86 authored and ramkrishna committed May 8, 2019
1 parent 82fd46e commit 5099ef2
Show file tree
Hide file tree
Showing 3 changed files with 319 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private int storeOffset = 0;

// Used to indicate that the scanner has closed (see HBASE-1107)
// Do not need to be volatile because it's always accessed via synchronized methods
private boolean closing = false;
private volatile boolean closing = false;
private final boolean get;
private final boolean explicitColumnQuery;
private final boolean useRowColBloom;
Expand Down Expand Up @@ -157,6 +156,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
private final ReentrantLock flushLock = new ReentrantLock();
// lock for closing.
private final ReentrantLock closeLock = new ReentrantLock();

protected final long readPt;
private boolean topChanged = false;
Expand Down Expand Up @@ -473,31 +474,38 @@ public void close() {
}

private void close(boolean withDelayedScannersClose) {
if (this.closing) {
return;
}
if (withDelayedScannersClose) {
this.closing = true;
}
// For mob compaction, we do not have a store.
if (this.store != null) {
this.store.deleteChangedReaderObserver(this);
}
if (withDelayedScannersClose) {
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
clearAndClose(flushedstoreFileScanners);
if (this.heap != null) {
this.heap.close();
this.currentScanners.clear();
this.heap = null; // CLOSED!
closeLock.lock();
// If the closeLock is acquired then any subsequent updateReaders()
// call is ignored.
try {
if (this.closing) {
return;
}
} else {
if (this.heap != null) {
this.scannersForDelayedClose.add(this.heap);
this.currentScanners.clear();
this.heap = null;
if (withDelayedScannersClose) {
this.closing = true;
}
// For mob compaction, we do not have a store.
if (this.store != null) {
this.store.deleteChangedReaderObserver(this);
}
if (withDelayedScannersClose) {
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
clearAndClose(flushedstoreFileScanners);
if (this.heap != null) {
this.heap.close();
this.currentScanners.clear();
this.heap = null; // CLOSED!
}
} else {
if (this.heap != null) {
this.scannersForDelayedClose.add(this.heap);
this.currentScanners.clear();
this.heap = null;
}
}
} finally {
closeLock.unlock();
}
}

Expand Down Expand Up @@ -876,8 +884,25 @@ public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreSc
if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) {
return;
}
boolean updateReaders = false;
flushLock.lock();
try {
if (!closeLock.tryLock()) {
// The reason for doing this is that when the current store scanner does not retrieve
// any new cells, then the scanner is considered to be done. The heap of this scanner
// is not closed till the shipped() call is completed. Hence in that case if at all
// the partial close (close (false)) has been called before updateReaders(), there is no
// need for the updateReaders() to happen.
LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
// no lock acquired.
return;
}
// lock acquired
updateReaders = true;
if (this.closing) {
LOG.debug("StoreScanner already closing. There is no need to updateReaders");
return;
}
flushed = true;
final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
Expand All @@ -896,6 +921,9 @@ public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreSc
}
} finally {
flushLock.unlock();
if (updateReaders) {
closeLock.unlock();
}
}
// Let the next() call handle re-creating and seeking
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import java.util.OptionalInt;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class TestStoreScanner {
private static final String CF_STR = "cf";
private static final byte[] CF = Bytes.toBytes(CF_STR);
static Configuration CONF = HBaseConfiguration.create();
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);

Expand Down Expand Up @@ -847,7 +850,6 @@ public void testScannerReseekDoesntNPE() throws Exception {
}
}


@Test @Ignore("this fails, since we don't handle deletions, etc, in peek")
public void testPeek() throws Exception {
KeyValue[] kvs = new KeyValue [] {
Expand Down
Loading

0 comments on commit 5099ef2

Please sign in to comment.