Skip to content

Commit

Permalink
HBASE-23293 [REPLICATION] make ship edits timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxu14 committed Nov 14, 2019
1 parent 1d6f50f commit 5aa2682
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,10 @@ public enum OperationStatusCode {

public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;

/** Configuration key for ReplicationSource shipeEdits timeout */
public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT =
"replication.source.shipedits.timeout";
public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000;

/**
* Directory where the source cluster file system client configuration are placed which is used by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ public CompletableFuture<CompactRegionResponse> compactRegion(CompactRegionReque
}

public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
ReplicateWALEntryRequest request, CellScanner cellScanner) {
return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
cellScanner);
ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
return call((stub, controller, done) -> {
controller.setCallTimeout(timeout);
stub.replicateWALEntry(controller, request, done);
}, cellScanner);
}

public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public class ReplicationProtbufUtil {
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
*/
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
int timeout) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private Abortable abortable;
private boolean dropOnDeletedTables;
private boolean isSerial = false;
private int shipEditsTimeout;

@Override
public void init(Context context) throws IOException {
Expand All @@ -135,6 +136,8 @@ public void init(Context context) throws IOException {
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
Expand Down Expand Up @@ -481,7 +484,7 @@ protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOExc
try {
ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
hfileArchiveDir);
hfileArchiveDir, shipEditsTimeout);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,13 @@ protected final void verifyReplicationRequestRejection(HBaseTestingUtility utili
}
if (!expectedRejection) {
ReplicationProtbufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
} else {
try {
ReplicationProtbufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (RemoteException e) {
assertRejection(e.unwrapRemoteException());
Expand Down

0 comments on commit 5aa2682

Please sign in to comment.