Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24877 Add option to avoid aborting RS process upon uncaught exceptions happen on replication source #2399

Merged
merged 6 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -121,6 +123,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;

private boolean abortOnError;
//This is needed for the startup loop to identify when there's already
//an initialization happening (but not finished yet),
//so that it doesn't try submit another initialize thread.
//NOTE: this should only be set to false at the end of initialize method, prior to return.
private AtomicBoolean startupOngoing = new AtomicBoolean(false);
//Flag that signalizes uncaught error happening while starting up the source
//and a retry should be attempted
private AtomicBoolean retryStartup = new AtomicBoolean(false);

/**
* A filter (or a chain of filters) for WAL entries; filters out edits.
*/
Expand All @@ -131,6 +143,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private long defaultBandwidth;
private long currentBandwidth;
private WALFileLengthProvider walFileLengthProvider;
@VisibleForTesting
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -219,6 +232,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;

this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
true);

LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
Expand All @@ -244,15 +261,17 @@ public void enqueueLog(Path wal) {
new AbstractFSWALProvider.WALStartTimeComparator());
// make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
// the shipper may quit immediately
queue.put(wal);
queues.put(walPrefix, queue);
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
tryStartNewShipper(walPrefix, queue);
}
} else {
queue.put(wal);
}
queue.put(wal);
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
Expand Down Expand Up @@ -357,19 +376,30 @@ private void initializeWALEntryFilter(UUID peerClusterId) {
}

private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
} else {
LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
worker.setWALReader(walReader);
worker.startup(this::uncaughtException);
}
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"{} Someone has beat us to start a worker thread for wal group {}",
logPeerId(), key);
}
return value;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
}
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
return worker;
}
});
}

@Override
Expand Down Expand Up @@ -440,11 +470,28 @@ WALEntryFilter getWalEntryFilter() {
return walEntryFilter;
}

protected final void uncaughtException(Thread t, Throwable e) {
protected final void uncaughtException(Thread t, Throwable e,
ReplicationSourceManager manager, String peerId) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in {} currentPath={}",
t.getName(), getCurrentPath(), e);
server.abort("Unexpected exception in " + t.getName(), e);
if(abortOnError){
server.abort("Unexpected exception in " + t.getName(), e);
}
if(manager != null){
while (true) {
try {
LOG.info("Refreshing replication sources now due to previous error on thread: {}",
t.getName());
manager.refreshSources(peerId);
break;
} catch (IOException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
}
}
}
}

@Override
Expand Down Expand Up @@ -543,12 +590,18 @@ private void initialize() {
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
} else {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new RuntimeException("Exhausted retries to start replication endpoint.");
}
}
}

if (!this.isSourceActive()) {
return;
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}

sleepMultiplier = 1;
Expand All @@ -569,8 +622,10 @@ private void initialize() {
}
}

if (!this.isSourceActive()) {
return;
if(!this.isSourceActive()) {
retryStartup.set(!this.abortOnError);
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
Expand All @@ -582,6 +637,7 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
this.startupOngoing.set(false);
}

@Override
Expand All @@ -591,10 +647,32 @@ public void startup() {
}
// Mark we are running now
this.sourceRunning = true;
startupOngoing.set(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
this::uncaughtException);
(t,e) -> {
//if first initialization attempt failed, and abortOnError is false, we will
//keep looping in this thread until initialize eventually succeeds,
//while the server main startup one can go on with its work.
sourceRunning = false;
uncaughtException(t, e, null, null);
retryStartup.set(!this.abortOnError);
do {
if(retryStartup.get()) {
this.sourceRunning = true;
startupOngoing.set(true);
retryStartup.set(false);
try {
initialize();
} catch(Throwable error){
sourceRunning = false;
uncaughtException(t, error, null, null);
retryStartup.set(!this.abortOnError);
}
}
} while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ private boolean updateLogPosition(WALEntryBatch batch) {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
handler::uncaughtException);
}

Path getCurrentPath() {
Expand Down
Loading