diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index fd1acd9a1363..a1a37ef746a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -299,4 +299,8 @@ public interface Store { * if you try to set a configuration. */ Configuration getReadOnlyConfiguration(); + + default String getName() { + return String.format("%s:%s", getRegionInfo().getEncodedName(), getColumnFamilyName()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java index 454b244fb512..502fd629a17e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -171,4 +173,14 @@ public static Configuration createStoreConfiguration(Configuration conf, TableDe return new CompoundConfiguration().add(conf).addBytesMap(td.getValues()) .addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues()); } + + public static List filteredReferenceFiles(final Collection files) { + List referenceFiles = new ArrayList<>(); + for (HStoreFile sf : files) { + if (sf.isReference() || StoreFileInfo.isReference(sf.getPath())) { + referenceFiles.add(sf); + } + } + return referenceFiles; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java index 61deb0b93ce3..a1eb2b2a977d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java @@ -18,11 +18,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; /** * Configuration class for stripe store and compactions. @@ -65,6 +65,11 @@ public class StripeStoreConfig { public static final String MAX_REGION_SPLIT_IMBALANCE_KEY = "hbase.store.stripe.region.split.max.imbalance"; + /** + * Configure for enable priority select Reference files to compact in StripeCompactPolicy + */ + public static final String PRIORITY_COMPACT_REFERENCE_FILES_ENABLED = + "hbase.store.stripe.region.priority.compact.reference.files.enabled"; private final float maxRegionSplitImbalance; private final int level0CompactMinFiles; @@ -77,6 +82,8 @@ public class StripeStoreConfig { private final boolean flushIntoL0; private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount + private final boolean priorityCompactRefsEnabled; + private static final double EPSILON = 0.001; // good enough for this, not a real epsilon. public StripeStoreConfig(Configuration config, StoreConfigInformation sci) { this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4); @@ -109,6 +116,8 @@ public StripeStoreConfig(Configuration config, StoreConfigInformation sci) { } this.initialCount = initialCount; this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount); + this.priorityCompactRefsEnabled = + config.getBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, false); } private static float getFloat( @@ -163,4 +172,8 @@ public float getSplitCount() { public long getSplitPartSize() { return splitPartSize; } + + public boolean isPriorityCompactRefsEnabled() { + return priorityCompactRefsEnabled; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 14863a69a9b1..a824a9a03371 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -61,7 +61,7 @@ protected void createComponents( Configuration conf, HStore store, CellComparator comparator) throws IOException { this.config = new StripeStoreConfig(conf, store); this.compactionPolicy = new StripeCompactionPolicy(conf, store, config); - this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config); + this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config, store); this.storeFlusher = new StripeStoreFlusher( conf, store, this.compactionPolicy, this.storeFileManager); this.compactor = new StripeCompactor(conf, store); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index 1c3ac683dcd8..b200945c02c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -123,12 +123,19 @@ private static class State { private final int blockingFileCount; + private final String storeName; + public StripeStoreFileManager( - CellComparator kvComparator, Configuration conf, StripeStoreConfig config) { + CellComparator kvComparator, Configuration conf, StripeStoreConfig config, HStore store) { this.cellComparator = kvComparator; this.config = config; this.blockingFileCount = conf.getInt( HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); + if(store != null) { + storeName = store.getName(); + } else { + storeName = ""; + } } @Override @@ -136,6 +143,11 @@ public void loadFiles(List storeFiles) { loadUnclassifiedStoreFiles(storeFiles); } + @Override + public String getStoreName() { + return storeName; + } + @Override public Collection getStorefiles() { return state.allFilesCached; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 37095216497d..2b1bbaa013ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -41,6 +42,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; /** @@ -116,6 +118,87 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si, // compact-all-things behavior. Collection allFiles = si.getStorefiles(); if (StoreUtils.hasReferences(allFiles)) { + if (config.isPriorityCompactRefsEnabled()) { + // try to only select reference files + LOG.info("There are references in the store {}, compact reference files only. ", + si.getStoreName()); + List l0References = StoreUtils.filteredReferenceFiles(si.getLevel0Files()); + if (!l0References.isEmpty()) { + boolean needSelect = needSelectFiles(l0References); + if (needSelect) { + // need select L0 reference file compaction means L0 is very large. + // if L0 reference is large, then we should compact large stripes first, to make sure + // the stripes will not too large after the large L0 compaction. + StripeCompactionRequest result = selectSingleStripeCompaction( + si, false, false, false); + if (result != null) { + LOG.debug("Performing one whole stripe split compaction after split, {}", + si.getStoreName()); + return result; + } + } + List toCompactL0Refs = needSelect ? + selectSimpleCompaction(l0References, false, false, true) : + l0References; + assert !toCompactL0Refs.isEmpty() : "To compact reference files should not be empty"; + String msg = ""; + if (LOG.isDebugEnabled()) { + msg = String.format("Compact L0 references only after split. %d store files, " + + "%d L0 files, %d reference files length %d, to " + + "compact %d reference files with length %d, store %s", allFiles.size(), + si.getLevel0Files().size(), l0References.size(), getTotalFileSize(l0References), + toCompactL0Refs.size(), getTotalFileSize(toCompactL0Refs), si.getStoreName()); + } + StripeCompactionRequest request; + if (si.getStripeCount() > 0) { + // do L0 reference compaction, will perform boundary compaction + LOG.debug(msg + ". Performing boundary compaction."); + request = + new BoundaryStripeCompactionRequest(toCompactL0Refs, si.getStripeBoundaries()); + } else { + // do L0 reference compaction, will perform split compaction + LOG.debug(msg + ". Performing split stripe compaction."); + long targetKvs = + estimateTargetKvs(toCompactL0Refs, config.getInitialCount()).getFirst(); + request = new SplitStripeCompactionRequest(toCompactL0Refs, OPEN_KEY, OPEN_KEY, targetKvs); + } + request.getRequest().setAfterSplit(true); + request.getRequest().setIsMajor(false, false); + return request; + } + // select reference files in a single stripe + int priorityStripe = getStripeIndexWithReferences(si); + if (priorityStripe != -1) { + LOG.debug("The stripe {} has reference files, select all files in this stripe to " + + "compact, store {}", priorityStripe, si.getStoreName()); + Collection priorityStripeFiles = si.getStripes().get(priorityStripe); + int targetCount = 1; + long targetKvs = Long.MAX_VALUE; + long toCompactSize = getTotalFileSize(priorityStripeFiles); + if (toCompactSize >= config.getSplitSize()) { + Pair estimate = + estimateTargetKvs(priorityStripeFiles, config.getSplitCount()); + targetCount = estimate.getSecond(); + targetKvs = estimate.getFirst(); + } + String splitString = + "; the stripe will be split into at most " + targetCount + " stripes with " + + targetKvs + " target KVs, toCompact files size is " + toCompactSize; + StripeCompactionRequest request = + new SplitStripeCompactionRequest(priorityStripeFiles, + si.getStartRow(priorityStripe), + si.getEndRow(priorityStripe), targetCount, targetKvs); + LOG.debug( + "Priority compact stripe {} all files, selecting {} files, " + " store {}", + priorityStripe, request.getRequest().getFiles().size(), + si.getStoreName() + splitString); + request.getRequest().setAfterSplit(true); + request.getRequest().setIsMajor(false, false); + return request; + } + } + + // the priority compact reference files is disabled, so compact all files after split LOG.debug("There are references in the store; compacting all files"); long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst(); SplitStripeCompactionRequest request = new SplitStripeCompactionRequest( @@ -162,6 +245,35 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si, return selectSingleStripeCompaction(si, false, canDropDeletesNoL0, isOffpeak); } + /** + * Check if the size or count of the participants is too large to select all + * for one compaction request + * @param participants participant store files + * @return True if need to select partial of the participants, or else False + */ + private boolean needSelectFiles(final List participants) { + return participants.size() > this.config.getStripeCompactMaxFiles() || + getTotalFileSize(participants) > comConf.getMaxCompactSize(); + } + + /** + * Get the index of the stripe who has reference files + * @param si the stripe information provider + * @return the index of a stripe, [0,n-1] + */ + private int getStripeIndexWithReferences(StripeInformationProvider si) { + ArrayList> stripeFiles = si.getStripes(); + for (int i = 0; i < stripeFiles.size(); ++i) { + ImmutableList oneStripeFiles = stripeFiles.get(i); + if (StoreUtils.hasReferences(oneStripeFiles)) { + LOG.debug("Stripe {} has references, endRow {}, store {}", i, si.getEndRow(i), + si.getStoreName()); + return i; + } + } + return -1; + } + public boolean needsCompactions(StripeInformationProvider si, List filesCompacting) { // Approximation on whether we need compaction. return filesCompacting.isEmpty() @@ -559,6 +671,12 @@ public void setMajorRangeFull() { /** The information about stripes that the policy needs to do its stuff */ public static interface StripeInformationProvider { + /** + * The store name, can be used in log print + * @return + */ + String getStoreName(); + public Collection getStorefiles(); /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index 63db911f8b5d..73997054cb1c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -612,7 +612,7 @@ private static StripeStoreFileManager createManager( StripeStoreConfig config = new StripeStoreConfig( conf, Mockito.mock(StoreConfigInformation.class)); StripeStoreFileManager result = new StripeStoreFileManager(CellComparatorImpl.COMPARATOR, conf, - config); + config, null); result.loadFiles(sfs); return result; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 2ba15d1be18e..5f5b6a650816 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver.compactions; import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MAX_FILES_KEY; +import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.MIN_FILES_KEY; +import static org.apache.hadoop.hbase.regionserver.StripeStoreConfig.PRIORITY_COMPACT_REFERENCE_FILES_ENABLED; import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; import static org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY; import static org.junit.Assert.assertEquals; @@ -92,6 +94,7 @@ import org.mockito.ArgumentMatcher; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.ListUtils; @RunWith(Parameterized.class) @Category({RegionServerTests.class, MediumTests.class}) @@ -251,8 +254,63 @@ public void testWithReferences() throws Exception { assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles())); scr.execute(sc, NoLimitThroughputController.INSTANCE, null); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), - aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), - any(), any()); + aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any()); + } + + @Test + public void testPriorityReferences() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true); + StripeCompactionPolicy policy = createPolicy(conf); + StripeCompactor sc = mock(StripeCompactor.class); + HStoreFile ref = createFile(); + when(ref.isReference()).thenReturn(true); + StripeInformationProvider si = mock(StripeInformationProvider.class); + List mixed = al(ref, createFile()); + when(si.getStorefiles()).thenReturn(mixed); + when(si.getLevel0Files()).thenReturn(mixed); + Collection refs = al(ref); + + assertTrue(policy.needsCompactions(si, al())); + StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); + assertTrue(ListUtils.isEqualList(refs, scr.getRequest().getFiles())); + scr.execute(sc, NoLimitThroughputController.INSTANCE, null); + // this compaction request has no major range + verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), + aryEq(OPEN_KEY), any(), any(), any(), any()); + } + + @Test + public void testPrioritySelectReferences() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(PRIORITY_COMPACT_REFERENCE_FILES_ENABLED, true); + int compactFileCount = 2; + conf.setInt(MIN_FILES_KEY, compactFileCount); + conf.setInt(MAX_FILES_KEY,compactFileCount); + StripeCompactionPolicy policy = createPolicy(conf); + StripeCompactor sc = mock(StripeCompactor.class); + StripeInformationProvider si = mock(StripeInformationProvider.class); + List sfs = mockRefs(10, 10); + when(si.getStorefiles()).thenReturn(sfs); + when(si.getLevel0Files()).thenReturn(sfs); + + assertTrue(policy.needsCompactions(si, al())); + StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); + assertEquals(compactFileCount, scr.getRequest().getFiles().size()); + } + + private List mockRefs(int refCount, int otherCount) throws Exception { + List files = new ArrayList<>(refCount + otherCount); + for(int i = 0;i