diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 14a753791da3..00bbc24dc247 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -272,10 +272,14 @@ public void incrLogReadInBytes(long readInBytes) { /** Removes all metrics about this Source. */ public void clear() { + terminate(); + singleSourceSource.clear(); + } + + public void terminate() { int lastQueueSize = singleSourceSource.getSizeOfLogQueue(); globalSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.decrSizeOfLogQueue(lastQueueSize); - singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); lastShippedTimeStamps.clear(); lastHFileRefsQueueSize = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 99050cd8b646..74a430a7f382 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -706,10 +706,13 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool } } } - if (clearMetrics) { - // Can be null in test context. - if (this.metrics != null) { + + // Can be null in test context. + if (this.metrics != null) { + if (clearMetrics) { this.metrics.clear(); + } else { + this.metrics.terminate(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 74816074c783..04e306c219d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -467,7 +467,8 @@ public void refreshSources(String peerId) throws IOException { ReplicationSourceInterface toRemove = this.sources.remove(peerId); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); - toRemove.terminate(terminateMessage, null, true); + // Do not clear metrics + toRemove.terminate(terminateMessage, null, false); } src = createSource(peerId, peer); this.sources.put(peerId, src); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index c27b77aeb0c9..aa31e44b8885 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -94,6 +94,8 @@ public void terminate(String reason, Exception e) { public void terminate(String reason, Exception e, boolean clearMetrics) { if (clearMetrics) { this.metrics.clear(); + } else { + this.metrics.terminate(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 82856e550d48..dd989293ff5d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -572,6 +572,41 @@ public void testRemovePeerMetricsCleanup() throws Exception { } } + @Test + public void testDisablePeerMetricsCleanup() throws Exception { + final String peerId = "DummyPeer"; + final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() + .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build(); + try { + MetricsReplicationSourceSource globalSource = getGlobalSource(); + final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); + final long sizeOfLatestPath = getSizeOfLatestPath(); + addPeerAndWait(peerId, peerConfig, true); + assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + ReplicationSourceInterface source = manager.getSource(peerId); + // Sanity check + assertNotNull(source); + final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); + // Enqueue log and check if metrics updated + source.enqueueLog(new Path("abc")); + assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, + globalSource.getSizeOfLogQueue()); + + // Refreshing the peer should decrement the global and single source metrics + manager.refreshSources(peerId); + assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); + + source = manager.getSource(peerId); + assertNotNull(source); + assertEquals(sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); + assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, + globalSource.getSizeOfLogQueue()); + } finally { + removePeerAndWait(peerId); + } + } + private ReplicationSourceInterface mockReplicationSource(String peerId) { ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); when(source.getPeerId()).thenReturn(peerId);