diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 28768679fb65..256f9a78624f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -143,16 +143,28 @@ public void run() {
             entryStream.reset(); // reuse stream
             continue;
           }
-          // if we have already switched a file, skip reading and put it directly to the ship queue
-          if (!batch.isEndOfFile()) {
-            readWALEntries(entryStream, batch);
-            currentPosition = entryStream.getPosition();
+          boolean successAddToQueue = false;
+          try {
+            // if we have already switched a file, skip reading and put it directly to the ship
+            // queue
+            if (!batch.isEndOfFile()) {
+              readWALEntries(entryStream, batch);
+              currentPosition = entryStream.getPosition();
+            }
+            // need to propagate the batch even if it has no entries since it may carry the last
+            // sequence id information for serial replication.
+            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+            entryBatchQueue.put(batch);
+            successAddToQueue = true;
+            sleepMultiplier = 1;
+          } finally {
+            if (!successAddToQueue) {
+              // batch is not put to ReplicationSourceWALReader#entryBatchQueue, so we should
+              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which was
+              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
+              this.releaseBufferQuota(batch);
+            }
           }
-          // need to propagate the batch even it has no entries since it may carry the last
-          // sequence id information for serial replication.
- LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); - entryBatchQueue.put(batch); - sleepMultiplier = 1; } } catch (WALEntryFilterRetryableException | IOException e) { // stream related if (!handleEofException(e, batch)) { @@ -182,7 +194,7 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry, entrySize); updateBatchStats(batch, entry, entrySize); - boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad); + boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad); // Stop if too many entries or too big return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity @@ -455,13 +467,26 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { * @param size delta size for grown buffer * @return true if we should clear buffer and push all */ - private boolean acquireBufferQuota(long size) { + private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) { long newBufferUsed = totalBufferUsed.addAndGet(size); // Record the new buffer usage this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + walEntryBatch.incrementUsedBufferSize(size); return newBufferUsed >= totalBufferQuota; } + /** + * To release the buffer quota of {@link WALEntryBatch} which acquired by + * {@link ReplicationSourceWALReader#acquireBufferQuota} + */ + private void releaseBufferQuota(WALEntryBatch walEntryBatch) { + long usedBufferSize = walEntryBatch.getUsedBufferSize(); + if (usedBufferSize > 0) { + long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize); + this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + } + } + /** Returns whether the reader thread is running */ public boolean isReaderRunning() { return isReaderRunning && !isInterrupted(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index b5ef0f92bccb..06c97db785d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -52,6 +52,9 @@ class WALEntryBatch { private Map lastSeqIds = new HashMap<>(); // indicate that this is the end of the current file private boolean endOfFile; + // indicate the buffer size used, which is added to + // ReplicationSourceWALReader.totalBufferUsed + private long usedBufferSize = 0; /** * @param lastWalPath Path of the WAL the last entry in this batch was read from @@ -153,11 +156,19 @@ public void setLastSeqId(String region, long sequenceId) { lastSeqIds.put(region, sequenceId); } + public void incrementUsedBufferSize(long increment) { + usedBufferSize += increment; + } + + public long getUsedBufferSize() { + return this.usedBufferSize; + } + @Override public String toString() { return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" - + endOfFile + "]"; + + endOfFile + ",usedBufferSize=" + usedBufferSize + "]"; } } diff --git 
index eda89b232c37..a997cd0a9d58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -280,6 +280,13 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
     when(source.isRecovered()).thenReturn(recovered);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+    final AtomicLong bufferUsedCounter = new AtomicLong(0);
+    Mockito.doAnswer((invocationOnMock) -> {
+      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
+      return null;
+    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
+    when(globalMetrics.getWALReaderEditsBufferBytes())
+      .then(invocationOnMock -> bufferUsedCounter.get());
     when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
@@ -764,4 +771,80 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
     Waiter.waitFor(localConf, 10000,
       (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
   }
+
+  /**
+   * This test is for HBASE-27778: when {@link WALEntryFilter#filter} throws an exception for some
+   * entries in a {@link WALEntryBatch}, {@link ReplicationSourceWALReader#totalBufferUsed} should
+   * be decreased because the {@link WALEntryBatch} is not put to
+   * {@link ReplicationSourceWALReader#entryBatchQueue}.
+   */
+  @Test
+  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
+    appendEntriesToLogAndSync(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    Path walPath = getQueue().peek();
+    int maxThrowExceptionCount = 3;
+
+    ReplicationSource source = mockReplicationSource(false, CONF);
+    when(source.isPeerEnabled()).thenReturn(true);
+    PartialWALEntryFailingWALEntryFilter walEntryFilter =
+      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
+    reader.start();
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    long sum = entryBatch.getWalEntries().stream()
+      .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+    assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
+    assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
+    assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
+    assertNull(reader.poll(10));
+  }
+
+  private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
+    private int filteredWALEntryCount = -1;
+    private int walEntryCount = 0;
+    private int throwExceptionCount = -1;
+    private int maxThrowExceptionCount;
+
+    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
+      this.maxThrowExceptionCount = throwExceptionLimit;
+      this.walEntryCount = walEntryCount;
+    }
+
+    @Override
+    public Entry filter(Entry entry) {
+      filteredWALEntryCount++;
+      if (filteredWALEntryCount < walEntryCount - 1) {
+        return entry;
+      }
+
+      filteredWALEntryCount = -1;
+      throwExceptionCount++;
+      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
+        throw new WALEntryFilterRetryableException("failing filter");
+      }
+      return entry;
+    }
+
+    public int getThrowExceptionCount() {
+      return throwExceptionCount;
+    }
+  }
+
 }