Backport to branch-2, HBASE-26552 Introduce retry to logroller to avoid abort #4170

Merged: 2 commits, Mar 7, 2022
@@ -59,13 +59,30 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread

protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";

/**
* Configuration key for the log rolling retry timeout, in milliseconds.
*/
protected static final String WAL_ROLL_WAIT_TIMEOUT =
"hbase.regionserver.logroll.wait.timeout.ms";

/**
* Configuration key for the maximum number of log rolling retries.
* The effective retry count is also bounded by the log rolling timeout
* via {@link #WAL_ROLL_WAIT_TIMEOUT}.
*/
protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries";

protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>();
protected final T abortable;
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private final long checkLowReplicationInterval;
// Total time to wait for a log roll (including retries) before giving up
private final long rollWaitTimeout;
// Max number of retries when rolling the log fails
private final int maxRollRetry;

private volatile boolean running = true;

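For illustration, a minimal sketch of how an operator might turn the new retry behavior on. The two property names come from the constants above; the concrete values (3 retries, a 30-second window) and the class name are assumptions made for this example, and in a real deployment these would normally go in hbase-site.xml rather than code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class WalRollRetryConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to retrying failed WAL rolls; the patch defaults this to 0 (no retries).
    conf.setInt("hbase.regionserver.logroll.retries", 3);
    // Bound the total retry window; 30000 ms mirrors the patch's default.
    conf.setLong("hbase.regionserver.logroll.wait.timeout.ms", 30000);
    System.out.println("logroll retries = " + conf.getInt("hbase.regionserver.logroll.retries", 0));
  }
}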
@@ -113,6 +130,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval =
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
// Retrying a failed roll is opt-in behavior, so the default retry count is 0
this.maxRollRetry = conf.getInt(WAL_ROLL_RETRIES, 0);
}
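A note on the defaults read above, restated as a hypothetical helper (not part of the patch) that mirrors the guard in the retry loop later in this diff: a retry happens only while both the attempt budget and the time budget have headroom, so with the default maxRollRetry of 0 the first IOException is rethrown immediately, preserving the pre-patch behavior.

public class RollRetryGuardExample {
  // Mirrors the retry guard in run() below: retry only while both the
  // time budget and the attempt budget have headroom.
  static boolean shouldRetryRoll(int nAttempts, int maxRollRetry,
      long waitingTimeMs, long rollWaitTimeoutMs) {
    return waitingTimeMs < rollWaitTimeoutMs && nAttempts < maxRollRetry;
  }

  public static void main(String[] args) {
    // Default configuration: maxRollRetry = 0, so (0 < 0) is false and no retry happens.
    System.out.println(shouldRetryRoll(0, 0, 5L, 30000L));  // false
    // Opted-in configuration: e.g. 3 retries allowed within the 30 s window.
    System.out.println(shouldRetryRoll(0, 3, 5L, 30000L));  // true
  }
}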

/**
@@ -183,9 +203,29 @@ public void run() {
} else {
continue;
}
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is a collection of actual region and family names.
Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
Map<byte[], List<byte[]>> regionsToFlush = null;
int nAttempts = 0;
long startWaiting = EnvironmentEdgeManager.currentTime();
do {
try {
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is a collection of actual region and family names.
regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
break;
} catch (IOException ioe) {
long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
if (waitingTime < rollWaitTimeout && nAttempts < maxRollRetry) {
nAttempts++;
LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
+ " last exception={}", nAttempts, waitingTime,
ioe.getCause().getClass().getSimpleName());
sleep(1000);
} else {
LOG.error("Roll wal failed and waiting timed out, will not retry", ioe);
throw ioe;
}
}
} while (EnvironmentEdgeManager.currentTime() - startWaiting < rollWaitTimeout);
if (regionsToFlush != null) {
for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
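Stepping back from the diff: the loop above retries a failed roll under two budgets at once, a maximum attempt count and a wall-clock deadline, sleeping one second between attempts. A self-contained sketch of that pattern under assumed names (BoundedRetryExample and retryWithDeadline are hypothetical, not HBase API):

import java.io.IOException;
import java.util.concurrent.Callable;

public class BoundedRetryExample {
  // Retry `action` until it succeeds, the attempt budget is spent,
  // or the wall-clock deadline passes, whichever comes first.
  static <T> T retryWithDeadline(Callable<T> action, int maxRetries, long timeoutMs)
      throws Exception {
    long start = System.currentTimeMillis();
    int attempts = 0;
    while (true) {
      try {
        return action.call();
      } catch (IOException e) {
        long elapsed = System.currentTimeMillis() - start;
        // Same guard as the WAL roller: both budgets must have headroom.
        if (elapsed >= timeoutMs || attempts >= maxRetries) {
          throw e; // give up and surface the last failure
        }
        attempts++;
        Thread.sleep(1000); // fixed one-second backoff, as in the patch
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // Succeeds on the third call; the retries absorb the first two failures.
    int[] calls = {0};
    String result = retryWithDeadline(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient failure " + calls[0]);
      }
      return "rolled";
    }, 5, 30000);
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}

The separate deadline matters because retries are not free: each failed roll attempt already consumes time, so bounding attempts alone could stall the roller far longer than intended.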