diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index dd800f9b2c31..03b861be7a33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,12 +22,14 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -84,10 +87,14 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
     @Override
     public StoreFileWriter createWriter(InternalScanner scanner,
       org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-      boolean shouldDropBehind, boolean major) throws IOException {
+      boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+      throws IOException {
       // make this writer with tags always because of possible new cells with tags.
-      return store.getStoreEngine().createWriter(
-        createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+      return store.getStoreEngine()
+        .createWriter(
+          createParams(fd, shouldDropBehind, major, writerCreationTracker)
+            .includeMVCCReadpoint(true)
+            .includesTag(true));
     }
   };
 
@@ -155,18 +162,19 @@ public List<Path> compact(CompactionRequestImpl request, ThroughputController th
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
-    long bytesWrittenProgressForCloseCheck = 0;
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
@@ -370,9 +378,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
     return true;
   }
 
-  @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a52ce2b05c0c..10ef4c450e70 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -100,7 +101,7 @@ public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOExcepti
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     long cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -114,7 +115,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, true);
+      writer = createWriter(snapshot, true, writerCreationTracker);
       IOException e = null;
       try {
         // It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 82c3867c103c..a824b501c82c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -67,7 +67,7 @@ public void init(StoreScanner sourceScanner, WriterFactory factory) {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
-    return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
   }
 
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@@ -110,11 +110,7 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  /**
-   * Returns all writers. This is used to prevent deleting currently writen storefiles
-   * during cleanup.
-   */
-  public abstract Collection<StoreFileWriter> writers();
+  protected abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
index 0c4807d8badc..042acb0bbc99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
@@ -152,7 +152,7 @@ private void cleanFileIfNeeded(FileStatus file, HStore store,
   }
 
   private boolean isCompactionResultFile(FileStatus file, HStore store) {
-    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+    return store.getStoreFilesBeingWritten().contains(file.getPath());
   }
 
   // Compacted files can still have readers and are cleaned by a separate chore, so they have to
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
index 10cd9f009e4a..1d45e1c51c5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
 
   private String fileStoragePolicy = HConstants.EMPTY_STRING;
 
+  private Consumer<Path> writerCreationTracker;
+
   private CreateStoreFileWriterParams() {
   }
 
@@ -127,8 +131,16 @@ public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
     return this;
   }
 
+  public Consumer<Path> writerCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+    return this;
+  }
+
   public static CreateStoreFileWriterParams create() {
     return new CreateStoreFileWriterParams();
   }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index 1e10eb2db231..8201cb152c01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 306760d7ce6a..0f3daa4c177a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public DefaultStoreFlusher(Configuration conf, HStore store) {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, false);
+      writer = createWriter(snapshot, false, writerCreationTracker);
       IOException e = null;
       try {
         performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 41f18382073a..3da95c18142b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -48,6 +48,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -156,8 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   // rows that has cells from both memstore and files (or only files)
   private LongAdder mixedRowReadsCount = new LongAdder();
 
-  private boolean cacheOnWriteLogged;
-
   /**
    * Lock specific to archiving compacted store files. This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
@@ -215,14 +215,46 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
   private final StoreContext storeContext;
 
+  // Used to track the store files which are currently being written. For compaction, if we want to
+  // compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to
+  // track the store files being written when flushing.
+  // Notice that the creation is in the background compaction or flush thread and we will get the
+  // files in other threads, so it needs to be thread safe.
+  private static final class StoreFileWriterCreationTracker implements Consumer<Path> {
+
+    private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    @Override
+    public void accept(Path t) {
+      files.add(t);
+    }
+
+    public Set<Path> get() {
+      return Collections.unmodifiableSet(files);
+    }
+  }
+
+  // We may have multiple compactions running at the same time, and flush can also happen at the
+  // same time, so here we need to use a collection, and the collection needs to be thread safe.
+  // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to
+  // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to
+  // IdentityHashMap if later we want to implement hashCode or equals.
+  private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =
+    Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+  // For the SFT implementations where we will write a tmp store file first, we do not need to
+  // clean up the broken store files under the data directory, which means we do not need to track
+  // the store file writer creation. So here we abstract a factory to return different trackers
+  // for different SFT implementations.
+  private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;
+
   /**
    * Constructor
    * @param family HColumnDescriptor for this column
-   * @param confParam configuration object failed. Can be null.
+   * @param confParam configuration object failed. Can be null.
    */
   protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       final Configuration confParam, boolean warmup) throws IOException {
-
     this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);
 
     this.region = region;
@@ -267,6 +299,12 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
     storeEngine.initialize(warmup);
+    // if we require writing to the tmp dir first, then we just return null, which indicates that
+    // we do not need to track the creation of store file writer, otherwise we return a new
+    // StoreFileWriterCreationTracker.
+    this.storeFileWriterCreationTrackerFactory =
+      storeEngine.requireWritingToTmpDirFirst() ? () -> null
+        : () -> new StoreFileWriterCreationTracker();
     refreshStoreSizeAndTotalBytes();
 
     flushRetriesNumber = conf.getInt(
@@ -290,7 +328,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,
       parallelPutCountPrintThreshold, family.getDataBlockEncoding(),
       family.getCompressionType());
-    cacheOnWriteLogged = false;
   }
 
   private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {
@@ -795,8 +832,8 @@ public ImmutableCollection<HStoreFile> close() throws IOException {
    * @throws IOException if exception occurs during process
    */
   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot. The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
@@ -806,8 +843,13 @@ protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot sna
     IOException lastException = null;
     for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        List<Path> pathNames =
-          flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
+        List<Path> pathNames = flusher.flushSnapshot(
+          snapshot,
+          logCacheFlushId,
+          status,
+          throughputController,
+          tracker,
+          writerCreationTracker);
         Path lastPathName = null;
         try {
           for (Path pathName : pathNames) {
@@ -1118,6 +1160,12 @@ public List<HStoreFile> compact(CompactionContext compaction,
     ThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     CompactionRequestImpl cr = compaction.getRequest();
+    StoreFileWriterCreationTracker writerCreationTracker =
+      storeFileWriterCreationTrackerFactory.get();
+    if (writerCreationTracker != null) {
+      cr.setWriterCreationTracker(writerCreationTracker);
+      storeFileWriterCreationTrackers.add(writerCreationTracker);
+    }
     try {
       // Do all sanity checking in here if we have a valid CompactionRequestImpl
       // because we need to clean up after it on the way out in a finally
@@ -1157,18 +1205,6 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
     }
     replaceStoreFiles(filesToCompact, sfs, true);
 
-    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
-    // have failed.
-    storeEngine.resetCompactionWriter();
-
-    if (cr.isMajor()) {
-      majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
-    } else {
-      compactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
-      compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
-    }
     long outputBytes = getTotalSize(sfs);
 
     // At this point the store will use new files for all new scanners.
@@ -1576,6 +1612,11 @@ private void finishCompactionRequest(CompactionRequestImpl cr) {
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
     }
+    // The tracker could be null, for example, we do not need to track the creation of store file
+    // writer due to different implementation of SFT, or the compaction is canceled.
+    if (cr.getWriterCreationTracker() != null) {
+      storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());
+    }
   }
 
   /**
@@ -1899,6 +1940,7 @@ public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTra
   private final class StoreFlusherImpl implements StoreFlushContext {
 
     private final FlushLifeCycleTracker tracker;
+    private final StoreFileWriterCreationTracker writerCreationTracker;
    private final long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;
     private List<Path> tempFiles;
@@ -1910,6 +1952,7 @@ private final class StoreFlusherImpl implements StoreFlushContext {
     private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
       this.cacheFlushSeqNum = cacheFlushSeqNum;
       this.tracker = tracker;
+      this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();
     }
 
     /**
@@ -1930,41 +1973,61 @@ public MemStoreSize prepare() {
     public void flushCache(MonitoredTask status) throws IOException {
       RegionServerServices rsService = region.getRegionServerServices();
       ThroughputController throughputController =
-          rsService == null ? null : rsService.getFlushThroughputController();
-      tempFiles =
-        HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
+        rsService == null ? null : rsService.getFlushThroughputController();
+      // it could be null if we do not need to track the creation of store file writer due to
+      // different SFT implementation.
+      if (writerCreationTracker != null) {
+        HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker);
+      }
+      tempFiles = HStore.this.flushCache(
+        cacheFlushSeqNum,
+        snapshot,
+        status,
+        throughputController,
+        tracker,
+        writerCreationTracker);
     }
 
     @Override
     public boolean commit(MonitoredTask status) throws IOException {
-      if (CollectionUtils.isEmpty(this.tempFiles)) {
-        return false;
-      }
-      status.setStatus("Flushing " + this + ": reopening flushed file");
-      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
-      for (HStoreFile sf : storeFiles) {
-        StoreFileReader r = sf.getReader();
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),
-            cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+      try {
+        if (CollectionUtils.isEmpty(this.tempFiles)) {
+          return false;
+        }
+        status.setStatus("Flushing " + this + ": reopening flushed file");
+        List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);
+        for (HStoreFile sf : storeFiles) {
+          StoreFileReader r = sf.getReader();
+          if (LOG.isInfoEnabled()) {
+            LOG.info(
+              "Added {}, entries={}, sequenceid={}, filesize={}",
+              sf,
+              r.getEntries(),
+              cacheFlushSeqNum,
+              TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+          }
+          outputFileSize += r.length();
+          storeSize.addAndGet(r.length());
+          totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+          committedFiles.add(sf.getPath());
         }
-        outputFileSize += r.length();
-        storeSize.addAndGet(r.length());
-        totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-        committedFiles.add(sf.getPath());
-      }
-      flushedCellsCount.addAndGet(cacheFlushCount);
-      flushedCellsSize.addAndGet(cacheFlushSize);
-      flushedOutputFileSize.addAndGet(outputFileSize);
-      // call coprocessor after we have done all the accounting above
-      for (HStoreFile sf : storeFiles) {
-        if (getCoprocessorHost() != null) {
-          getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+        flushedCellsCount.addAndGet(cacheFlushCount);
+        flushedCellsSize.addAndGet(cacheFlushSize);
+        flushedOutputFileSize.addAndGet(outputFileSize);
+        // call coprocessor after we have done all the accounting above
+        for (HStoreFile sf : storeFiles) {
+          if (getCoprocessorHost() != null) {
+            getCoprocessorHost().postFlush(HStore.this, sf, tracker);
+          }
+        }
+        // Add new file to store files. Clear snapshot too while we have the Store write lock.
+        return completeFlush(storeFiles, snapshot.getId());
+      } finally {
+        if (writerCreationTracker != null) {
+          HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);
         }
       }
-      // Add new file to store files. Clear snapshot too while we have the Store write lock.
-      return completeFlush(storeFiles, snapshot.getId());
     }
 
     @Override
@@ -2110,6 +2173,16 @@ public long getMajorCompactedCellsSize() {
     return majorCompactedCellsSize.get();
   }
 
+  public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {
+    if (isMajor) {
+      majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+      majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);
+    } else {
+      compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());
+      compactedCellsSize.addAndGet(progress.totalCompactedSize);
+    }
+  }
+
   /**
    * Returns the StoreEngine that is backing this concrete implementation of Store.
    * @return Returns the {@link StoreEngine} object used internally inside this HStore object.
@@ -2404,4 +2477,15 @@ void updateMetricsStore(boolean memstoreRead) {
       mixedRowReadsCount.increment();
     }
   }
+
+  /**
+   * Return the storefiles which are currently being written to. Mainly used by
+   * {@link BrokenStoreFileCleaner} to prevent deleting these files as they are not present in
+   * SFT yet.
+   */
+  Set<Path> getStoreFilesBeingWritten() {
+    return storeFileWriterCreationTrackers.stream()
+      .flatMap(t -> t.get().stream())
+      .collect(Collectors.toSet());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index d85553ac8082..847187074df5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -42,11 +42,9 @@
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -94,7 +92,7 @@
  */
 @InterfaceAudience.Private
 public abstract class StoreEngine<SF extends StoreFlusher, CP extends CompactionPolicy,
-  C extends Compactor, SFM extends StoreFileManager> {
+  C extends Compactor<?>, SFM extends StoreFileManager> {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
 
@@ -157,7 +155,7 @@ public CompactionPolicy getCompactionPolicy() {
   /**
    * @return Compactor to use.
    */
-  public Compactor getCompactor() {
+  public Compactor<?> getCompactor() {
     return this.compactor;
   }
 
@@ -544,17 +542,6 @@ public boolean requireWritingToTmpDirFirst() {
     return storeFileTracker.requireWritingToTmpDirFirst();
   }
 
-  /**
-   * Resets the compaction writer when the new file is committed and used as active storefile.
-   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
-   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
-   * have failed. Currently called in
-   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
-   */
-  public void resetCompactionWriter(){
-    compactor.resetWriter();
-  }
-
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index a4084cbd3795..829028eec638 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -34,6 +34,7 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -423,9 +424,14 @@ public static class Builder {
     private boolean shouldDropCacheBehind;
     private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
     private String fileStoragePolicy;
-
-    public Builder(Configuration conf, CacheConfig cacheConf,
-        FileSystem fs) {
+    // this is used to track the creation of the StoreFileWriter, mainly used for the SFT
+    // implementation where we will write store files directly to the final place, instead of
+    // writing a tmp file first. Under this scenario, we will have a background task to purge the
+    // store files which are not recorded in the SFT, but for the newly created store file writer,
+    // they are not tracked in SFT, so here we need to record them and treat them specially.
+    private Consumer<Path> writerCreationTracker;
+
+    public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) {
       this.conf = conf;
       this.cacheConf = cacheConf;
       this.fs = fs;
@@ -509,6 +515,11 @@ public Builder withFileStoragePolicy(String fileStoragePolicy) {
       return this;
     }
 
+    public Builder withWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+      this.writerCreationTracker = writerCreationTracker;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -557,8 +568,21 @@ public StoreFileWriter build() throws IOException {
           bloomType = BloomType.NONE;
         }
       }
-
-      return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount,
+      // make sure we call this before actually creating the writer
+      // in fact, it is not a big deal to even add a nonexistent file to the tracker, as we will
+      // never try to delete it and finally we will clean the tracker up after compaction. But if
+      // the file cleaner finds the file but we haven't recorded it yet, it may accidentally delete
+      // the file and cause problems.
+      if (writerCreationTracker != null) {
+        writerCreationTracker.accept(filePath);
+      }
+      return new StoreFileWriter(
+        fs,
+        filePath,
+        conf,
+        cacheConf,
+        bloomType,
+        maxKeyCount,
         favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 1095854273a0..d6461f7729a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -22,7 +22,7 @@
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -55,8 +55,8 @@ public StoreFlusher(Configuration conf, HStore store) {
    * @return List of files written. Can be empty; must not be null.
    */
   public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException;
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException;
 
   protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
       MonitoredTask status) throws IOException {
@@ -69,13 +69,17 @@ protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
     writer.close();
   }
 
-  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag)
-    throws IOException {
+  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
+    Consumer<Path> writerCreationTracker) throws IOException {
     return store.getStoreEngine()
-      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
-        .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
-        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
-        .shouldDropBehind(false));
+      .createWriter(
+        CreateStoreFileWriterParams.create()
+          .maxKeyCount(snapshot.getCellsCount())
+          .compression(store.getColumnFamilyDescriptor().getCompressionType())
+          .isCompaction(false)
+          .includeMVCCReadpoint(true)
+          .includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
+          .shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index a4e943ac8b04..fc0598d89ac0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -58,7 +58,7 @@ public void setNoStripeMetadata() {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index f8183b7645a5..fb9115e01ecf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -23,7 +23,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -54,11 +54,14 @@ public StripeStoreFlusher(Configuration conf, HStore store,
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     List<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
-    if (cellsCount == 0) return result; // don't flush if there are no entries
+    if (cellsCount == 0) {
+      // don't flush if there are no entries
+      return result;
+    }
 
     InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
 
@@ -70,8 +73,9 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot);
-      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+      StripeMultiFileWriter.WriterFactory factory =
+        createWriterFactory(snapshot, writerCreationTracker);
+      StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
       mw.init(storeScanner, factory);
 
       synchronized (flushLock) {
@@ -98,12 +102,13 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum
     return result;
   }
 
-  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) {
+  private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot,
+    Consumer<Path> writerCreationTracker) {
     return new StripeMultiFileWriter.WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
         // XXX: it used to always pass true for includesTag, re-consider?
-        return StripeStoreFlusher.this.createWriter(snapshot, true);
+        return StripeStoreFlusher.this.createWriter(snapshot, true, writerCreationTracker);
       }
     };
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 19b7a98627e6..23d16934b65c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
-
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,19 +46,21 @@ public AbstractMultiOutputCompactor(Configuration conf, HStore store) {
     super(conf, store);
   }
 
-  protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
-    final FileDetails fd, final boolean shouldDropBehind, boolean major) {
+  protected final void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner scanner,
+    final FileDetails fd, final boolean shouldDropBehind, boolean major,
+    Consumer<Path> writerCreationTracker) {
     WriterFactory writerFactory = new WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major);
+        return AbstractMultiOutputCompactor.this
+          .createWriter(fd, shouldDropBehind, major, writerCreationTracker);
       }
 
       @Override
       public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
        throws IOException {
-        return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind,
-          fileStoragePolicy, major);
+        return AbstractMultiOutputCompactor.this
+          .createWriter(fd, shouldDropBehind, fileStoragePolicy, major, writerCreationTracker);
       }
     };
     // Prepare multi-writer, and perform the compaction using scanner and writer.
@@ -68,7 +70,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
   }
 
   @Override
-  protected void abortWriter() throws IOException {
+  protected void abortWriter(AbstractMultiFileWriter writer) throws IOException {
     FileSystem fs = store.getFileSystem();
     for (Path leftoverFile : writer.abortWriters()) {
       try {
@@ -79,7 +81,6 @@ protected void abortWriter() throws IOException {
           e);
       }
     }
-    //this step signals that the target file is no longer writen and can be cleaned up
-    writer = null;
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
index 942cc4f3fd6b..2ccdd150cd21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java
@@ -37,7 +37,7 @@ public class CompactionProgress {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionProgress.class);
 
   /** the total compacting key values in currently running compaction */
-  private long totalCompactingKVs;
+  public long totalCompactingKVs;
   /** the completed count of key values in currently running compaction */
   public long currentCompactedKVs = 0;
   /** the total size of data processed by the currently running compaction, in bytes */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
index 899219d70b24..5d8285aecdb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
@@ -22,8 +22,9 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
-
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -51,6 +52,7 @@ private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
   private String storeName = "";
   private long totalSize = -1L;
   private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
+  private Consumer<Path> writerCreationTracker;
 
   public CompactionRequestImpl(Collection<HStoreFile> files) {
     this.selectionTime = EnvironmentEdgeManager.currentTime();
@@ -137,6 +139,14 @@ public CompactionLifeCycleTracker getTracker() {
     return tracker;
   }
 
+  public Consumer<Path> getWriterCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public void setWriterCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+  }
+
   public boolean isAfterSplit() {
     return isAfterSplit;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d934ecb0c16d..5ac64823ae0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,12 +25,13 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -39,7 +40,6 @@
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
-import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -70,15 +70,18 @@
 /**
  * A compactor is a compaction algorithm associated a given policy. Base class also contains
  * reusable parts for implementing compactors (what is common and what isn't is evolving).
+ * <p>
+ * Compactions might be concurrent against a given store and the Compactor is shared among
+ * them. Do not put mutable state into class fields. All Compactor class fields should be
+ * final or effectively final.
+ * 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
 */
 @InterfaceAudience.Private
 public abstract class Compactor<T extends CellSink> {
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
   protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
-  protected volatile CompactionProgress progress;
   protected final Configuration conf;
   protected final HStore store;
-
   protected final int compactionKVMax;
   protected final Compression.Algorithm majorCompactionCompression;
   protected final Compression.Algorithm minorCompactionCompression;
@@ -92,15 +95,15 @@ public abstract class Compactor<T extends CellSink> {
   protected static final String MINOR_COMPACTION_DROP_CACHE =
       "hbase.regionserver.minorcompaction.pagecache.drop";
 
-  private final boolean dropCacheMajor;
-  private final boolean dropCacheMinor;
+  protected final boolean dropCacheMajor;
+  protected final boolean dropCacheMinor;
 
-  // In compaction process only a single thread will access and write to this field, and
-  // getCompactionTargets is the only place we will access it other than the compaction thread, so
-  // make it volatile.
-  protected volatile T writer = null;
+  // We track progress per request using the CompactionRequestImpl identity as key.
+  // completeCompaction() cleans up this state.
+  private final Set<CompactionProgress> progressSet =
+    Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
 
-  //TODO: depending on Store is not good but, realistically, all compactors currently do.
+  // TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(Configuration conf, HStore store) {
     this.conf = conf;
     this.store = store;
@@ -116,15 +119,9 @@ public abstract class Compactor<T extends CellSink> {
     this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
   }
 
-
-
   protected interface CellSinkFactory<S> {
-    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major)
-      throws IOException;
-  }
-
-  public CompactionProgress getProgress() {
-    return this.progress;
+    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
+      Consumer<Path> writerCreationTracker) throws IOException;
   }
 
   /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
@@ -271,12 +268,13 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
   };
 
   protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
-    boolean major) {
+    boolean major, Consumer<Path> writerCreationTracker) {
     return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
       .compression(major ? majorCompactionCompression : minorCompactionCompression)
      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
       .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
-      .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+      .totalCompactedFilesSize(fd.totalCompactedFilesSize)
+      .writerCreationTracker(writerCreationTracker);
   }
 
   /**
@@ -286,16 +284,20 @@ protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean
    * @throws IOException if creation failed
    */
   protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
-    boolean major) throws IOException {
+    boolean major, Consumer<Path> writerCreationTracker) throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major));
+    return store.getStoreEngine()
+      .createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
   }
 
   protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
-    String fileStoragePolicy, boolean major) throws IOException {
+    String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
+    throws IOException {
     return store.getStoreEngine()
-      .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy));
+      .createWriter(
+        createParams(fd, shouldDropBehind, major, writerCreationTracker)
+          .fileStoragePolicy(fileStoragePolicy));
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
@@ -327,7 +329,6 @@ protected final List<Path> compact(final CompactionRequestImpl request,
     InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
     ThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
-    this.progress = new CompactionProgress(fd.maxKeyCount);
 
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();
 
@@ -343,6 +344,9 @@ protected final List<Path> compact(final CompactionRequestImpl request,
     boolean finished = false;
     List<StoreFileScanner> scanners =
       createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
+    T writer = null;
+    CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
+    progressSet.add(progress);
     try {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = scannerFactory.getScanType(request);
@@ -355,14 +359,14 @@ protected final List<Path> compact(final CompactionRequestImpl request,
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
-      if (writer != null){
-        LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
-          .map(n -> n.toString())
-          .collect(Collectors.joining(", ", "{ ", " }")));
-      }
-      writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
-      finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
-        throughputController, request.isAllFiles(), request.getFiles().size());
+      writer = sinkFactory.createWriter(
+        scanner,
+        fd,
+        dropCache,
+        request.isMajor(),
+        request.getWriterCreationTracker());
+      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request.isAllFiles(), request.getFiles().size(), progress);
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
           + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
@@ -380,34 +384,41 @@ protected final List<Path> compact(final CompactionRequestImpl request,
       } else {
         Closeables.close(scanner, true);
       }
-      if (!finished && writer != null) {
-        abortWriter();
+      if (!finished) {
+        if (writer != null) {
+          abortWriter(writer);
+        }
+      } else {
+        store.updateCompactedMetrics(request.isMajor(), progress);
       }
+      progressSet.remove(progress);
     }
     assert finished : "We should have exited the method on all error paths";
     assert writer != null : "Writer should be non-null if no error";
-    return commitWriter(fd, request);
+    return commitWriter(writer, fd, request);
   }
 
-  protected abstract List<Path> commitWriter(FileDetails fd,
+  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException;
 
-  protected abstract void abortWriter() throws IOException;
+  protected abstract void abortWriter(T writer) throws IOException;
 
   /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
+   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
   *          smallestReadPoint
   * @param major Is a major compaction.
   * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
   * @return Whether compaction ended; false if it was interrupted for some reason.
*/ - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -549,22 +560,27 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, dropDeletesFromRow, dropDeletesToRow); } - public List getCompactionTargets() { - T writer = this.writer; - if (writer == null) { - return Collections.emptyList(); - } - if (writer instanceof StoreFileWriter) { - return Arrays.asList(((StoreFileWriter) writer).getPath()); + /** + * Return the aggregate progress for all currently active compactions. + */ + public CompactionProgress getProgress() { + synchronized (progressSet) { + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; + long totalCompactedSize = 0; + for (CompactionProgress progress: progressSet) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + totalCompactedSize += progress.totalCompactedSize; + } + CompactionProgress result = new CompactionProgress(totalCompactingKVs); + result.currentCompactedKVs = currentCompactedKVs; + result.totalCompactedSize = totalCompactedSize; + return result; } - return ((AbstractMultiFileWriter) writer).writers().stream().map(sfw -> sfw.getPath()) - .collect(Collectors.toList()); } - /** - * Reset the Writer when the new storefiles were successfully added - */ - public void resetWriter(){ - writer = null; + public boolean isCompacting() { + return !progressSet.isEmpty(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index 43e037c5e702..c8c10e16ff19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Map; import java.util.OptionalLong; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; @@ -68,21 +68,26 @@ public List compact(final CompactionRequestImpl request, final List @Override public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { - DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries, - lowerBoundariesPolicies, - needEmptyFile(request)); - initMultiWriter(writer, scanner, fd, shouldDropBehind, major); + boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) + throws IOException { + DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter( + lowerBoundaries, + lowerBoundariesPolicies, + needEmptyFile(request)); + initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker); return writer; } - }, throughputController, user); + }, + throughputController, + user); } @Override - protected List 
commitWriter(FileDetails fd, + protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List pathList = writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); return pathList; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 03e3a1b5f394..0e91d8870b6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.HStore; @@ -47,10 +48,11 @@ public DefaultCompactor(Configuration conf, HStore store) { private final CellSinkFactory writerFactory = new CellSinkFactory() { @Override - public StoreFileWriter createWriter(InternalScanner scanner, - org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { - return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major); + public StoreFileWriter createWriter(InternalScanner scanner, FileDetails fd, + boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) + throws IOException { + return DefaultCompactor.this + .createWriter(fd, shouldDropBehind, major, writerCreationTracker); } }; @@ -63,7 +65,7 @@ public List compact(final CompactionRequestImpl request, } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); @@ -72,12 +74,6 @@ protected List commitWriter(FileDetails fd, } @Override - protected void abortWriter() throws IOException { - abortWriter(writer); - // this step signals that the target file is no longer written and can be cleaned up - writer = null; - } - protected final void abortWriter(StoreFileWriter writer) throws IOException { Path leftoverFile = writer.getPath(); try { @@ -92,4 +88,5 @@ protected final void abortWriter(StoreFileWriter writer) throws IOException { leftoverFile, e); } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 060a11b41fe6..6413a304d55d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.util.List; - +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.HStore; @@ -88,18 +88,26 @@ public List compact(CompactionRequestImpl request, final List targ } LOG.debug(sb.toString()); } - return compact(request, new StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow), + return compact( + request, + new 
StripeInternalScannerFactory(majorRangeFromRow, majorRangeToRow), new CellSinkFactory() { @Override public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { + boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) + throws IOException { StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( - store.getComparator(), targetBoundaries, majorRangeFromRow, majorRangeToRow); - initMultiWriter(writer, scanner, fd, shouldDropBehind, major); + store.getComparator(), + targetBoundaries, + majorRangeFromRow, + majorRangeToRow); + initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker); return writer; } - }, throughputController, user); + }, + throughputController, + user); } public List compact(CompactionRequestImpl request, final int targetCount, final long targetSize, @@ -115,20 +123,28 @@ public List compact(CompactionRequestImpl request, final int targetCount, @Override public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { + boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) + throws IOException { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( - store.getComparator(), targetCount, targetSize, left, right); - initMultiWriter(writer, scanner, fd, shouldDropBehind, major); + store.getComparator(), + targetCount, + targetSize, + left, + right); + initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker); return writer; } - }, throughputController, user); + }, + throughputController, + user); } @Override - protected List commitWriter(FileDetails fd, + protected List commitWriter(StripeMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index 1bf354f00a0f..f3e626707960 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -177,11 +177,15 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th } StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem()) - .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType()) - .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes()) - .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind()) + .withOutputDir(outputDir) + .withBloomType(ctx.getBloomFilterType()) + .withMaxKeyCount(params.maxKeyCount()) + .withFavoredNodes(ctx.getFavoredNodes()) + .withFileContext(hFileContext) + .withShouldDropCacheBehind(params.shouldDropBehind()) .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier()) - .withFileStoragePolicy(params.fileStoragePolicy()); + .withFileStoragePolicy(params.fileStoragePolicy()) + 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index 1bf354f00a0f..f3e626707960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -177,11 +177,15 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th
     }
     StoreFileWriter.Builder builder =
       new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
-        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
-        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
-        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withOutputDir(outputDir)
+        .withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount())
+        .withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext)
+        .withShouldDropCacheBehind(params.shouldDropBehind())
         .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
-        .withFileStoragePolicy(params.fileStoragePolicy());
+        .withFileStoragePolicy(params.fileStoragePolicy())
+        .withWriterCreationTracker(params.writerCreationTracker());
     return builder.build();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index 1b76c52cd6e3..eaf406167113 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,14 @@ public MyCompactor(Configuration conf, HStore store) {
     }
 
     @Override
-    protected List<Path> commitWriter(FileDetails fd,
+    protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
       CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(fd, request);
+      return super.commitWriter(writer, fd, request);
     }
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index ea7ff61986b7..80dbe28af3d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -54,6 +54,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.IntBinaryOperator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -97,6 +98,7 @@
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
 import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
@@ -2491,9 +2493,10 @@ public MyDefaultStoreFlusher(Configuration conf, HStore store) {
     @Override
     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
       MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
       counter.incrementAndGet();
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
 
     @Override
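StoreFileTrackerBase.createWriter() is where the tracker finally takes effect: every StoreFileWriter built through it reports its path to the Consumer<Path> carried by CreateStoreFileWriterParams. A tracker is nothing more than a Consumer<Path>; one plausible shape for the caller side is sketched below. The class, its names, and the idea of exposing the set to a cleaner chore are assumptions for illustration, not something this patch defines.

// Illustrative only: what a writer creation tracker can look like on the caller side.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;

public final class WriterCreationTrackers {

  /** Files that currently have an open writer; safe to read from other threads. */
  private static final Set<Path> FILES_UNDER_WRITE = ConcurrentHashMap.newKeySet();

  /** The Consumer<Path> that flush or compaction code would hand down. */
  public static Consumer<Path> tracker() {
    return FILES_UNDER_WRITE::add;
  }

  /** Called once the writer has been committed or aborted. */
  public static void untrack(Path path) {
    FILES_UNDER_WRITE.remove(path);
  }

  private WriterCreationTrackers() {
  }
}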
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index dc699a62c0c8..b9cf81f51df9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -23,7 +23,6 @@
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -54,7 +53,6 @@
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -211,26 +209,9 @@ private void majorCompaction() throws Exception {
     Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).readVersions(100));
     assertEquals(compactionThreshold, result.size());
 
-    // see if CompactionProgress is in place but null
-    for (HStore store : r.getStores()) {
-      assertNull(store.getCompactionProgress());
-    }
-
     r.flush(true);
     r.compact(true);
 
-    // see if CompactionProgress has done its thing on at least one store
-    int storeCount = 0;
-    for (HStore store : r.getStores()) {
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null) {
-        ++storeCount;
-        assertTrue(progress.currentCompactedKVs > 0);
-        assertTrue(progress.getTotalCompactingKVs() > 0);
-      }
-      assertTrue(storeCount > 0);
-    }
-
     // look at the second row
     // Increment the least significant character so we get to next row.
     byte[] secondRowBytes = START_KEY_BYTES.clone();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 8738b9a554f6..afff1a683891 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -92,7 +92,6 @@
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -118,11 +117,9 @@
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -621,17 +618,11 @@ private void clearReferences(HRegion region) throws IOException {
     assertEquals(1, region.getStores().size());
     HStore store = region.getStores().get(0);
     while (store.hasReferences()) {
-      // Wait on any current compaction to complete first.
-      CompactionProgress progress = store.getCompactionProgress();
-      if (progress != null && progress.getProgressPct() < 1.0f) {
-        while (progress.getProgressPct() < 1.0f) {
-          LOG.info("Waiting, progress={}", progress.getProgressPct());
-          Threads.sleep(1000);
-        }
-      } else {
-        // Run new compaction. Shoudn't be any others running.
-        region.compact(true);
+      while (store.storeEngine.getCompactor().isCompacting()) {
+        Threads.sleep(100);
       }
+      // Run new compaction. Shoudn't be any others running.
+      region.compact(true);
       store.closeAndArchiveCompactedFiles();
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index fd5404c4242b..0f18b16fe486 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -43,6 +43,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -67,8 +68,6 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -654,11 +653,12 @@ public CustomStoreFlusher(Configuration conf, HStore store) {
 
     @Override
     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController,
-      FlushLifeCycleTracker tracker) throws IOException {
+      FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
      if (throwExceptionWhenFlushing.get()) {
        throw new IOException("Simulated exception by tests");
      }
-      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
+      return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker,
+        writerCreationTracker);
     }
   };
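The CustomStoreFlusher change above, like the MyDefaultStoreFlusher change earlier in the patch, illustrates the pattern every flushSnapshot override now has to follow: accept the extra Consumer<Path> and pass it on, otherwise the flush file is created without ever being reported to the tracker. A minimal sketch of such an override follows; the class name and the status message are illustrative assumptions, not part of this patch.

// Illustrative sketch only; mirrors the test overrides above but is not from the patch.
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;

public class LoggingStoreFlusher extends DefaultStoreFlusher {

  public LoggingStoreFlusher(Configuration conf, HStore store) {
    super(conf, store);
  }

  @Override
  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
    MonitoredTask status, ThroughputController throughputController,
    FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
    // Forwarding writerCreationTracker is the important part: the flush file is
    // reported to it the moment the underlying StoreFileWriter is created.
    List<Path> result = super.flushSnapshot(snapshot, cacheFlushId, status,
      throughputController, tracker, writerCreationTracker);
    status.setStatus("Flushed " + result.size() + " file(s) for " + store);
    return result;
  }
}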