diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 8091d0ce71f8..eb6bf2a4b0bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -684,6 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, Threads.shutdown(initThread, this.sleepForRetries); } Collection workers = workerThreads.values(); + for (ReplicationSourceShipper worker : workers) { worker.stopWorker(); if(worker.entryReader != null) { @@ -694,6 +695,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, if (this.replicationEndpoint != null) { this.replicationEndpoint.stop(); } + for (ReplicationSourceShipper worker : workers) { if (worker.isAlive() || worker.entryReader.isAlive()) { try { @@ -712,6 +714,9 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, worker.entryReader.interrupt(); } } + //If worker is already stopped but there was still entries batched, + //we need to clear buffer used for non processed entries + worker.clearWALEntryBatch(); } if (join) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 45eb91c2e72e..3f43eb199418 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.LongAccumulator; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -325,4 +327,56 @@ void stopWorker() { public boolean isFinished() { return state == WorkerState.FINISHED; } + + /** + * Attempts to properly update ReplicationSourceManager.totalBufferUser, + * in case there were unprocessed entries batched by the reader to the shipper, + * but the shipper didn't manage to ship those because the replication source is being terminated. + * In that case, it iterates through the batched entries and decrease the pending + * entries size from ReplicationSourceManager.totalBufferUser + *

+ * NOTES + * 1) This method should only be called upon replication source termination. + * It blocks waiting for both shipper and reader threads termination, + * to make sure no race conditions + * when updating ReplicationSourceManager.totalBufferUser. + * + * 2) It does not attempt to terminate reader and shipper threads. Those must + * have been triggered interruption/termination prior to calling this method. + */ + void clearWALEntryBatch() { + long timeout = System.currentTimeMillis() + this.shipEditsTimeout; + while(this.isAlive() || this.entryReader.isAlive()){ + try { + if (System.currentTimeMillis() >= timeout) { + LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper " + + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}", + this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive()); + return; + } else { + // Wait both shipper and reader threads to stop + Thread.sleep(this.sleepForRetries); + } + } catch (InterruptedException e) { + LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. " + + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e); + return; + } + } + LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0); + entryReader.entryBatchQueue.forEach(w -> { + entryReader.entryBatchQueue.remove(w); + w.getWalEntries().forEach(e -> { + long entrySizeExcludeBulkLoad = ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(e); + totalToDecrement.accumulate(entrySizeExcludeBulkLoad); + }); + }); + if( LOG.isTraceEnabled()) { + LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", + totalToDecrement.longValue()); + } + long newBufferUsed = source.getSourceManager().getTotalBufferUsed() + .addAndGet(-totalToDecrement.longValue()); + source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index c71db1bf785b..a6d87870b495 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread { private final WALEntryFilter filter; private final ReplicationSource source; - private final BlockingQueue entryBatchQueue; + @InterfaceAudience.Private + final BlockingQueue entryBatchQueue; // max (heap) size of each batch - multiply by number of batches in queue to get total private final long replicationBatchSizeCapacity; // max count of each batch - multiply by number of batches in queue to get total diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 14558926a464..fd41306dc746 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,7 +22,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.OptionalLong; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -128,6 +131,8 @@ public void testDefaultSkipsMetaWAL() throws IOException { Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getGlobalMetrics()). + thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); @@ -269,6 +274,47 @@ public void testTerminateTimeout() throws Exception { } } + @Test + public void testTerminateClearsBuffer() throws Exception { + ReplicationSource source = new ReplicationSource(); + ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class); + MetricsReplicationGlobalSourceSource mockMetrics = + mock(MetricsReplicationGlobalSourceSource.class); + AtomicLong buffer = new AtomicLong(); + Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer); + Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics); + ReplicationPeer mockPeer = mock(ReplicationPeer.class); + Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L); + Configuration testConf = HBaseConfiguration.create(); + source.init(testConf, null, mockManager, null, mockPeer, null, + "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class)); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null, + conf, null, 0, null, source); + ReplicationSourceShipper shipper = + new ReplicationSourceShipper(conf, null, null, source); + shipper.entryReader = reader; + source.workerThreads.put("testPeer", shipper); + WALEntryBatch batch = new WALEntryBatch(10, logDir); + WAL.Entry mockEntry = mock(WAL.Entry.class); + WALEdit mockEdit = mock(WALEdit.class); + WALKeyImpl mockKey = mock(WALKeyImpl.class); + when(mockEntry.getEdit()).thenReturn(mockEdit); + when(mockEdit.isEmpty()).thenReturn(false); + when(mockEntry.getKey()).thenReturn(mockKey); + when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L); + when(mockEdit.heapSize()).thenReturn(10000L); + when(mockEdit.size()).thenReturn(0); + ArrayList cells = new ArrayList<>(); + KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"), + Bytes.toBytes("1"), Bytes.toBytes("v1")); + cells.add(kv); + when(mockEdit.getCells()).thenReturn(cells); + reader.addEntryToBatch(batch, mockEntry); + reader.entryBatchQueue.put(batch); + source.terminate("test"); + assertEquals(0, source.getSourceManager().getTotalBufferUsed().get()); + } + /** * Tests that recovered queues are preserved on a regionserver shutdown. * See HBASE-18192 @@ -438,12 +484,12 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); queue.put(new Path("/www/html/test")); - RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); - Server server = Mockito.mock(Server.class); + RecoveredReplicationSource source = mock(RecoveredReplicationSource.class); + Server server = mock(Server.class); Mockito.when(server.getServerName()).thenReturn(serverName); Mockito.when(source.getServer()).thenReturn(server); Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer); - ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class); + ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class); Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any())) .thenReturn(1001L); Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any())) @@ -468,6 +514,8 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getGlobalMetrics()). + thenReturn(mock(MetricsReplicationGlobalSourceSource.class)); String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));