From 01e4427741093f02a328e34e6d050ac6e0dc470f Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 7 Aug 2020 12:59:17 -0400 Subject: [PATCH] HBASE-24779 Report on the WAL edit buffer usage/limit for replication Closes #2193 Signed-off-by: Bharath Vissapragada Signed-off-by: Sean Busbey Signed-off-by: Wellington Chevreuil (cherry picked from commit 303db63b7654db499923ac80c9be46e08a25b27b) Change-Id: If1b3662792304747d2942dbc52338e2108b1a764 --- .../MetricsReplicationGlobalSourceSource.java | 39 ++++++++++++++++ .../MetricsReplicationSourceFactory.java | 2 +- ...icsReplicationGlobalSourceSourceImpl.java} | 18 +++++++- .../MetricsReplicationSourceFactoryImpl.java | 4 +- .../MetricsReplicationSourceSourceImpl.java | 2 +- .../regionserver/MetricsSource.java | 19 +++++++- .../replication/regionserver/Replication.java | 7 ++- .../regionserver/ReplicationSource.java | 4 +- .../ReplicationSourceManager.java | 25 ++++++++++- .../ReplicationSourceWALReader.java | 11 +++-- .../replication/TestReplicationEndpoint.java | 45 +++++++++++++++++-- .../regionserver/TestWALEntryStream.java | 5 +++ 12 files changed, 163 insertions(+), 18 deletions(-) create mode 100644 hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java rename hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/{MetricsReplicationGlobalSourceSource.java => MetricsReplicationGlobalSourceSourceImpl.java} (93%) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java new file mode 100644 index 000000000000..e373a6c1349e --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource { + + public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; + + /** + * Sets the total usage of memory used by edits in memory read from WALs. The memory represented + * by this usage measure is across peers/sources. For example, we may batch the same WAL edits + * multiple times for the sake of replicating them to multiple peers.. + * @param usage The memory used by edits in bytes + */ + void setWALReaderEditsBufferBytes(long usage); + + /** + * Returns the size, in bytes, of edits held in memory to be replicated across all peers. + */ + long getWALReaderEditsBufferBytes(); +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java index 2816f832edf9..5e4ad27e0912 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java @@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory { public MetricsReplicationSinkSource getSink(); public MetricsReplicationSourceSource getSource(String id); public MetricsReplicationTableSource getTableSource(String tableName); - public MetricsReplicationSourceSource getGlobalSource(); + public MetricsReplicationGlobalSourceSource getGlobalSource(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java similarity index 93% rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 4e8c810138be..963abbac0ea4 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -24,7 +24,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ +public class MetricsReplicationGlobalSourceSourceImpl + implements MetricsReplicationGlobalSourceSource { private static final String KEY_PREFIX = "source."; private final MetricsReplicationSourceImpl rms; @@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableFastCounter completedWAL; private final MutableFastCounter completedRecoveryQueue; private final MutableFastCounter failedRecoveryQueue; + private final MutableGaugeLong walReaderBufferUsageBytes; - public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { + public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) { this.rms = rms; ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); @@ -92,6 +94,8 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); failedRecoveryQueue = rms.getMetricsRegistry() .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); + walReaderBufferUsageBytes = rms.getMetricsRegistry() + .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L); } @Override public void setLastShippedAge(long age) { @@ -260,4 +264,14 @@ public String getMetricsJmxContext() { public String getMetricsName() { return rms.getMetricsName(); } + + @Override + public void setWALReaderEditsBufferBytes(long usage) { + this.walReaderBufferUsageBytes.set(usage); + } + + @Override + public long getWALReaderEditsBufferBytes() { + return this.walReaderBufferUsageBytes.value(); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java index a3b346200417..c0cd1c73e0c1 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java @@ -39,7 +39,7 @@ private static enum SourceHolder { return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName); } - @Override public MetricsReplicationSourceSource getGlobalSource() { - return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source); + @Override public MetricsReplicationGlobalSourceSource getGlobalSource() { + return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source); } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 0ad50524f152..79e9bc1aee47 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -162,7 +162,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri @Override public void incrShippedBytes(long size) { shippedBytesCounter.incr(size); - MetricsReplicationGlobalSourceSource + MetricsReplicationGlobalSourceSourceImpl .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 45fe68c6cd4b..3344e08bbfee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -50,7 +50,7 @@ public class MetricsSource implements BaseSource { private String id; private final MetricsReplicationSourceSource singleSourceSource; - private final MetricsReplicationSourceSource globalSourceSource; + private final MetricsReplicationGlobalSourceSource globalSourceSource; private Map singleSourceSourceByTable; /** @@ -74,7 +74,7 @@ public MetricsSource(String id) { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationSourceSource globalSourceSource, + MetricsReplicationGlobalSourceSource globalSourceSource, Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; @@ -422,4 +422,19 @@ public String getMetricsName() { public Map getSingleSourceSourceByTable() { return singleSourceSourceByTable; } + + /** + * Sets the amount of memory in bytes used in this RegionServer by edits pending replication. + */ + public void setWALReaderEditsBufferUsage(long usageInBytes) { + globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes); + } + + /** + * Returns the amount of memory in bytes used in this RegionServer by edits pending replication. + * @return + */ + public long getWALReaderEditsBufferUsage() { + return globalSourceSource.getWALReaderEditsBufferBytes(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 752cfb8a1427..bec904233ea6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; @@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer private int statsThreadPeriod; // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + private MetricsReplicationGlobalSourceSource globalMetricsSource; private PeerProcedureHandler peerProcedureHandler; @@ -119,9 +121,12 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } + this.globalMetricsSource = CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, - walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); + walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), + globalMetricsSource); if (walProvider != null) { walProvider .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 33eba6f7c984..c5f72c90bdb9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -706,7 +706,9 @@ public void postShipEdits(List entries, int batchSize) { throttler.addPushSize(batchSize); } totalReplicatedEdits.addAndGet(entries.size()); - totalBufferUsed.addAndGet(-batchSize); + long newBufferUsed = totalBufferUsed.addAndGet(-batchSize); + // Record the new buffer usage + this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a6fc313f784c..2806110556e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener { private AtomicLong totalBufferUsed = new AtomicLong(); + // Total buffer size on this RegionServer for holding batched edits to be shipped. + private final long totalBufferLimit; + private final MetricsReplicationGlobalSourceSource globalMetrics; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener { public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider) throws IOException { + WALFileLengthProvider walFileLengthProvider, + MetricsReplicationGlobalSourceSource globalMetrics) throws IOException { // CopyOnWriteArrayList is thread-safe. // Generally, reading is more than modifying. this.sources = new ConcurrentHashMap<>(); @@ -205,6 +209,9 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, this.latestPaths = new HashSet(); replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); + this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.globalMetrics = globalMetrics; } /** @@ -861,6 +868,14 @@ public AtomicLong getTotalBufferUsed() { return totalBufferUsed; } + /** + * Returns the maximum size in bytes of edits held in memory which are pending replication + * across all sources inside this RegionServer. + */ + public long getTotalBufferLimit() { + return totalBufferLimit; + } + /** * Get the directory where wals are archived * @return the directory where wals are archived @@ -898,6 +913,10 @@ public ReplicationPeers getReplicationPeers() { */ public String getStats() { StringBuilder stats = new StringBuilder(); + // Print stats that apply across all Replication Sources + stats.append("Global stats: "); + stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=") + .append(getTotalBufferLimit()).append("B\n"); for (ReplicationSourceInterface source : this.sources.values()) { stats.append("Normal source for cluster " + source.getPeerId() + ": "); stats.append(source.getStats() + "\n"); @@ -923,4 +942,8 @@ public void cleanUpHFileRefs(String peerId, List files) { int activeFailoverTaskCount() { return executor.getActiveCount(); } + + MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 85c081c996c2..328cb8f04f68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -104,8 +103,7 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, // the +1 is for the current thread reading before placing onto the queue int batchCount = conf.getInt("replication.source.nb.batches", 1); this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed(); - this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, - HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit(); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = @@ -272,6 +270,8 @@ public Path getCurrentPath() { private boolean checkQuota() { // try not to go over total quota if (totalBufferUsed.get() > totalBufferQuota) { + LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", + this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota); Threads.sleep(sleepForRetries); return false; } @@ -399,7 +399,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { * @return true if we should clear buffer and push all */ private boolean acquireBufferQuota(long size) { - return totalBufferUsed.addAndGet(size) >= totalBufferQuota; + long newBufferUsed = totalBufferUsed.addAndGet(size); + // Record the new buffer usage + this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed); + return newBufferUsed >= totalBufferQuota; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index f5fd94c10c9e..cc7e20cbc9b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource; import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl; @@ -330,9 +331,9 @@ public void testMetricsSourceBaseSourcePassThrough() { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); - MetricsReplicationSourceSource globalSourceSource = - new MetricsReplicationGlobalSourceSource(globalRms); - MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource); + MetricsReplicationGlobalSourceSource globalSourceSource = + new MetricsReplicationGlobalSourceSourceImpl(globalRms); + MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource); doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue(); Map singleSourceSourceByTable = @@ -493,6 +494,44 @@ protected void doStop() { } } + /** + * Not used by unit tests, helpful for manual testing with replication. + *

+ * Snippet for `hbase shell`: + *

+   * create 't', 'f'
+   * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \
+   *    'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
+   * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
+   * 
+ */ + public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest { + private long duration; + public SleepingReplicationEndpointForTest() { + super(); + } + + @Override + public void init(Context context) throws IOException { + super.init(context); + if (this.ctx != null) { + duration = this.ctx.getConfiguration().getLong( + "hbase.test.sleep.replication.endpoint.duration.millis", 5000L); + } + } + + @Override + public boolean replicate(ReplicateContext context) { + try { + Thread.sleep(duration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return super.replicate(context); + } + } + public static class InterClusterReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index b4af38bfd622..067ab633baeb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -336,6 +336,8 @@ public void testEmptyStream() throws Exception { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + when(mockSourceManager.getTotalBufferLimit()).thenReturn( + (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); @@ -343,6 +345,9 @@ private ReplicationSource mockReplicationSource(boolean recovered, Configuration when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( + MetricsReplicationGlobalSourceSource.class); + when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); return source; }