Skip to content

Commit

Permalink
HBASE-25086 Refactor Replication: move the default ReplicationSinkSer…
Browse files Browse the repository at this point in the history
…vice implementation out (#2444)

Signed-off-by: meiyi <myimeiyi@gmail.com>
  • Loading branch information
infraio authored Sep 24, 2020
1 parent 56c7505 commit 8828643
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -981,10 +981,12 @@ public enum OperationStatusCode {
*/
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
public static final String
REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public class HRegionServer extends Thread implements
// Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
private boolean sameReplicationSourceAndSink;

// Compactions
public CompactSplit compactSplitThread;
Expand Down Expand Up @@ -1390,20 +1391,32 @@ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, lon
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
}
}
// for the replicationLoad purpose. Only need to get from one executorService
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();

if (rsources != null) {
if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
// always refresh first to get the latest value
ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
for (ClusterStatusProtos.ReplicationLoadSource rLS :
rLoad.getReplicationLoadSourceEntries()) {
for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
.getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}

}
} else {
if (replicationSourceHandler != null) {
ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
.getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}
}
}
if (replicationSinkHandler != null) {
ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
}
}
}

Expand Down Expand Up @@ -1921,8 +1934,7 @@ private void setupWALAndReplication() throws IOException {
* Start up replication source and sink handlers.
*/
private void startReplicationService() throws IOException {
if (this.replicationSourceHandler == this.replicationSinkHandler &&
this.replicationSourceHandler != null) {
if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else {
if (this.replicationSourceHandler != null) {
Expand Down Expand Up @@ -2628,9 +2640,10 @@ protected void stopServiceThreads() {
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}
if (this.executorService != null) this.executorService.shutdown();
if (this.replicationSourceHandler != null &&
this.replicationSourceHandler == this.replicationSinkHandler) {
if (this.executorService != null) {
this.executorService.shutdown();
}
if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.stopReplicationService();
} else {
if (this.replicationSourceHandler != null) {
Expand Down Expand Up @@ -3070,19 +3083,21 @@ private static void createNewReplicationInstance(Configuration conf, HRegionServ

// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);

// If both the sink and the source class names are the same, then instantiate
// only one object.
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
server.sameReplicationSourceAndSink = true;
} else {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.sameReplicationSourceAndSink = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

@InterfaceAudience.Private
public class ReplicationSinkServiceImpl implements ReplicationSinkService {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);

private Configuration conf;

private Server server;

private ReplicationSink replicationSink;

// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;

private int statsPeriod;

@Override
public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
String replicationClusterId, String sourceBaseNamespaceDirPath,
String sourceHFileArchiveDirPath) throws IOException {
this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}

@Override
public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
WALProvider walProvider) throws IOException {
this.server = server;
this.conf = server.getConfiguration();
this.statsPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
this.replicationLoad = new ReplicationLoad();
}

@Override
public void startReplicationService() throws IOException {
this.replicationSink = new ReplicationSink(this.conf);
this.server.getChoreService().scheduleChore(
new ReplicationStatisticsChore("ReplicationSinkStatistics", server, statsPeriod));
}

@Override
public void stopReplicationService() {
if (this.replicationSink != null) {
this.replicationSink.stopReplicationSinkServices();
}
}

@Override
public ReplicationLoad refreshAndGetReplicationLoad() {
if (replicationLoad == null) {
return null;
}
// always build for latest data
replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
return replicationLoad;
}

private final class ReplicationStatisticsChore extends ScheduledChore {

ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
super(name, stopper, period);
}

@Override
protected void chore() {
printStats(replicationSink.getStats());
}

private void printStats(String stats) {
if (!stats.isEmpty()) {
LOG.info(stats);
}
}
}
}
Loading

0 comments on commit 8828643

Please sign in to comment.