Skip to content

Commit

Permalink
HBASE-24877 Add option to avoid aborting RS process upon uncaught exc…
Browse files Browse the repository at this point in the history
…eptions happen on replication source (apache#2399)

Signed-off-by: stack <stack@apache.org>
  • Loading branch information
wchevreuil authored and clarax committed Nov 15, 2020
1 parent 131aa8f commit 7d2edc7
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -121,6 +123,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;

private boolean abortOnError;
//This is needed for the startup loop to identify when there's already
//an initialization happening (but not finished yet),
//so that it doesn't try submit another initialize thread.
//NOTE: this should only be set to false at the end of initialize method, prior to return.
private AtomicBoolean startupOngoing = new AtomicBoolean(false);
//Flag that signalizes uncaught error happening while starting up the source
//and a retry should be attempted
private AtomicBoolean retryStartup = new AtomicBoolean(false);

/**
* A filter (or a chain of filters) for WAL entries; filters out edits.
*/
Expand All @@ -131,6 +143,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private long defaultBandwidth;
private long currentBandwidth;
private WALFileLengthProvider walFileLengthProvider;
@VisibleForTesting
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -219,6 +232,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;

this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
true);

LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
Expand All @@ -244,15 +261,17 @@ public void enqueueLog(Path wal) {
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(walPrefix, queue);
}
} else {
queue.put(wal);
}
queue.put(wal);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
Expand Down Expand Up @@ -357,19 +376,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
}

private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
} else {
LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
worker.setWALReader(walReader);
worker.startup(this::uncaughtException);
}
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"{} Someone has beat us to start a worker thread for wal group {}",
logPeerId(), key);
}
return value;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
}
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
return worker;
}
});
}

@Override
Expand Down Expand Up @@ -440,11 +470,28 @@ WALEntryFilter getWalEntryFilter() {
return walEntryFilter;
}

protected final void uncaughtException(Thread t, Throwable e) {
protected final void uncaughtException(Thread t, Throwable e,
ReplicationSourceManager manager, String peerId) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in {} currentPath={}",
t.getName(), getCurrentPath(), e);
server.abort("Unexpected exception in " + t.getName(), e);
if(abortOnError){
server.abort("Unexpected exception in " + t.getName(), e);
}
if(manager != null){
while (true) {
try {
LOG.info("Refreshing replication sources now due to previous error on thread: {}",
t.getName());
manager.refreshSources(peerId);
break;
} catch (IOException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
}
}
}
}

@Override
Expand Down Expand Up @@ -543,12 +590,18 @@ private void initialize() {
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
} else {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new RuntimeException("Exhausted retries to start replication endpoint.");
}
}
}

if (!this.isSourceActive()) {
return;
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}

sleepMultiplier = 1;
Expand All @@ -569,8 +622,10 @@ private void initialize() {
}
}

if (!this.isSourceActive()) {
return;
if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
Expand All @@ -582,6 +637,7 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
this.startupOngoing.set(false);
}

@Override
Expand All @@ -591,10 +647,32 @@ public void startup() {
}
// Mark we are running now
this.sourceRunning = true;
startupOngoing.set(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
this::uncaughtException);
(t,e) -> {
//if first initialization attempt failed, and abortOnError is false, we will
//keep looping in this thread until initialize eventually succeeds,
//while the server main startup one can go on with its work.
sourceRunning = false;
uncaughtException(t, e, null, null);
retryStartup.set(!this.abortOnError);
do {
if(retryStartup.get()) {
this.sourceRunning = true;
startupOngoing.set(true);
retryStartup.set(false);
try {
initialize();
} catch(Throwable error){
sourceRunning = false;
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ private boolean updateLogPosition(WALEntryBatch batch) {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
handler::uncaughtException);
}

Path getCurrentPath() {
Expand Down
Loading

0 comments on commit 7d2edc7

Please sign in to comment.