Skip to content

Commit

Permalink
HBASE-24779 Report on the WAL edit buffer usage/limit for replication
Browse files Browse the repository at this point in the history
Closes #2193

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
joshelser committed Aug 7, 2020
1 parent cb3dd99 commit 303db63
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {

public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";

/**
* Sets the total usage of memory used by edits in memory read from WALs. The memory represented
* by this usage measure is across peers/sources. For example, we may batch the same WAL edits
* multiple times for the sake of replicating them to multiple peers..
* @param usage The memory used by edits in bytes
*/
void setWALReaderEditsBufferBytes(long usage);

/**
* Returns the size, in bytes, of edits held in memory to be replicated across all peers.
*/
long getWALReaderEditsBufferBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id);
public MetricsReplicationTableSource getTableSource(String tableName);
public MetricsReplicationSourceSource getGlobalSource();
public MetricsReplicationGlobalSourceSource getGlobalSource();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
public class MetricsReplicationGlobalSourceSourceImpl
implements MetricsReplicationGlobalSourceSource {
private static final String KEY_PREFIX = "source.";

private final MetricsReplicationSourceImpl rms;
Expand Down Expand Up @@ -53,8 +54,9 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
private final MutableGaugeLong walReaderBufferUsageBytes;

public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
this.rms = rms;

ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
Expand Down Expand Up @@ -92,6 +94,9 @@ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);

walReaderBufferUsageBytes = rms.getMetricsRegistry()
.getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
}

@Override public void setLastShippedAge(long age) {
Expand Down Expand Up @@ -142,7 +147,6 @@ static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCoun
}
}
}

@Override public void incrLogReadInBytes(long size) {
logReadInBytesCounter.incr(size);
}
Expand Down Expand Up @@ -275,4 +279,14 @@ public long getShippedOps() {
public long getEditsFiltered() {
return this.walEditsFilteredCounter.value();
}

@Override
public void setWALReaderEditsBufferBytes(long usage) {
this.walReaderBufferUsageBytes.set(usage);
}

@Override
public long getWALReaderEditsBufferBytes() {
return this.walReaderBufferUsageBytes.value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private static enum SourceHolder {
return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
}

@Override public MetricsReplicationSourceSource getGlobalSource() {
return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
@Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri

@Override public void incrShippedBytes(long size) {
shippedBytesCounter.incr(size);
MetricsReplicationGlobalSourceSource
MetricsReplicationGlobalSourceSourceImpl
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class MetricsSource implements BaseSource {
private long timeStampNextToReplicate;

private final MetricsReplicationSourceSource singleSourceSource;
private final MetricsReplicationSourceSource globalSourceSource;
private final MetricsReplicationGlobalSourceSource globalSourceSource;
private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;

/**
Expand All @@ -75,7 +75,7 @@ public MetricsSource(String id) {
* @param globalSourceSource Class to monitor global-scoped metrics
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
MetricsReplicationSourceSource globalSourceSource,
MetricsReplicationGlobalSourceSource globalSourceSource,
Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
this.id = id;
this.singleSourceSource = singleSourceSource;
Expand Down Expand Up @@ -465,4 +465,19 @@ public String getMetricsName() {
public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable;
}

/**
* Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
*/
public void setWALReaderEditsBufferUsage(long usageInBytes) {
globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
}

/**
* Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
* @return
*/
public long getWALReaderEditsBufferUsage() {
return globalSourceSource.getWALReaderEditsBufferBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
private MetricsReplicationGlobalSourceSource globalMetricsSource;

private PeerProcedureHandler peerProcedureHandler;

Expand Down Expand Up @@ -119,9 +121,12 @@ public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
this.globalMetricsSource = CompatibilitySingletonFactory
.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
globalMetricsSource);
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
totalBufferUsed.addAndGet(-batchSize);
long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
// Record the new buffer usage
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public class ReplicationSourceManager implements ReplicationListener {


private AtomicLong totalBufferUsed = new AtomicLong();
// Total buffer size on this RegionServer for holding batched edits to be shipped.
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;

/**
* Creates a replication manager and sets the watch on all the other registered region servers
Expand All @@ -171,7 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
WALFileLengthProvider walFileLengthProvider,
MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
// CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying.
this.sources = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -205,6 +209,9 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.globalMetrics = globalMetrics;
}

/**
Expand Down Expand Up @@ -879,6 +886,14 @@ public AtomicLong getTotalBufferUsed() {
return totalBufferUsed;
}

/**
* Returns the maximum size in bytes of edits held in memory which are pending replication
* across all sources inside this RegionServer.
*/
public long getTotalBufferLimit() {
return totalBufferLimit;
}

/**
* Get the directory where wals are archived
* @return the directory where wals are archived
Expand Down Expand Up @@ -916,6 +931,10 @@ public ReplicationPeers getReplicationPeers() {
*/
public String getStats() {
StringBuilder stats = new StringBuilder();
// Print stats that apply across all Replication Sources
stats.append("Global stats: ");
stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
.append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
Expand All @@ -941,4 +960,8 @@ public void cleanUpHFileRefs(String peerId, List<String> files) {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}

MetricsReplicationGlobalSourceSource getGlobalMetrics() {
return this.globalMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
Expand Down Expand Up @@ -104,8 +103,7 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
Expand Down Expand Up @@ -275,6 +273,8 @@ public Path getCurrentPath() {
private boolean checkQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
Threads.sleep(sleepForRetries);
return false;
}
Expand Down Expand Up @@ -403,7 +403,10 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed >= totalBufferQuota;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
Expand Down Expand Up @@ -339,9 +340,9 @@ public void testMetricsSourceBaseSourcePassThrough() {

MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id);
MetricsReplicationSourceSource globalSourceSource =
new MetricsReplicationGlobalSourceSource(globalRms);
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
MetricsReplicationGlobalSourceSource globalSourceSource =
new MetricsReplicationGlobalSourceSourceImpl(globalRms);
MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
Expand Down Expand Up @@ -507,6 +508,44 @@ public boolean canReplicateToSameCluster() {
}
}

/**
* Not used by unit tests, helpful for manual testing with replication.
* <p>
* Snippet for `hbase shell`:
* <pre>
* create 't', 'f'
* add_peer '1', ENDPOINT_CLASSNAME =&gt; 'org.apache.hadoop.hbase.replication.' + \
* 'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
* alter 't', {NAME=&gt;'f', REPLICATION_SCOPE=&gt;1}
* </pre>
*/
public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
private long duration;
public SleepingReplicationEndpointForTest() {
super();
}

@Override
public void init(Context context) throws IOException {
super.init(context);
if (this.ctx != null) {
duration = this.ctx.getConfiguration().getLong(
"hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
}
}

@Override
public boolean replicate(ReplicateContext context) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return super.replicate(context);
}
}

public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,18 @@ public void testWALKeySerialization() throws Exception {
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
when(mockSourceManager.getTotalBufferLimit()).thenReturn(
(long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
MetricsReplicationGlobalSourceSource.class);
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}

Expand Down

0 comments on commit 303db63

Please sign in to comment.