Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24813 ReplicationSource should clear buffer usage on Replicatio… #2546

Merged
merged 4 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if(worker.entryReader != null) {
Expand All @@ -694,6 +695,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}

for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
Expand All @@ -712,6 +714,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
worker.entryReader.interrupt();
}
}
//If worker is already stopped but there was still entries batched,
//we need to clear buffer used for non processed entries
worker.clearWALEntryBatch();
}

if (join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -325,4 +327,56 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}

/**
* Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
* in case there were unprocessed entries batched by the reader to the shipper,
* but the shipper didn't manage to ship those because the replication source is being terminated.
* In that case, it iterates through the batched entries and decrease the pending
* entries size from <code>ReplicationSourceManager.totalBufferUser</code>
* <p/>
* <b>NOTES</b>
* 1) This method should only be called upon replication source termination.
* It blocks waiting for both shipper and reader threads termination,
* to make sure no race conditions
* when updating <code>ReplicationSourceManager.totalBufferUser</code>.
*
* 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
* have been triggered interruption/termination prior to calling this method.
*/
void clearWALEntryBatch() {
long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
while(this.isAlive() || this.entryReader.isAlive()){
try {
if (System.currentTimeMillis() >= timeout) {
LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
return;
} else {
// Wait both shipper and reader threads to stop
Thread.sleep(this.sleepForRetries);
}
} catch (InterruptedException e) {
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+ "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please restore interrupt flag here (Thread.currentThread().interrupt();) and then return.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do any handling of interrupt at ReplicationSource. Would you still think we need this here?

return;
}
}
LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
entryReader.entryBatchQueue.forEach(w -> {
entryReader.entryBatchQueue.remove(w);
ankitsinghal marked this conversation as resolved.
Show resolved Hide resolved
w.getWalEntries().forEach(e -> {
long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e);
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
});
});
if( LOG.isTraceEnabled()) {
LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
}
long newBufferUsed = source.getSourceManager().getTotalBufferUsed()
.addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;

private final BlockingQueue<WALEntryBatch> entryBatchQueue;
@InterfaceAudience.Private
final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -128,6 +131,8 @@ public void testDefaultSkipsMetaWAL() throws IOException {
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down Expand Up @@ -269,6 +274,47 @@ public void testTerminateTimeout() throws Exception {
}
}

@Test
public void testTerminateClearsBuffer() throws Exception {
ReplicationSource source = new ReplicationSource();
ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
MetricsReplicationGlobalSourceSource mockMetrics =
mock(MetricsReplicationGlobalSourceSource.class);
AtomicLong buffer = new AtomicLong();
Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
source.init(testConf, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source);
ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
source.workerThreads.put("testPeer", shipper);
WALEntryBatch batch = new WALEntryBatch(10, logDir);
WAL.Entry mockEntry = mock(WAL.Entry.class);
WALEdit mockEdit = mock(WALEdit.class);
WALKeyImpl mockKey = mock(WALKeyImpl.class);
when(mockEntry.getEdit()).thenReturn(mockEdit);
when(mockEdit.isEmpty()).thenReturn(false);
when(mockEntry.getKey()).thenReturn(mockKey);
when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
when(mockEdit.heapSize()).thenReturn(10000L);
when(mockEdit.size()).thenReturn(0);
ArrayList<Cell> cells = new ArrayList<>();
KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
Bytes.toBytes("1"), Bytes.toBytes("v1"));
cells.add(kv);
when(mockEdit.getCells()).thenReturn(cells);
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
}

/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
Expand Down Expand Up @@ -438,12 +484,12 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
Server server = mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
Mockito.when(source.getServer()).thenReturn(server);
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
Expand All @@ -468,6 +514,8 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
Mockito.when(manager.getGlobalMetrics()).
thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
Expand Down