HBASE-23293 [REPLICATION] make ship edits timeout configurable #825

Merged (1 commit) on Nov 26, 2019
@@ -950,6 +950,10 @@ public enum OperationStatusCode {

public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;

/** Configuration key for ReplicationSource shipEdits timeout */
public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT =
"replication.source.shipedits.timeout";
Member:

Usually the configuration key starts with "hbase.".

Contributor (author):

Since most of the conf keys for ReplicationSource and ReplicationSink are prefixed with "replication", I think this is fine.

Member:

OK, let's keep the existing naming style for now.

public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000;

/**
* Directory where the source cluster file system client configuration are placed which is used by
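
As a side note on the new key discussed above, here is a minimal sketch of overriding the ship-edits timeout through the Configuration API. It is not part of this PR's diff; the 120-second value is illustrative, and it assumes the HBase client jars are on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ShipEditsTimeoutConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "replication.source.shipedits.timeout" defaults to 60000 ms; the override
    // below is illustrative, e.g. for slow cross-cluster links.
    conf.setInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, 120_000);
    System.out.println("ship edits timeout = "
        + conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
            HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT) + " ms");
  }
}
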
@@ -236,4 +236,18 @@ public static boolean sleepForRetries(String msg, long sleepForRetries, int slee
}
return sleepMultiplier < maxRetriesMultiplier;
}

/**
* Get the adaptive timeout value when performing a retry
*/
public static int getAdaptiveTimeout(final int initialValue, final int retries) {
int ntries = retries;
if (ntries >= HConstants.RETRY_BACKOFF.length) {
ntries = HConstants.RETRY_BACKOFF.length - 1;
}
if (ntries < 0) {
ntries = 0;
}
return initialValue * HConstants.RETRY_BACKOFF[ntries];
}
}
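
A quick sketch of how the helper above behaves. It is not part of this PR; the printed values depend on HConstants.RETRY_BACKOFF, which is not shown in this diff, so treat the numbers as assumption-dependent. The clamping is what the method guarantees.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public class AdaptiveTimeoutDemo {
  public static void main(String[] args) {
    int baseTimeout = 60_000; // assume the default ship-edits timeout of 60 s
    // Retry counts beyond the end of RETRY_BACKOFF are clamped to the last slot,
    // so the timeout grows with each retry and then plateaus rather than overflowing.
    for (int retries = 0; retries <= HConstants.RETRY_BACKOFF.length; retries++) {
      System.out.println("retries=" + retries + " -> "
          + ReplicationUtils.getAdaptiveTimeout(baseTimeout, retries) + " ms");
    }
  }
}
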
@@ -159,9 +159,11 @@ public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionReque
}

public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
ReplicateWALEntryRequest request, CellScanner cellScanner) {
return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
cellScanner);
ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
return call((stub, controller, done) -> {
controller.setCallTimeout(timeout);
stub.replicateWALEntry(controller, request, done);
}, cellScanner);
}

public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,
@@ -54,11 +54,11 @@ public class ReplicationProtbufUtil {
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
*/
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
int timeout) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
}

/**
@@ -161,6 +161,7 @@ static class ReplicateContext {
List<Entry> entries;
int size;
String walGroupId;
int timeout;
@InterfaceAudience.Private
public ReplicateContext() {
}
@@ -186,6 +187,12 @@ public int getSize() {
public String getWalGroupId(){
return walGroupId;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return this.timeout;
}
}

/**
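
For illustration, how the new timeout field travels with a ReplicateContext. This is not part of this PR's diff; the empty entry list and the group id are placeholders, and only setTimeout/getTimeout come from this change.

import java.util.Collections;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public class ReplicateContextTimeoutDemo {
  public static void main(String[] args) {
    // Mirrors what ReplicationSourceShipper does further down: the timeout is
    // attached to the context so the endpoint can apply it to its replication RPCs.
    ReplicationEndpoint.ReplicateContext context = new ReplicationEndpoint.ReplicateContext();
    context.setEntries(Collections.emptyList()).setSize(0);
    context.setWalGroupId("demo-wal-group"); // placeholder group id
    context.setTimeout(ReplicationUtils.getAdaptiveTimeout(60_000, 0)); // first attempt
    System.out.println("per-batch rpc timeout = " + context.getTimeout() + " ms");
  }
}
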
@@ -309,7 +309,7 @@ private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext
replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
pool.submit(createReplicator(entries, i));
pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
futures++;
}
}
@@ -467,7 +467,8 @@ protected void doStop() {
}

@VisibleForTesting
protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
throws IOException {
SinkPeer sinkPeer = null;
try {
int entriesHashCode = System.identityHashCode(entries);
@@ -481,7 +482,7 @@ protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOExc
try {
ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
hfileArchiveDir);
hfileArchiveDir, timeout);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
@@ -501,30 +502,30 @@ protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOExc
return batchIndex;
}

private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
throws IOException {
int batchSize = 0, index = 0;
List<Entry> batch = new ArrayList<>();
for (Entry entry : entries) {
int entrySize = getEstimatedEntrySize(entry);
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
replicateEntries(batch, index++);
replicateEntries(batch, index++, timeout);
batch.clear();
batchSize = 0;
}
batch.add(entry);
batchSize += entrySize;
}
if (batchSize > 0) {
replicateEntries(batch, index);
replicateEntries(batch, index, timeout);
}
return batchIndex;
}

@VisibleForTesting
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
: () -> replicateEntries(entries, batchIndex);
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
: () -> replicateEntries(entries, batchIndex, timeout);
}

private String logPeerId(){
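
Because createReplicator now takes a timeout, endpoints that subclass HBaseInterClusterReplicationEndpoint and override it need the extra parameter. A hypothetical sketch mirroring the test endpoints further down; the class itself is not part of this PR.

import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingInterClusterEndpoint extends HBaseInterClusterReplicationEndpoint {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingInterClusterEndpoint.class);

  @Override
  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
    return () -> {
      LOG.info("Shipping batch {} ({} entries) with rpc timeout {} ms",
          batchIndex, entries.size(), timeout);
      // Delegate to the parent logic, passing the adaptive timeout through.
      return replicateEntries(entries, batchIndex, timeout);
    };
  }
}
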
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
@@ -26,6 +27,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
@@ -73,6 +75,7 @@ public enum WorkerState {
protected final int maxRetriesMultiplier;
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;

public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -86,6 +89,8 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId,
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.getEntriesTimeout =
this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
}

@Override
@@ -186,6 +191,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));

long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
@@ -267,11 +267,13 @@ protected final void verifyReplicationRequestRejection(HBaseTestingUtility utili
}
if (!expectedRejection) {
ReplicationProtbufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
} else {
try {
ReplicationProtbufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (RemoteException e) {
assertRejection(e.unwrapRemoteException());
@@ -482,7 +482,7 @@ public boolean replicate(ReplicateContext replicateContext) {
}

@Override
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
// Fail only once, we don't want to slow down the test.
if (failedOnce) {
return () -> ordinal;
@@ -227,9 +227,9 @@ public boolean replicate(ReplicateContext replicateContext) {
}

@Override
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
return () -> {
int batchIndex = replicateEntries(entries, ordinal);
int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
@@ -244,10 +244,10 @@ public static class FailureInjectingReplicationEndpointForTest
private final AtomicBoolean failNext = new AtomicBoolean(false);

@Override
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
return () -> {
if (failNext.compareAndSet(false, true)) {
int batchIndex = replicateEntries(entries, ordinal);
int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
@@ -167,7 +167,7 @@ public boolean canReplicateToSameCluster() {
}

@Override
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
return () -> {
entryQueue.addAll(entries);
return ordinal;