Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,9 @@ protected final int sleep(int sleepMultiplier) {
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
WALEntryBatch batch = null;
try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
Expand Down Expand Up @@ -174,14 +172,25 @@ public void run() {
continue;
}
// below are all for hasNext == YES
batch = createBatch(entryStream);
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
WALEntryBatch batch = createBatch(entryStream);
boolean successAddToQueue = false;
try {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
successAddToQueue = true;
sleepMultiplier = 1;
} finally {
if (!successAddToQueue) {
// batch is not put to ReplicationSourceWALReader#entryBatchQueue,so we should
// decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which
// acquired in ReplicationSourceWALReader.acquireBufferQuota.
this.releaseBufferQuota(batch);
}
}
}
} catch (WALEntryFilterRetryableException e) {
// here we have to recreate the WALEntryStream, as when filtering, we have already called
Expand Down Expand Up @@ -212,7 +221,7 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);

// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
Expand Down Expand Up @@ -430,13 +439,26 @@ private void updateReplicationMarkerEdit(Entry entry, long offset) {
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
  // Charge the delta to the total counter; the returned value already includes it.
  final long usedAfterAcquire = totalBufferUsed.addAndGet(size);
  // Publish the updated usage to the global metrics.
  this.source.getSourceManager().getGlobalMetrics()
    .setWALReaderEditsBufferBytes(usedAfterAcquire);
  // Remember on the batch how much was charged for it, so it can be released later.
  walEntryBatch.incrementUsedBufferSize(size);
  final boolean quotaReached = usedAfterAcquire >= totalBufferQuota;
  return quotaReached;
}

/**
 * Gives back the buffer quota that {@link ReplicationSourceWALReader#acquireBufferQuota} charged
 * for the given {@link WALEntryBatch}. Nothing is released (and the metric is left untouched)
 * when the batch recorded no positive usage.
 */
private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
  final long bytesToRelease = walEntryBatch.getUsedBufferSize();
  if (bytesToRelease <= 0) {
    return;
  }
  // Subtract what this batch acquired and publish the resulting total to the global metrics.
  final long usedAfterRelease = totalBufferUsed.addAndGet(-bytesToRelease);
  this.source.getSourceManager().getGlobalMetrics()
    .setWALReaderEditsBufferBytes(usedAfterRelease);
}

/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class WALEntryBatch {
private Map<String, Long> lastSeqIds = new HashMap<>();
// indicate that this is the end of the current file
private boolean endOfFile;
// indicate the buffer size used, which is added to
// ReplicationSourceWALReader.totalBufferUsed
private long usedBufferSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have recorded the size here, I think we could use it directly in many places (especially in ReplicationSourceShipper), so we do not need to calculate the size of the WALEntryBatch again?

Anyway, this can be a follow-on issue.

Copy link
Contributor Author

@comnetwork comnetwork Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Apache9, yes, I plan to open a new PR to centralize the totalBufferUsed-related code and eliminate the duplicate code.


/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
Expand Down Expand Up @@ -153,11 +156,19 @@ public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}

/**
 * Adds {@code increment} to the buffer size recorded for this batch.
 * @param increment number of bytes to add to the recorded used buffer size
 */
public void incrementUsedBufferSize(long increment) {
  this.usedBufferSize = this.usedBufferSize + increment;
}

/** Returns the buffer size recorded for this batch so far. */
public long getUsedBufferSize() {
  return usedBufferSize;
}

/**
 * Builds a human-readable description of this batch by concatenating its fields.
 */
@Override
public String toString() {
return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
+ ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
+ endOfFile + "]";
+ endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration
when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
final AtomicLong bufferUsedCounter = new AtomicLong(0);
Mockito.doAnswer((invocationOnMock) -> {
bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
return null;
}).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
when(globalMetrics.getWALReaderEditsBufferBytes())
.then(invocationOnMock -> bufferUsedCounter.get());

when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}
Expand Down Expand Up @@ -791,4 +799,80 @@ public void testEOFExceptionInOldWALsDirectory() throws Exception {
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}

/**
 * Test for HBASE-27778: when {@link WALEntryFilter#filter} throws for some entries of a
 * {@link WALEntryBatch}, that batch is never put to
 * {@link ReplicationSourceWALReader#entryBatchQueue}, so
 * {@link ReplicationSourceWALReader#totalBufferUsed} must be decreased again. After the filter
 * stops failing, the total buffer used must equal the size of the single batch that was queued.
 */
@Test
public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
  final int entryCount = 3;
  appendEntriesToLogAndSync(entryCount);

  // Read all entries once with a plain stream to learn the position after the last one.
  final long endPosition;
  try (WALEntryStream stream =
    new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
    for (int remaining = entryCount; remaining > 0; remaining--) {
      assertNotNull(next(stream));
    }
    endPosition = stream.getPosition();
  }

  final Path expectedWalPath = getQueue().peek();
  final int expectedFailures = 3;

  // Start a reader whose filter throws on the last entry for the first expectedFailures passes.
  ReplicationSource replicationSource = mockReplicationSource(false, CONF);
  when(replicationSource.isPeerEnabled()).thenReturn(true);
  PartialWALEntryFailingWALEntryFilter failingFilter =
    new PartialWALEntryFailingWALEntryFilter(expectedFailures, entryCount);
  ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
    failingFilter, replicationSource, fakeWalGroupId);
  walReader.start();
  WALEntryBatch queuedBatch = walReader.take();

  assertNotNull(queuedBatch);
  assertEquals(entryCount, queuedBatch.getWalEntries().size());
  // Expected buffer usage is the size of the queued entries only, without the failed attempts.
  long expectedBufferUsed = queuedBatch.getWalEntries().stream()
    .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
  assertEquals(endPosition, queuedBatch.getLastWalPosition());
  assertEquals(expectedWalPath, queuedBatch.getLastWalPath());
  assertEquals(entryCount, queuedBatch.getNbRowKeys());
  assertEquals(expectedBufferUsed,
    replicationSource.getSourceManager().getTotalBufferUsed().get());
  assertEquals(expectedBufferUsed,
    replicationSource.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
  assertEquals(expectedFailures, failingFilter.getThrowExceptionCount());
  // No further batch is expected after the one taken above.
  assertNull(walReader.poll(10));
}

/**
 * Test filter that counts calls in rounds of {@code walEntryCount}. Every call except the last
 * one of a round passes the entry through. The last call of a round throws
 * {@link WALEntryFilterRetryableException} during the first {@code maxThrowExceptionCount}
 * rounds and passes the entry through in later rounds.
 */
private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
  // zero-based index of the current call within the round; -1 before a round's first call
  private int filteredWALEntryCount = -1;
  // number of filter calls that make up one round
  private final int walEntryCount;
  // number of round-ending calls seen so far, minus one; -1 until the first round ends
  private int throwExceptionCount = -1;
  // number of rounds whose last call throws
  private final int maxThrowExceptionCount;

  public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
    this.walEntryCount = walEntryCount;
    this.maxThrowExceptionCount = throwExceptionLimit;
  }

  @Override
  public Entry filter(Entry entry) {
    filteredWALEntryCount++;
    final boolean beforeLastOfRound = filteredWALEntryCount < walEntryCount - 1;
    if (!beforeLastOfRound) {
      // Last call of the round: start a new round and decide whether this one still fails.
      filteredWALEntryCount = -1;
      throwExceptionCount++;
      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
        throw new WALEntryFilterRetryableException("failing filter");
      }
    }
    return entry;
  }

  /** Returns the current value of the round counter described on {@code throwExceptionCount}. */
  public int getThrowExceptionCount() {
    return this.throwExceptionCount;
  }
}

}