diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 3ad1c5cd17ca..662a5ca1fc16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -60,6 +60,8 @@ public abstract class AbstractWALRoller extends Thread
 
   protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
 
+  protected static final String WAL_ROLL_WAIT_TIMEOUT = "hbase.regionserver.logroll.wait.timeout.ms";
+
   protected final ConcurrentMap wals = new ConcurrentHashMap<>();
   protected final T abortable;
   // Period to roll log.
@@ -67,6 +69,8 @@ public abstract class AbstractWALRoller extends Thread
   private final int threadWakeFrequency;
   // The interval to check low replication on hlog's pipeline
   private final long checkLowReplicationInterval;
+  // Maximum time, in milliseconds, to keep retrying a failed log roll before giving up.
+  private final long rollWaitTimeout;
 
   private volatile boolean running = true;
 
@@ -114,6 +118,7 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.checkLowReplicationInterval =
       conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
+    this.rollWaitTimeout = conf.getLong(WAL_ROLL_WAIT_TIMEOUT, 30000);
   }
 
   /**
@@ -184,18 +189,40 @@ public void run() {
           } else {
             continue;
           }
-          try {
-            // Force the roll if the logroll.period is elapsed or if a roll was requested.
-            // The returned value is an collection of actual region and family names.
-            Map<byte[], List<byte[]>> regionsToFlush = controller.rollWal(now);
-            if (regionsToFlush != null) {
-              for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
-                scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
-              }
-            }
-          } catch (WALClosedException e) {
-            LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
-            iter.remove();
-          }
+          Map<byte[], List<byte[]>> regionsToFlush = null;
+          long startWaiting = EnvironmentEdgeManager.currentTime();
+          int nAttempts = 0;
+          // Retry a failed roll until it succeeds, the WAL turns out to be closed, or
+          // rollWaitTimeout has elapsed. Every exit from this loop is an explicit break or
+          // throw, so a roll that never succeeded cannot fall through silently.
+          while (true) {
+            try {
+              // Force the roll if the logroll.period is elapsed or if a roll was requested.
+              // The returned value is a collection of actual region and family names.
+              regionsToFlush = controller.rollWal(EnvironmentEdgeManager.currentTime());
+              break;
+            } catch (WALClosedException e) {
+              LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e);
+              iter.remove();
+              break;
+            } catch (IOException ioe) {
+              long waitingTime = EnvironmentEdgeManager.currentTime() - startWaiting;
+              if (waitingTime >= rollWaitTimeout) {
+                LOG.error("Roll wal failed and waiting timeout, will not retry", ioe);
+                throw ioe;
+              }
+              nAttempts++;
+              // Log the exception itself as the trailing argument: it may have no cause, so
+              // dereferencing getCause() here could throw a NullPointerException.
+              LOG.warn("Retry to roll log, nAttempts={}, waiting time={}ms, sleeping 1s to retry,"
+                + " last exception", nAttempts, waitingTime, ioe);
+              sleep(1000);
+            }
+          }
+          if (regionsToFlush != null) {
+            for (Map.Entry<byte[], List<byte[]>> r : regionsToFlush.entrySet()) {
+              scheduleFlush(Bytes.toString(r.getKey()), r.getValue());
+            }
+          }
         }
       } catch (FailedLogCloseException | ConnectException e) {