Skip to content

Commit

Permalink
HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (
Browse files Browse the repository at this point in the history
…#1027)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: binlijin <binlijin@gmail.com>
  • Loading branch information
wchevreuil authored Jan 15, 2020
1 parent d60ce17 commit cb78b10
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean dropOnDeletedTables;
private boolean isSerial = false;

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* Connection implementations, or initialize it in a different way, so defining createConnection
* as protected for possible overridings.
*/
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
return ClusterConnectionFactory.createAsyncClusterConnection(conf,
null, User.getCurrent());
}

/*
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
* ReplicationSinkManager implementations, or initialize it in a different way,
* so defining createReplicationSinkManager as protected for possible overridings.
*/
protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
return new ReplicationSinkManager(conn, this, this.conf);
}

@Override
public void init(Context context) throws IOException {
super.init(context);
Expand All @@ -131,13 +150,12 @@ public void init(Context context) throws IOException {
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn =
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
this.conn = createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
this.replicationSinkMgr = createReplicationSinkManager(conn);
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
Expand Down

0 comments on commit cb78b10

Please sign in to comment.