Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26210 HBase Write should be doomed to hang when cell size excee… #3604

Merged
merged 5 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public MemStoreSnapshot snapshot() {
stopCompaction();
// region level lock ensures pushing active to pipeline is done in isolation
// no concurrent update operations trying to flush the active segment
pushActiveToPipeline(getActive());
pushActiveToPipeline(getActive(), true);
resetTimeOfOldestEdit();
snapshotId = EnvironmentEdgeManager.currentTime();
// in both cases whatever is pushed to snapshot is cleared from the pipeline
Expand Down Expand Up @@ -413,34 +413,62 @@ protected List<KeyValueScanner> createList(int capacity) {
}

/**
* Check whether anything need to be done based on the current active set size.
* The method is invoked upon every addition to the active set.
* For CompactingMemStore, flush the active set to the read-only memory if it's
* size is above threshold
* Check whether anything need to be done based on the current active set size. The method is
* invoked upon every addition to the active set. For CompactingMemStore, flush the active set to
* the read-only memory if it's size is above threshold
* @param currActive intended segment to update
* @param cellToAdd cell to be added to the segment
* @param memstoreSizing object to accumulate changed size
* @return true if the cell can be added to the
* @return true if the cell can be added to the currActive
*/
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
if (currActive.setInMemoryFlushed()) {
flushInMemory(currActive);
if (setInMemoryCompactionFlag()) {
// The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace("Dispatching the MemStore in-memory flush for store " + store
.getColumnFamilyName());
}
getPool().execute(runnable);
long cellSize = MutableSegment.getCellLength(cellToAdd);
boolean successAdd = false;
while (true) {
long segmentDataSize = currActive.getDataSize();
if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
// when replaying edits from WAL there is no need in in-memory flush regardless the size
// otherwise size below flush threshold try to update atomically
break;
}
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
}
successAdd = true;
break;
}
return false;
}
return true;
}

if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
// size above flush threshold so we flush in memory
this.tryFlushInMemoryAndCompactingAsync(currActive);
}
return successAdd;
}

/**
* Try to flush the currActive in memory and submit the background
* {@link InMemoryCompactionRunnable} to
* {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual
* flushing in memory.
* @param currActive current Active Segment to be flush in memory.
*/
private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
if (currActive.setInMemoryFlushed()) {
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
flushInMemory(currActive);
if (setInMemoryCompactionFlag()) {
// The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace(
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
}
getPool().execute(runnable);
}
}
}

// externally visible only for tests
// when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
Expand Down Expand Up @@ -497,26 +525,6 @@ private ThreadPoolExecutor getPool() {
return getRegionServices().getInMemoryCompactionPool();
}

protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
long cellSize = MutableSegment.getCellLength(cellToAdd);
long segmentDataSize = currActive.getDataSize();
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
// when replaying edits from WAL there is no need in in-memory flush regardless the size
// otherwise size below flush threshold try to update atomically
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
}
// enough space for cell - no need to flush
return false;
}
segmentDataSize = currActive.getDataSize();
}
// size above flush threshold
return true;
}

/**
* The request to cancel the compaction asynchronous task (caused by in-memory flush)
* The compaction may still happen if the request was sent too late
Expand All @@ -528,10 +536,6 @@ private void stopCompaction() {
}
}

protected void pushActiveToPipeline(MutableSegment currActive) {
pushActiveToPipeline(currActive, true);
}

/**
* NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
* concurrent writes and because we first add cell size to currActive.getDataSize and then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public boolean flattenOneSegment(long requesterVersion,
if ( s.canBeFlattened() ) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
if (s.isEmpty()) {
// after s.waitForUpdates() is called, there is no updates preceding,if no cells in s,
// after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
// we can skip it.
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,11 @@ public void testFlatteningToJumboCellChunkMap() throws IOException {

// The in-memory flush size is bigger than the size of a single cell,
// but smaller than the size of two cells.
// Therefore, the two created cells are flattened together.
// Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
// flattened.
totalHeapSize = MutableSegment.DEEP_OVERHEAD
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ 1 * oneCellOnCSLMHeapSize
+ 1 * oneCellOnCCMHeapSize;
+ 2 * oneCellOnCCMHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
}

Expand Down
Loading