Skip to content

Commit

Permalink
HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… (
Browse files Browse the repository at this point in the history
#2546) (#2849)

Signed-off-by: Ankit Singhal <ankit@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
  • Loading branch information
wchevreuil authored Jan 12, 2021
1 parent 51e8cf4 commit fdae12d
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if(worker.entryReader != null) {
Expand All @@ -710,6 +711,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}

for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
Expand All @@ -728,6 +730,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
worker.entryReader.interrupt();
}
}
//If worker is already stopped but there was still entries batched,
//we need to clear buffer used for non processed entries
worker.clearWALEntryBatch();
}

if (join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -54,7 +56,7 @@ public enum WorkerState {
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
private final ReplicationSourceInterface source;
private final ReplicationSource source;

// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
Expand All @@ -74,7 +76,7 @@ public enum WorkerState {
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
Expand Down Expand Up @@ -125,6 +127,7 @@ public final void run() {
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
source.workerThreads.remove(this.walGroupId);
postFinish();
}
}
Expand Down Expand Up @@ -330,4 +333,56 @@ public boolean sleepForRetries(String msg, int sleepMultiplier) {
}
return sleepMultiplier < maxRetriesMultiplier;
}

/**
* Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
* in case there were unprocessed entries batched by the reader to the shipper,
* but the shipper didn't manage to ship those because the replication source is being terminated.
* In that case, it iterates through the batched entries and decrease the pending
* entries size from <code>ReplicationSourceManager.totalBufferUser</code>
* <p/>
* <b>NOTES</b>
* 1) This method should only be called upon replication source termination.
* It blocks waiting for both shipper and reader threads termination,
* to make sure no race conditions
* when updating <code>ReplicationSourceManager.totalBufferUser</code>.
*
* 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
* have been triggered interruption/termination prior to calling this method.
*/
void clearWALEntryBatch() {
long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
while(this.isAlive() || this.entryReader.isAlive()){
try {
if (System.currentTimeMillis() >= timeout) {
LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
return;
} else {
// Wait both shipper and reader threads to stop
Thread.sleep(this.sleepForRetries);
}
} catch (InterruptedException e) {
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+ "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
return;
}
}
LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
entryReader.entryBatchQueue.forEach(w -> {
entryReader.entryBatchQueue.remove(w);
w.getWalEntries().forEach(e -> {
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
});
});
if( LOG.isTraceEnabled()) {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
}
long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
.addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;

private final BlockingQueue<WALEntryBatch> entryBatchQueue;
@InterfaceAudience.Private
final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -131,6 +133,8 @@ public void testDefaultSkipsMetaWAL() throws IOException {
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down Expand Up @@ -272,6 +276,47 @@ public void testTerminateTimeout() throws Exception {
}
}

@Test
public void testTerminateClearsBuffer() throws Exception {
ReplicationSource source = new ReplicationSource();
ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
MetricsReplicationGlobalSourceSource mockMetrics =
mock(MetricsReplicationGlobalSourceSource.class);
AtomicLong buffer = new AtomicLong();
Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
source.init(testConf, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source);
ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
source.workerThreads.put("testPeer", shipper);
WALEntryBatch batch = new WALEntryBatch(10, logDir);
WAL.Entry mockEntry = mock(WAL.Entry.class);
WALEdit mockEdit = mock(WALEdit.class);
WALKeyImpl mockKey = mock(WALKeyImpl.class);
when(mockEntry.getEdit()).thenReturn(mockEdit);
when(mockEdit.isEmpty()).thenReturn(false);
when(mockEntry.getKey()).thenReturn(mockKey);
when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
when(mockEdit.heapSize()).thenReturn(10000L);
when(mockEdit.size()).thenReturn(0);
ArrayList<Cell> cells = new ArrayList<>();
KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
Bytes.toBytes("1"), Bytes.toBytes("v1"));
cells.add(kv);
when(mockEdit.getCells()).thenReturn(cells);
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
}

/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
Expand Down Expand Up @@ -471,6 +516,8 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat
when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down

0 comments on commit fdae12d

Please sign in to comment.