diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 450b8f33e8f0..17bfb61291a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -288,22 +288,29 @@ public abstract class AbstractFSWAL implements WAL { final Comparator LOG_NAME_COMPARATOR = (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); - private static final class WalProps { + private static final class WALProps { /** * Map the encoded region name to the highest sequence id. *

* Contains all the regions it has an entry for. */ - public final Map encodedName2HighestSequenceId; + private final Map encodedName2HighestSequenceId; /** * The log file size. Notice that the size may not be accurate if we do asynchronous close in * sub classes. */ - public final long logSize; + private final long logSize; - public WalProps(Map encodedName2HighestSequenceId, long logSize) { + /** + * If we do asynchronous close in sub classes, it is possible that when adding WALProps to the + * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file, + * for safety. + */ + private volatile boolean closed = false; + + WALProps(Map encodedName2HighestSequenceId, long logSize) { this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; this.logSize = logSize; } @@ -313,7 +320,7 @@ public WalProps(Map encodedName2HighestSequenceId, long logSize) { * Map of WAL log file to properties. The map is sorted by the log file creation timestamp * (contained in the log file name). */ - protected ConcurrentNavigableMap walFile2Props = + protected final ConcurrentNavigableMap walFile2Props = new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); /** @@ -332,6 +339,9 @@ public WalProps(Map encodedName2HighestSequenceId, long logSize) { protected final AtomicBoolean rollRequested = new AtomicBoolean(false); + protected final ExecutorService closeExecutor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); + // Run in caller if we get reject execution exception, to avoid aborting region server when we get // reject execution exception. Usually this should not happen but let's make it more robust. private final ExecutorService logArchiveExecutor = @@ -679,7 +689,7 @@ Map> findRegionsToForceFlush() throws IOException { Map> regions = null; int logCount = getNumRolledLogFiles(); if (logCount > this.maxLogs && logCount > 0) { - Map.Entry firstWALEntry = this.walFile2Props.firstEntry(); + Map.Entry firstWALEntry = this.walFile2Props.firstEntry(); regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); } @@ -702,14 +712,35 @@ Map> findRegionsToForceFlush() throws IOException { return regions; } + /** + * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file. + */ + protected final void markClosedAndClean(Path path) { + WALProps props = walFile2Props.get(path); + // typically this should not be null, but if there is no big issue if it is already null, so + // let's make the code more robust + if (props != null) { + props.closed = true; + cleanOldLogs(); + } + } + /** * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. + *

+ * Use synchronized because we may call this method in different threads, normally when replacing + * writer, and since now close writer may be asynchronous, we will also call this method in the + * closeExecutor, right after we actually close a WAL writer. */ - private void cleanOldLogs() throws IOException { + private synchronized void cleanOldLogs() { List> logsToArchive = null; // For each log file, look at its Map of regions to highest sequence id; if all sequence ids // are older than what is currently in memory, the WAL can be GC'd. - for (Map.Entry e : this.walFile2Props.entrySet()) { + for (Map.Entry e : this.walFile2Props.entrySet()) { + if (!e.getValue().closed) { + LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey()); + continue; + } Path log = e.getKey(); Map sequenceNums = e.getValue().encodedName2HighestSequenceId; if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { @@ -791,7 +822,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; if (oldPath != null) { this.walFile2Props.put(oldPath, - new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); + new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); this.totalLogSize.addAndGet(oldFileLen); LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), @@ -987,6 +1018,20 @@ public Void call() throws Exception { // and abort the region server logArchiveExecutor.shutdown(); } + // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still + // have some pending archiving tasks not finished yet, and in close we may archive all the + // remaining WAL files, there could be race if we do not wait for the background archive task + // finish + try { + if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) { + throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" + + " the shutdown of WAL doesn't complete! Please check the status of underlying " + + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS + + "\""); + } + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9fa36630d494..c1e6c1b6907e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -35,7 +35,6 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL { private final long batchSize; - private final ExecutorService closeExecutor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); - private volatile AsyncFSOutput fsOut; private final Deque toWriteAppends = new ArrayDeque<>(); @@ -718,23 +714,20 @@ private void waitForSafePoint() { } } - protected final long closeWriter(AsyncWriter writer, Path path) { - if (writer != null) { - inflightWALClosures.put(path.getName(), writer); - long fileLength = writer.getLength(); - closeExecutor.execute(() -> { - try { - writer.close(); - } catch (IOException e) { - LOG.warn("close old writer failed", e); - } finally { - inflightWALClosures.remove(path.getName()); - } - }); - return fileLength; - } else { - return 0L; - } + private void closeWriter(AsyncWriter writer, Path path) { + inflightWALClosures.put(path.getName(), writer); + closeExecutor.execute(() -> { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("close old writer failed", e); + } finally { + // call this even if the above close fails, as there is no other chance we can set closed to + // true, it will not cause big problems. + markClosedAndClean(path); + inflightWALClosures.remove(path.getName()); + } + }); } @Override @@ -742,8 +735,19 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite throws IOException { Preconditions.checkNotNull(nextWriter); waitForSafePoint(); - long oldFileLen = closeWriter(this.writer, oldPath); - logRollAndSetupWalProps(oldPath, newPath, oldFileLen); + // we will call rollWriter in init method, where we want to create the first writer and + // obviously the previous writer is null, so here we need this null check. And why we must call + // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after + // closing the writer asynchronously, we need to make sure the WALProps is put into + // walFile2Props before we call markClosedAndClean + if (writer != null) { + long oldFileLen = writer.getLength(); + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); + closeWriter(writer, oldPath); + } else { + logRollAndSetupWalProps(oldPath, newPath, 0); + } + this.writer = nextWriter; if (nextWriter instanceof AsyncProtobufLogWriter) { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 1df2d91e46bf..6afe2e06794c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -36,8 +36,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL { private final AtomicInteger closeErrorCount = new AtomicInteger(); private final int waitOnShutdownInSeconds; - private final ExecutorService closeExecutor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); /** * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs @@ -376,28 +372,44 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th LOG.warn( "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage()); } - long oldFileLen = 0L; // It is at the safe point. Swap out writer from under the blocked writer thread. + // we will call rollWriter in init method, where we want to create the first writer and + // obviously the previous writer is null, so here we need this null check. And why we must + // call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean + // after closing the writer asynchronously, we need to make sure the WALProps is put into + // walFile2Props before we call markClosedAndClean if (this.writer != null) { - oldFileLen = this.writer.getLength(); + long oldFileLen = this.writer.getLength(); + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); // In case of having unflushed entries or we already reached the // closeErrorsTolerated count, call the closeWriter inline rather than in async // way so that in case of an IOE we will throw it back and abort RS. inflightWALClosures.put(oldPath.getName(), writer); if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) { - closeWriter(this.writer, oldPath, true); + try { + closeWriter(this.writer, oldPath, true); + } finally { + inflightWALClosures.remove(oldPath.getName()); + } } else { Writer localWriter = this.writer; closeExecutor.execute(() -> { try { closeWriter(localWriter, oldPath, false); } catch (IOException e) { - // We will never reach here. + LOG.warn("close old writer failed", e); + } finally { + // call this even if the above close fails, as there is no other chance we can set + // closed to true, it will not cause big problems. + markClosedAndClean(oldPath); + inflightWALClosures.remove(oldPath.getName()); } }); } + } else { + logRollAndSetupWalProps(oldPath, newPath, 0); } - logRollAndSetupWalProps(oldPath, newPath, oldFileLen); + this.writer = nextWriter; if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) { this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream(); @@ -452,8 +464,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws } LOG.warn("Riding over failed WAL close of " + path + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe); - } finally { - inflightWALClosures.remove(path.getName()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 7e5e33098c22..cfb2a4fddfdd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -397,10 +397,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) { serverName = ServerName.parseServerName(logDirName); } catch (IllegalArgumentException | IllegalStateException ex) { serverName = null; - LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); + LOG.warn("Cannot parse a server name from path={}", logFile, ex); } - if (serverName != null && serverName.getStartcode() < 0) { - LOG.warn("Invalid log file path=" + logFile); + if (serverName != null && serverName.getStartCode() < 0) { + LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile, + serverName.getStartCode()); serverName = null; } return serverName; @@ -465,6 +466,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep } ServerName serverName = getServerNameFromWALDirectoryName(path); + if (serverName == null) { + LOG.warn("Can not extract server name from path {}, " + + "give up searching the separated old log dir", path); + return null; + } // Try finding the log in separate old log dir oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 03bdb7e8f866..58caf6f66135 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -419,9 +419,11 @@ public void testWALRollWriting() throws Exception { r.flush(true); } ADMIN.rollWALWriter(regionServer.getServerName()); - int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); - LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); - assertTrue(("actual count: " + count), count <= 2); + TEST_UTIL.waitFor(5000, () -> { + int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); + LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); + return count <= 2; + }); } private void setUpforLogRolling() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java index 473e316b49fe..30cb95d80d16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClusterAdminApi.java @@ -152,9 +152,11 @@ public void testRollWALWALWriter() throws Exception { r.flush(true); } admin.rollWALWriter(regionServer.getServerName()).join(); - int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); - LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); - assertTrue(("actual count: " + count), count <= 2); + TEST_UTIL.waitFor(5000, () -> { + int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); + LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); + return count <= 2; + }); } private void setUpforLogRolling() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index e9eec9e53527..a36b25592084 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -488,9 +488,8 @@ public void testFlushingWhenLogRolling() throws Exception { // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); assertNull(getWAL(desiredRegion).rollWriter()); - while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) { - Thread.sleep(100); - } + TEST_UTIL.waitFor(60000, + () -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles); } assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion)); assertTrue( @@ -529,7 +528,7 @@ public String explainFailure() throws Exception { desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); // let WAL cleanOldLogs assertNull(getWAL(desiredRegion).rollWriter(true)); - assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); + TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs); } finally { TEST_UTIL.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 3200ad2b58d0..1f04e2718be5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -253,21 +253,14 @@ public void testWALComparator() throws Exception { } } - /** - * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of - * regions which should be flushed in order to archive the oldest wal file. - *

- * This method tests this behavior by inserting edits and rolling the wal enough times to reach - * the max number of logs threshold. It checks whether we get the "right regions and stores" for - * flush on rolling the wal. - */ - @Test - public void testFindMemStoresEligibleForFlush() throws Exception { - LOG.debug("testFindMemStoresEligibleForFlush"); - Configuration conf1 = HBaseConfiguration.create(CONF); - conf1.setInt("hbase.regionserver.maxlogs", 1); - AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), - HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); + // now we will close asynchronously and will not archive a wal file unless it is fully closed, so + // sometimes we need to wait a bit before asserting, especially when you want to test the removal + // of numRolledLogFiles + private void waitNumRolledLogFiles(AbstractFSWAL wal, int expected) { + TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected); + } + + private void testFindMemStoresEligibleForFlush(AbstractFSWAL wal) throws IOException { String cf1 = "cf1"; String cf2 = "cf2"; String cf3 = "cf3"; @@ -278,7 +271,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception { RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build(); RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build(); - List cfs = new ArrayList(); + List cfs = new ArrayList<>(); cfs.add(ColumnFamilyDescriptorBuilder.of(cf1)); cfs.add(ColumnFamilyDescriptorBuilder.of(cf2)); TableDescriptor t3 = @@ -299,87 +292,101 @@ public void testFindMemStoresEligibleForFlush() throws Exception { for (byte[] fam : t3.getColumnFamilyNames()) { scopes3.put(fam, 0); } - try { - addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); - wal.rollWriter(); - // add some more edits and roll the wal. This would reach the log number threshold - addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); - wal.rollWriter(); - // with above rollWriter call, the max logs limit is reached. - assertTrue(wal.getNumRolledLogFiles() == 2); - - // get the regions to flush; since there is only one region in the oldest wal, it should - // return only one region. - Map> regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(1, regionsToFlush.size()); - assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); - // insert edits in second region - addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); - // get the regions to flush, it should still read region1. - regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(1, regionsToFlush.size()); - assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); - // flush region 1, and roll the wal file. Only last wal which has entries for region1 should - // remain. - flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - wal.rollWriter(); - // only one wal should remain now (that is for the second region). - assertEquals(1, wal.getNumRolledLogFiles()); - // flush the second region - flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); - // no wal should remain now. - assertEquals(0, wal.getNumRolledLogFiles()); - // add edits both to region 1 and region 2, and roll. - addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); - addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); - wal.rollWriter(); - // add edits and roll the writer, to reach the max logs limit. - assertEquals(1, wal.getNumRolledLogFiles()); - addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); - wal.rollWriter(); - // it should return two regions to flush, as the oldest wal file has entries - // for both regions. - regionsToFlush = wal.findRegionsToForceFlush(); - assertEquals(2, regionsToFlush.size()); - // flush both regions - flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); - assertEquals(0, wal.getNumRolledLogFiles()); - // Add an edit to region1, and roll the wal. - addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); - // tests partial flush: roll on a partial flush, and ensure that wal is not archived. - wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - wal.rollWriter(); - wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); - assertEquals(1, wal.getNumRolledLogFiles()); - - // clear test data - flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - wal.rollWriter(true); - // add edits for three familes - addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); - addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2); - addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3); - wal.rollWriter(); - addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); - wal.rollWriter(); - assertEquals(2, wal.getNumRolledLogFiles()); - // flush one family before archive oldest wal - Set flushedFamilyNames = new HashSet<>(); - flushedFamilyNames.add(Bytes.toBytes(cf1)); - flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames); - regionsToFlush = wal.findRegionsToForceFlush(); - // then only two family need to be flushed when archive oldest wal - assertEquals(1, regionsToFlush.size()); - assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); - assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size()); - } finally { - if (wal != null) { - wal.close(); - } + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + wal.rollWriter(); + // add some more edits and roll the wal. This would reach the log number threshold + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + wal.rollWriter(); + // with above rollWriter call, the max logs limit is reached. + waitNumRolledLogFiles(wal, 2); + + // get the regions to flush; since there is only one region in the oldest wal, it should + // return only one region. + Map> regionsToFlush = wal.findRegionsToForceFlush(); + assertEquals(1, regionsToFlush.size()); + assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); + // insert edits in second region + addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); + // get the regions to flush, it should still read region1. + regionsToFlush = wal.findRegionsToForceFlush(); + assertEquals(1, regionsToFlush.size()); + assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); + // flush region 1, and roll the wal file. Only last wal which has entries for region1 should + // remain. + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); + wal.rollWriter(); + // only one wal should remain now (that is for the second region). + waitNumRolledLogFiles(wal, 1); + // flush the second region + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); + wal.rollWriter(true); + // no wal should remain now. + waitNumRolledLogFiles(wal, 0); + // add edits both to region 1 and region 2, and roll. + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1); + wal.rollWriter(); + // add edits and roll the writer, to reach the max logs limit. + waitNumRolledLogFiles(wal, 1); + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + wal.rollWriter(); + // it should return two regions to flush, as the oldest wal file has entries + // for both regions. + regionsToFlush = wal.findRegionsToForceFlush(); + assertEquals(2, regionsToFlush.size()); + // flush both regions + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); + wal.rollWriter(true); + waitNumRolledLogFiles(wal, 0); + // Add an edit to region1, and roll the wal. + addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1); + // tests partial flush: roll on a partial flush, and ensure that wal is not archived. + wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); + wal.rollWriter(); + wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); + waitNumRolledLogFiles(wal, 1); + + // clear test data + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); + wal.rollWriter(true); + // add edits for three familes + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3); + wal.rollWriter(); + addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1); + wal.rollWriter(); + waitNumRolledLogFiles(wal, 2); + // flush one family before archive oldest wal + Set flushedFamilyNames = new HashSet<>(); + flushedFamilyNames.add(Bytes.toBytes(cf1)); + flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames); + regionsToFlush = wal.findRegionsToForceFlush(); + // then only two family need to be flushed when archive oldest wal + assertEquals(1, regionsToFlush.size()); + assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]); + assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size()); + } + + /** + * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of + * regions which should be flushed in order to archive the oldest wal file. + *

+ * This method tests this behavior by inserting edits and rolling the wal enough times to reach + * the max number of logs threshold. It checks whether we get the "right regions and stores" for + * flush on rolling the wal. + */ + @Test + public void testFindMemStoresEligibleForFlush() throws Exception { + LOG.debug("testFindMemStoresEligibleForFlush"); + Configuration conf1 = HBaseConfiguration.create(CONF); + conf1.setInt("hbase.regionserver.maxlogs", 1); + try (AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(), + HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) { + testFindMemStoresEligibleForFlush(wal); } + } @Test(expected = IOException.class) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 90480af00543..722f5ce5bc2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -137,9 +139,7 @@ public void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); } - protected void startAndWriteData() throws IOException, InterruptedException { - // When the hbase:meta table can be opened, the region servers are running - TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + private void startAndWriteData() throws IOException, InterruptedException { this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); Table table = createTestTable(this.tableName); @@ -175,19 +175,6 @@ public void testLogRollOnNothingWritten() throws Exception { } } - private void assertLogFileSize(WAL log) throws InterruptedException { - if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) { - assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0); - } else { - for (int i = 0; i < 10; i++) { - if (AbstractFSWALProvider.getLogFileSize(log) != 0) { - Thread.sleep(10); - } - } - assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); - } - } - /** * Tests that logs are deleted */ @@ -200,20 +187,25 @@ public void testLogRolling() throws Exception { final WAL log = server.getWAL(region); LOG.info( "after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files"); - assertLogFileSize(log); + + // roll the log, so we should have at least one rolled file and the log file size should be + // greater than 0, in case in the above method we rolled in the last round and then flushed so + // all the old wal files are deleted and cause the below assertion to fail + log.rollWriter(); + + assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L)); // flush all regions for (HRegion r : server.getOnlineRegionsLocalContext()) { r.flush(true); } - // Now roll the log + // Now roll the log the again log.rollWriter(); - int count = AbstractFSWALProvider.getNumRolledLogFiles(log); - LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); - assertTrue(("actual count: " + count), count <= 2); - assertLogFileSize(log); + // should have deleted all the rolled wal files + TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0); + assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); } protected String getName() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index cdc9757ed459..ce490cfaef5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -112,10 +112,6 @@ public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } - static String getName() { - return "TestDefaultWALProvider"; - } - @Test public void testGetServerNameFromWALDirectoryName() throws IOException { ServerName sn = ServerName.valueOf("hn", 450, 1398); @@ -163,7 +159,7 @@ private void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times, /** * used by TestDefaultWALProviderWithHLogKey */ - WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp, + private WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp, NavigableMap scopes) { return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes); } @@ -171,14 +167,19 @@ WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long ti /** * helper method to simulate region flush for a WAL. */ - protected void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { + private void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { wal.startCacheFlush(regionEncodedName, flushedFamilyNames); wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM); } - @Test - public void testLogCleaning() throws Exception { - LOG.info(currentTest.getMethodName()); + // now we will close asynchronously and will not archive a wal file unless it is fully closed, so + // sometimes we need to wait a bit before asserting, especially when you want to test the removal + // of numRolledLogFiles + private void waitNumRolledLogFiles(WAL wal, int expected) { + TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(wal) == expected); + } + + private void testLogCleaning(WALFactory wals) throws IOException { TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName())) .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); @@ -193,71 +194,62 @@ public void testLogCleaning() throws Exception { for (byte[] fam : htd2.getColumnFamilyNames()) { scopes2.put(fam, 0); } + RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); + RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build(); + // we want to mix edits from regions, so pick our own identifier. + WAL log = wals.getWAL(null); + + // Add a single edit and make sure that rolling won't remove the file + // Before HBASE-3198 it used to delete it + addEdits(log, hri, htd, 1, scopes1); + log.rollWriter(); + waitNumRolledLogFiles(log, 1); + + // See if there's anything wrong with more than 1 edit + addEdits(log, hri, htd, 2, scopes1); + log.rollWriter(); + assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log)); + + // Now mix edits from 2 regions, still no flushing + addEdits(log, hri, htd, 1, scopes1); + addEdits(log, hri2, htd2, 1, scopes2); + addEdits(log, hri, htd, 1, scopes1); + addEdits(log, hri2, htd2, 1, scopes2); + log.rollWriter(); + waitNumRolledLogFiles(log, 3); + + // Flush the first region, we expect to see the first two files getting + // archived. We need to append something or writer won't be rolled. + addEdits(log, hri2, htd2, 1, scopes2); + log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); + log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); + log.rollWriter(); + waitNumRolledLogFiles(log, 2); + + // Flush the second region, which removes all the remaining output files + // since the oldest was completely flushed and the two others only contain + // flush information + addEdits(log, hri2, htd2, 1, scopes2); + log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames()); + log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); + log.rollWriter(); + waitNumRolledLogFiles(log, 0); + } + + @Test + public void testLogCleaning() throws Exception { + LOG.info(currentTest.getMethodName()); Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); try { - RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); - RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build(); - // we want to mix edits from regions, so pick our own identifier. - WAL log = wals.getWAL(null); - - // Add a single edit and make sure that rolling won't remove the file - // Before HBASE-3198 it used to delete it - addEdits(log, hri, htd, 1, scopes1); - log.rollWriter(); - assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log)); - - // See if there's anything wrong with more than 1 edit - addEdits(log, hri, htd, 2, scopes1); - log.rollWriter(); - assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log)); - - // Now mix edits from 2 regions, still no flushing - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); - addEdits(log, hri, htd, 1, scopes1); - addEdits(log, hri2, htd2, 1, scopes2); - log.rollWriter(); - assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log)); - - // Flush the first region, we expect to see the first two files getting - // archived. We need to append something or writer won't be rolled. - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); - log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); - log.rollWriter(); - int count = AbstractFSWALProvider.getNumRolledLogFiles(log); - assertEquals(2, count); - - // Flush the second region, which removes all the remaining output files - // since the oldest was completely flushed and the two others only contain - // flush information - addEdits(log, hri2, htd2, 1, scopes2); - log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames()); - log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM); - log.rollWriter(); - assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log)); + testLogCleaning(wals); } finally { - if (wals != null) { - wals.close(); - } + wals.close(); } } - /** - * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and - * also don't archive "live logs" (that is, a log with un-flushed entries). - *

- * This is what it does: It creates two regions, and does a series of inserts along with log - * rolling. Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is - * eligible for archiving if for all the regions which have entries in that wal file, have flushed - * - past their maximum sequence id in that wal file. - *

- */ - @Test - public void testWALArchiving() throws IOException { - LOG.debug(currentTest.getMethodName()); + private void testWALArchiving(WALFactory wals) throws IOException { TableDescriptor table1 = TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "1")) .setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build(); @@ -272,58 +264,73 @@ public void testWALArchiving() throws IOException { for (byte[] fam : table2.getColumnFamilyNames()) { scopes2.put(fam, 0); } + WAL wal = wals.getWAL(null); + assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); + RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build(); + RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build(); + // variables to mock region sequenceIds. + // start with the testing logic: insert a waledit, and roll writer + addEdits(wal, hri1, table1, 1, scopes1); + wal.rollWriter(); + // assert that the wal is rolled + waitNumRolledLogFiles(wal, 1); + // add edits in the second wal file, and roll writer. + addEdits(wal, hri1, table1, 1, scopes1); + wal.rollWriter(); + // assert that the wal is rolled + waitNumRolledLogFiles(wal, 2); + // add a waledit to table1, and flush the region. + addEdits(wal, hri1, table1, 3, scopes1); + flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames()); + // roll log; all old logs should be archived. + wal.rollWriter(); + waitNumRolledLogFiles(wal, 0); + // add an edit to table2, and roll writer + addEdits(wal, hri2, table2, 1, scopes2); + wal.rollWriter(); + waitNumRolledLogFiles(wal, 1); + // add edits for table1, and roll writer + addEdits(wal, hri1, table1, 2, scopes1); + wal.rollWriter(); + waitNumRolledLogFiles(wal, 2); + // add edits for table2, and flush hri1. + addEdits(wal, hri2, table2, 2, scopes2); + flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); + // the log : region-sequenceId map is + // log1: region2 (unflushed) + // log2: region1 (flushed) + // log3: region2 (unflushed) + // roll the writer; log2 should be archived. + wal.rollWriter(); + waitNumRolledLogFiles(wal, 2); + // flush region2, and all logs should be archived. + addEdits(wal, hri2, table2, 2, scopes2); + flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); + wal.rollWriter(); + waitNumRolledLogFiles(wal, 0); + } + + /** + * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and + * also don't archive "live logs" (that is, a log with un-flushed entries). + *

+ * This is what it does: It creates two regions, and does a series of inserts along with log + * rolling. Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is + * eligible for archiving if for all the regions which have entries in that wal file, have flushed + * - past their maximum sequence id in that wal file. + *

+ */ + @Test + public void testWALArchiving() throws IOException { + LOG.debug(currentTest.getMethodName()); + Configuration localConf = new Configuration(conf); localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName()); WALFactory wals = new WALFactory(localConf, currentTest.getMethodName()); try { - WAL wal = wals.getWAL(null); - assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build(); - RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build(); - // variables to mock region sequenceIds. - // start with the testing logic: insert a waledit, and roll writer - addEdits(wal, hri1, table1, 1, scopes1); - wal.rollWriter(); - // assert that the wal is rolled - assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // add edits in the second wal file, and roll writer. - addEdits(wal, hri1, table1, 1, scopes1); - wal.rollWriter(); - // assert that the wal is rolled - assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // add a waledit to table1, and flush the region. - addEdits(wal, hri1, table1, 3, scopes1); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames()); - // roll log; all old logs should be archived. - wal.rollWriter(); - assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // add an edit to table2, and roll writer - addEdits(wal, hri2, table2, 1, scopes2); - wal.rollWriter(); - assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // add edits for table1, and roll writer - addEdits(wal, hri1, table1, 2, scopes1); - wal.rollWriter(); - assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // add edits for table2, and flush hri1. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); - // the log : region-sequenceId map is - // log1: region2 (unflushed) - // log2: region1 (flushed) - // log3: region2 (unflushed) - // roll the writer; log2 should be archived. - wal.rollWriter(); - assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal)); - // flush region2, and all logs should be archived. - addEdits(wal, hri2, table2, 2, scopes2); - flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames()); - wal.rollWriter(); - assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal)); + testWALArchiving(wals); } finally { - if (wals != null) { - wals.close(); - } + wals.close(); } }