Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may… #5162

Merged
merged 1 commit into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,28 @@ public void run() {
entryStream.reset(); // reuse stream
continue;
}
// if we have already switched a file, skip reading and put it directly to the ship queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
boolean successAddToQueue = false;
try {
// if we have already switched a file, skip reading and put it directly to the ship
// queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
}
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
successAddToQueue = true;
sleepMultiplier = 1;
} finally {
if (!successAddToQueue) {
// batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
this.releaseBufferQuota(batch);
}
}
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
}
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
if (!handleEofException(e, batch)) {
Expand Down Expand Up @@ -182,7 +194,7 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);

// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
Expand Down Expand Up @@ -455,13 +467,26 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
walEntryBatch.incrementUsedBufferSize(size);
return newBufferUsed >= totalBufferQuota;
}

/**
* To release the buffer quota of {@link WALEntryBatch} which acquired by
* {@link ReplicationSourceWALReader#acquireBufferQuota}
*/
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
long usedBufferSize = walEntryBatch.getUsedBufferSize();
if (usedBufferSize > 0) {
long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}

/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class WALEntryBatch {
private Map<String, Long> lastSeqIds = new HashMap<>();
// indicate that this is the end of the current file
private boolean endOfFile;
// indicate the buffer size used, which is added to
// ReplicationSourceWALReader.totalBufferUsed
private long usedBufferSize = 0;

/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
Expand Down Expand Up @@ -153,11 +156,19 @@ public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}

public void incrementUsedBufferSize(long increment) {
usedBufferSize += increment;
}

public long getUsedBufferSize() {
return this.usedBufferSize;
}

@Override
public String toString() {
return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
+ ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
+ endOfFile + "]";
+ endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
final AtomicLong bufferUsedCounter = new AtomicLong(0);
Mockito.doAnswer((invocationOnMock) -> {
bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
return null;
}).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
when(globalMetrics.getWALReaderEditsBufferBytes())
.then(invocationOnMock -> bufferUsedCounter.get());
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}
Expand Down Expand Up @@ -764,4 +771,80 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}

/**
* This test is for HBASE-27778, when {@link WALEntryFilter#filter} throws exception for some
* entries in {@link WALEntryBatch},{@link ReplicationSourceWALReader#totalBufferUsed} should be
* decreased because {@link WALEntryBatch} is not put to
* {@link ReplicationSourceWALReader#entryBatchQueue}.
*/
@Test
public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
appendEntriesToLogAndSync(3);
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}

Path walPath = getQueue().peek();
int maxThrowExceptionCount = 3;

ReplicationSource source = mockReplicationSource(false, CONF);
when(source.isPeerEnabled()).thenReturn(true);
PartialWALEntryFailingWALEntryFilter walEntryFilter =
new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
reader.start();
WALEntryBatch entryBatch = reader.take();

assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
long sum = entryBatch.getWalEntries().stream()
.mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
assertNull(reader.poll(10));
}

private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
private int filteredWALEntryCount = -1;
private int walEntryCount = 0;
private int throwExceptionCount = -1;
private int maxThrowExceptionCount;

public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
this.maxThrowExceptionCount = throwExceptionLimit;
this.walEntryCount = walEntryCount;
}

@Override
public Entry filter(Entry entry) {
filteredWALEntryCount++;
if (filteredWALEntryCount < walEntryCount - 1) {
return entry;
}

filteredWALEntryCount = -1;
throwExceptionCount++;
if (throwExceptionCount <= maxThrowExceptionCount - 1) {
throw new WALEntryFilterRetryableException("failing filter");
}
return entry;
}

public int getThrowExceptionCount() {
return throwExceptionCount;
}
}

}