diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 6434460a5d18..5da0de9a3045 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -208,7 +208,7 @@ public MemStoreSnapshot snapshot() {
       stopCompaction();
       // region level lock ensures pushing active to pipeline is done in isolation
       // no concurrent update operations trying to flush the active segment
-      pushActiveToPipeline(getActive());
+      pushActiveToPipeline(getActive(), true);
       resetTimeOfOldestEdit();
       snapshotId = EnvironmentEdgeManager.currentTime();
       // in both cases whatever is pushed to snapshot is cleared from the pipeline
@@ -413,34 +413,62 @@ protected List createList(int capacity) {
   }
 
   /**
-   * Check whether anything need to be done based on the current active set size.
-   * The method is invoked upon every addition to the active set.
-   * For CompactingMemStore, flush the active set to the read-only memory if it's
-   * size is above threshold
+   * Check whether anything needs to be done based on the current active set size. The method is
+   * invoked upon every addition to the active set. For CompactingMemStore, flush the active set to
+   * the read-only memory if its size is above the threshold.
    * @param currActive intended segment to update
    * @param cellToAdd cell to be added to the segment
    * @param memstoreSizing object to accumulate changed size
-   * @return true if the cell can be added to the
+   * @return true if the cell can be added to currActive
    */
-  private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
+  protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
       MemStoreSizing memstoreSizing) {
-    if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
-      if (currActive.setInMemoryFlushed()) {
-        flushInMemory(currActive);
-        if (setInMemoryCompactionFlag()) {
-          // The thread is dispatched to do in-memory compaction in the background
-          InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Dispatching the MemStore in-memory flush for store " + store
-                .getColumnFamilyName());
-          }
-          getPool().execute(runnable);
+    long cellSize = MutableSegment.getCellLength(cellToAdd);
+    boolean successAdd = false;
+    while (true) {
+      long segmentDataSize = currActive.getDataSize();
+      if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
+        // when replaying edits from the WAL there is no need for an in-memory flush regardless of
+        // the size; otherwise, when below the flush threshold, try to update the size atomically
+        break;
+      }
+      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
+        if (memstoreSizing != null) {
+          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
         }
+        successAdd = true;
+        break;
       }
-      return false;
     }
-    return true;
-  }
+
+    if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
+      // the size is above the flush threshold, so we flush in memory
+      this.tryFlushInMemoryAndCompactingAsync(currActive);
+    }
+    return successAdd;
+  }
+
+  /**
+   * Try to flush the currActive in memory and submit the background
+   * {@link InMemoryCompactionRunnable} to
+   * {@link RegionServicesForStores#getInMemoryCompactionPool()}. Only one thread can do the actual
+   * flushing in memory.
+   * @param currActive the current active segment to be flushed in memory
+   */
+  private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
+    if (currActive.setInMemoryFlushed()) {
+      flushInMemory(currActive);
+      if (setInMemoryCompactionFlag()) {
+        // The thread is dispatched to do in-memory compaction in the background
+        InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(
+            "Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
+        }
+        getPool().execute(runnable);
+      }
+    }
+  }
 
   // externally visible only for tests
   // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
@@ -497,26 +525,6 @@ private ThreadPoolExecutor getPool() {
     return getRegionServices().getInMemoryCompactionPool();
   }
 
