Skip to content

Commit

Permalink
HBASE-26552 Introduce retry to logroller when encounters IOException
Browse files Browse the repository at this point in the history
  • Loading branch information
sunhelly committed Jan 28, 2022
1 parent f6348d4 commit fc69bc4
Showing 1 changed file with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread

// Configuration key for the WAL roll period (the code that reads it is outside this excerpt).
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

// Configuration key for the roll wait timeout in milliseconds; the constructor reads it into
// rollWaitTimeout with a default of 30000.
protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";

// WALs known to this roller, each mapped to its RollController.
protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
// Supplied by the caller through the constructor's "abortable" parameter.
// NOTE(review): presumably used to abort the hosting process on an unrecoverable roll
// failure -- confirm against the rest of the class.
protected final T abortable;
// Period to roll log.
private final long rollPeriod;
// Read from HConstants.THREAD_WAKE_FREQUENCY in the constructor (default 10 * 1000).
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
// (read from "hbase.regionserver.hlog.check.lowreplication.interval", default 30 * 1000).
private final long checkLowReplicationInterval;
// Upper bound, in milliseconds, on the total time run() keeps retrying a WAL roll that fails
// with an IOException; once it is exceeded the IOException is rethrown instead of retried.
private final long rollWaitTimeout;

// NOTE(review): looks like the run-loop liveness flag; volatile so a change made by another
// thread is visible to the roller thread -- the code that reads/writes it is not shown here.
private volatile boolean running = true;

Expand Down Expand Up @@ -114,6 +118,7 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval =
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
}

/**
Expand Down Expand Up @@ -184,18 +189,38 @@ public void run() {
} else {
continue;
}
try {
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is a collection of actual region and family names.
Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
if (regionsToFlush != null) {
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
// Result of the roll: region name -> family names to flush. Stays null when the roll did not
// complete (WAL closed, or the retry budget ran out while sleeping) or returned null.
Map<byte[], List<byte[]>> regionsToFlush = null;
// Retries are bounded by elapsed time (rollWaitTimeout), not by the number of attempts.
long startWaiting = EnvironmentEdgeManager.currentTime();
int nAttempts = 0;
do {
  try {
    // Force the roll if the logroll.period is elapsed or if a roll was requested.
    // The returned value is a collection of actual region and family names.
    regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
    break;
  } catch (IOException ioe) {
    if (ioe instanceof WALClosedException) {
      // Not retried: log it and drop this entry through the iterator.
      LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", ioe);
      iter.remove();
      break;
    }
    long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
    if (waitingTime < rollWaitTimeout) {
      nAttempts++;
      // getCause() returns null when the IOException was created without a cause; fall back to
      // the exception itself so that logging the retry can never throw a NullPointerException.
      Throwable lastException = ioe.getCause() != null ? ioe.getCause() : ioe;
      LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
        + " last exception= {}", nAttempts, waitingTime,
        lastException.getClass().getSimpleName());
      sleep(1000);
    } else {
      // Retry budget exhausted: surface the failure to the enclosing handlers.
      LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
      throw ioe;
    }
  }
  // The deadline is checked again here because the sleep above may have used up the remaining
  // budget; in that case the loop ends with regionsToFlush still null.
} while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
if (regionsToFlush != null) {
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
}
} catch (WALClosedException e) {
LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
iter.remove();
}
}
} catch (FailedLogCloseException | ConnectException e) {
Expand Down

0 comments on commit fc69bc4

Please sign in to comment.