-  protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
-      MemStoreSizing memstoreSizing) {
-    long cellSize = MutableSegment.getCellLength(cellToAdd);
-    long segmentDataSize = currActive.getDataSize();
-    while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
-      // when replaying edits from WAL there is no need in in-memory flush regardless the size
-      // otherwise size below flush threshold try to update atomically
-      if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
-        if (memstoreSizing != null) {
-          memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
-        }
-        // enough space for cell - no need to flush
-        return false;
-      }
-      segmentDataSize = currActive.getDataSize();
-    }
-    // size above flush threshold
-    return true;
-  }
-
   /**
    * The request to cancel the compaction asynchronous task (caused by in-memory flush)
    * The compaction may still happen if the request was sent too late
@@ -528,10 +536,6 @@ private void stopCompaction() {
     }
   }
 
-  protected void pushActiveToPipeline(MutableSegment currActive) {
-    pushActiveToPipeline(currActive, true);
-  }
-
   /**
    * NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
    * concurrent writes and because we first add cell size to currActive.getDataSize and then
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 28e439efd4e6..965529739ed9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -229,7 +229,7 @@ public boolean flattenOneSegment(long requesterVersion,
       if ( s.canBeFlattened() ) {
         s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
         if (s.isEmpty()) {
-          // after s.waitForUpdates() is called, there is no updates preceding,if no cells in s,
+          // after s.waitForUpdates() is called, there are no updates pending; if s has no cells,
           // we can skip it.
           continue;
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
index 4e861da3a5cd..56a46eacff6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellFlatMapMemStore.java
@@ -821,11 +821,11 @@ public void testFlatteningToJumboCellChunkMap() throws IOException {
 
     // The in-memory flush size is bigger than the size of a single cell,
     // but smaller than the size of two cells.
-    // Therefore, the two created cells are flattened together.
+    // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
+    // flattened.
     totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
-        + 1 * oneCellOnCSLMHeapSize
-        + 1 * oneCellOnCCMHeapSize;
+        + 2 * oneCellOnCCMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 798ddae6f265..b64f52192740 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -50,6 +50,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntBinaryOperator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -1713,8 +1717,11 @@ public void testHFileContextSetWithCFAndTable() throws Exception {
     assertArrayEquals(table, hFileContext.getTableName());
   }
 
+  // This test is for HBASE-26026, where an HBase write may get stuck when the active segment has
+  // no cell but its dataSize exceeds inmemoryFlushSize
   @Test
-  public void testCompactingMemStoreStuckBug26026() throws IOException, InterruptedException {
+  public void testCompactingMemStoreNoCellButDataSizeExceedsInmemoryFlushSize()
+    throws IOException, InterruptedException {
 
     Configuration conf = HBaseConfiguration.create();
     byte[] smallValue = new byte[3];
@@ -1738,12 +1745,15 @@ public void testCompactingMemStoreStuckBug26026() throws IOException, Interrupte
     MyCompactingMemStore2 myCompactingMemStore = ((MyCompactingMemStore2) store.memstore);
     assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
     myCompactingMemStore.smallCellPreUpdateCounter.set(0);
-    myCompactingMemStore.smallCellPostUpdateCounter.set(0);
     myCompactingMemStore.largeCellPreUpdateCounter.set(0);
-    myCompactingMemStore.largeCellPostUpdateCounter.set(0);
 
+    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
     Thread smallCellThread = new Thread(() -> {
-      store.add(smallCell, new NonThreadSafeMemStoreSizing());
+      try {
+        store.add(smallCell, new NonThreadSafeMemStoreSizing());
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+      }
     });
     smallCellThread.setName(MyCompactingMemStore2.SMALL_CELL_THREAD_NAME);
     smallCellThread.start();
@@ -1751,9 +1761,9 @@
     String oldThreadName = Thread.currentThread().getName();
     try {
       /**
-       * 1.smallCellThread enters CompactingMemStore.shouldFlushInMemory first, when largeCellThread
-       * enters CompactingMemStore.shouldFlushInMemory, CompactingMemStore.active.getDataSize could
-       * not accommodate cellToAdd and CompactingMemStore.shouldFlushInMemory return true.
+       * 1. smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
+       * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then largeCellThread
+       * invokes flushInMemory.
        * <p/>
        * 2. After largeCellThread finished CompactingMemStore.flushInMemory method, smallCellThread
        * can add cell to currentActive . That is to say when largeCellThread called flushInMemory
@@ -1772,6 +1782,143 @@
       Thread.currentThread().setName(oldThreadName);
     }
+    assertTrue(exceptionRef.get() == null);
+
+  }
+
+  // This test is for HBASE-26210, where an HBase write may get stuck when there is a cell whose
+  // size exceeds inmemoryFlushSize
+  @Test(timeout = 60000)
+  public void testCompactingMemStoreCellExceedInmemoryFlushSize()
+    throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HStore.MEMSTORE_CLASS_NAME, CompactingMemStore.class.getName());
+
+    init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
+      .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    int size = (int) ((CompactingMemStore) store.memstore).getInmemoryFlushSize();
+    byte[] value = new byte[size + 1];
+
+    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
+    long timestamp = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    Cell cell = createCell(qf1, timestamp, seqId, value);
+    int cellByteSize = MutableSegment.getCellLength(cell);
+    store.add(cell, memStoreSizing);
+    assertTrue(memStoreSizing.getCellsCount() == 1);
+    assertTrue(memStoreSizing.getDataSize() == cellByteSize);
+  }
+
+  // This test is also for HBASE-26210: write a large cell and a small cell concurrently when
+  // inmemoryFlushSize is smaller than, equal to, and larger than the cell size.
+  @Test
+  public void testCompactingMemStoreWriteLargeCellAndSmallCellConcurrently()
+    throws IOException, InterruptedException {
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize - 1);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> largeCellByteSize);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize - 1);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize);
+    doWriteTestLargeCellAndSmallCellConcurrently(
+      (smallCellByteSize, largeCellByteSize) -> smallCellByteSize + largeCellByteSize + 1);
+  }
+
+  private void doWriteTestLargeCellAndSmallCellConcurrently(
+    IntBinaryOperator getFlushByteSize)
+    throws IOException, InterruptedException {
+
+    Configuration conf = HBaseConfiguration.create();
+
+    byte[] smallValue = new byte[3];
+    byte[] largeValue = new byte[100];
+    final long timestamp = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
+    final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
+    int smallCellByteSize = MutableSegment.getCellLength(smallCell);
+    int largeCellByteSize = MutableSegment.getCellLength(largeCell);
+    int flushByteSize = getFlushByteSize.applyAsInt(smallCellByteSize, largeCellByteSize);
+    boolean flushByteSizeLessThanSmallAndLargeCellSize =
+      flushByteSize < (smallCellByteSize + largeCellByteSize);
+
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore3.class.getName());
+    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
+    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushByteSize * 200));
+
+    init(name.getMethodName(), conf,
+      ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setInMemoryCompaction(MemoryCompactionPolicy.BASIC).build());
+
+    MyCompactingMemStore3 myCompactingMemStore = ((MyCompactingMemStore3) store.memstore);
+    assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == flushByteSize);
+    myCompactingMemStore.disableCompaction();
+    if (flushByteSizeLessThanSmallAndLargeCellSize) {
+      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = true;
+    } else {
+      myCompactingMemStore.flushByteSizeLessThanSmallAndLargeCellSize = false;
+    }
+
+
+    final ThreadSafeMemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing();
+    final AtomicLong totalCellByteSize = new AtomicLong(0);
+    final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
+    Thread smallCellThread = new Thread(() -> {
+      try {
+        for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
+          long currentTimestamp = timestamp + i;
+          Cell cell = createCell(qf1, currentTimestamp, seqId, smallValue);
+          totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
+          store.add(cell, memStoreSizing);
+        }
+      } catch (Throwable exception) {
+        exceptionRef.set(exception);
+
+      }
+    });
+    smallCellThread.setName(MyCompactingMemStore3.SMALL_CELL_THREAD_NAME);
+    smallCellThread.start();
+
+    String oldThreadName = Thread.currentThread().getName();
+    try {
+      /**
+       * When flushByteSizeLessThanSmallAndLargeCellSize is true:
+       * <p/>
+       * 1. smallCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize first, then
+       * largeCellThread enters MyCompactingMemStore3.checkAndAddToActiveSize, and then
+       * largeCellThread invokes flushInMemory.
+       * <p/>
+       * 2. After largeCellThread finishes the CompactingMemStore.flushInMemory method,
+       * smallCellThread can run into MyCompactingMemStore3.checkAndAddToActiveSize again.
+       * <p/>
+       * When flushByteSizeLessThanSmallAndLargeCellSize is false: smallCellThread and
+       * largeCellThread concurrently write one cell each and wait for each other, and then write
+       * another cell, etc.
+       */
+      Thread.currentThread().setName(MyCompactingMemStore3.LARGE_CELL_THREAD_NAME);
+      for (int i = 1; i <= MyCompactingMemStore3.CELL_COUNT; i++) {
+        long currentTimestamp = timestamp + i;
+        Cell cell = createCell(qf2, currentTimestamp, seqId, largeValue);
+        totalCellByteSize.addAndGet(MutableSegment.getCellLength(cell));
+        store.add(cell, memStoreSizing);
+      }
+      smallCellThread.join();
+
+      assertTrue(exceptionRef.get() == null);
+      assertTrue(memStoreSizing.getCellsCount() == (MyCompactingMemStore3.CELL_COUNT * 2));
+      assertTrue(memStoreSizing.getDataSize() == totalCellByteSize.get());
+      if (flushByteSizeLessThanSmallAndLargeCellSize) {
+        assertTrue(myCompactingMemStore.flushCounter.get() == MyCompactingMemStore3.CELL_COUNT);
+      } else {
+        assertTrue(
+          myCompactingMemStore.flushCounter.get() <= (MyCompactingMemStore3.CELL_COUNT - 1));
+      }
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+    }
   }
 
   private HStoreFile mockStoreFileWithLength(long length) {
@@ -1875,7 +2022,7 @@ protected List createList(int capacity) {
       return new ArrayList<>(capacity);
     }
     @Override
-    protected void pushActiveToPipeline(MutableSegment active) {
+    protected void pushActiveToPipeline(MutableSegment active, boolean checkEmpty) {
       if (START_TEST.get()) {
         try {
           getScannerLatch.await();
@@ -1884,7 +2031,7 @@
         }
       }
 
-      super.pushActiveToPipeline(active);
+      super.pushActiveToPipeline(active, checkEmpty);
       if (START_TEST.get()) {
         snapshotLatch.countDown();
       }
@@ -1981,8 +2128,6 @@ public static class MyCompactingMemStore2 extends CompactingMemStore {
     private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
     private final AtomicInteger largeCellPreUpdateCounter = new AtomicInteger(0);
     private final AtomicInteger smallCellPreUpdateCounter = new AtomicInteger(0);
-    private final AtomicInteger largeCellPostUpdateCounter = new AtomicInteger(0);
-    private final AtomicInteger smallCellPostUpdateCounter = new AtomicInteger(0);
 
     public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparator,
       HStore store, RegionServicesForStores regionServices,
@@ -1990,16 +2135,17 @@ public MyCompactingMemStore2(Configuration conf, CellComparatorImpl cellComparat
       super(conf, cellComparator, store, regionServices, compactionPolicy);
     }
 
-    protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
+    @Override
+    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
       MemStoreSizing memstoreSizing) {
       if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
         int currentCount = largeCellPreUpdateCounter.incrementAndGet();
         if (currentCount <= 1) {
           try {
             /**
-             * smallCellThread enters super.shouldFlushInMemory first, when largeCellThread enters
-             * super.shouldFlushInMemory, currActive.getDataSize could not accommodate cellToAdd and
-             * super.shouldFlushInMemory return true.
+             * smallCellThread enters CompactingMemStore.checkAndAddToActiveSize first, then
+             * largeCellThread enters CompactingMemStore.checkAndAddToActiveSize, and then
+             * largeCellThread invokes flushInMemory.
              */
             preCyclicBarrier.await();
           } catch (Throwable e) {
@@ -2008,7 +2154,7 @@ protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
         }
       }
 
-      boolean returnValue = super.shouldFlushInMemory(currActive, cellToAdd, memstoreSizing);
+      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
       if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
         try {
           preCyclicBarrier.await();
@@ -2051,4 +2197,93 @@ protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
     }
   }
 
+
+  public static class MyCompactingMemStore3 extends CompactingMemStore {
+    private static final String LARGE_CELL_THREAD_NAME = "largeCellThread";
+    private static final String SMALL_CELL_THREAD_NAME = "smallCellThread";
+
+    private final CyclicBarrier preCyclicBarrier = new CyclicBarrier(2);
+    private final CyclicBarrier postCyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger flushCounter = new AtomicInteger(0);
+    private static final int CELL_COUNT = 5;
+    private boolean flushByteSizeLessThanSmallAndLargeCellSize = true;
+
+    public MyCompactingMemStore3(Configuration conf, CellComparatorImpl cellComparator,
+      HStore store, RegionServicesForStores regionServices,
+      MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, cellComparator, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
+      MemStoreSizing memstoreSizing) {
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        return super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
+      }
+      if (Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME)) {
+        try {
+          preCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      boolean returnValue = super.checkAndAddToActiveSize(currActive, cellToAdd, memstoreSizing);
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          preCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return returnValue;
+    }
+
+    @Override
+    protected void postUpdate(MutableSegment currentActiveMutableSegment) {
+      super.postUpdate(currentActiveMutableSegment);
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        try {
+          postCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+        return;
+      }
+
+      if (Thread.currentThread().getName().equals(SMALL_CELL_THREAD_NAME)) {
+        try {
+          postCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    protected void flushInMemory(MutableSegment currentActiveMutableSegment) {
+      super.flushInMemory(currentActiveMutableSegment);
+      flushCounter.incrementAndGet();
+      if (!flushByteSizeLessThanSmallAndLargeCellSize) {
+        return;
+      }
+
+      assertTrue(Thread.currentThread().getName().equals(LARGE_CELL_THREAD_NAME));
+      try {
+        postCyclicBarrier.await();
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+
+    }
+
+    void disableCompaction() {
+      allowCompaction.set(false);
+    }
+
+    void enableCompaction() {
+      allowCompaction.set(true);
+    }
+
+  }
 }