From 6e053765e8141f6ad807325828c520b232d2600f Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 8 Sep 2021 10:31:49 +0100 Subject: [PATCH] HBASE-26079 Use StoreFileTracker when splitting and merging (#3617) Signed-off-by: Duo Zhang --- .../MergeTableRegionsProcedure.java | 22 +- .../assignment/SplitTableRegionProcedure.java | 42 +-- .../hbase/regionserver/HRegionFileSystem.java | 42 ++- .../DefaultStoreFileTracker.java | 4 +- .../storefiletracker/StoreFileTracker.java | 1 - .../StoreFileTrackerFactory.java | 33 ++- .../regionserver/TestDefaultStoreEngine.java | 1 + .../TestDirectStoreSplitsMerges.java | 32 ++- .../hbase/regionserver/TestHStoreFile.java | 19 +- .../TestMergesSplitsAddToTracker.java | 262 ++++++++++++++++++ .../regionserver/TestStripeStoreEngine.java | 1 + .../TestStoreFileTracker.java | 56 ++++ 12 files changed, 471 insertions(+), 44 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index da3d73ea852d..e6bbe445d077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Stream; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,6 +55,8 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.WALSplitUtil; @@ -587,30 +588,35 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); final FileSystem fs = mfs.getFileSystem(); - + List mergedFiles = new ArrayList<>(); HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem( env.getMasterConfiguration(), fs, tableDir, mergedRegion); for (RegionInfo ri: this.regionsToMerge) { HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( env.getMasterConfiguration(), fs, tableDir, ri, false); - mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion); + mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion)); } assert mergeRegionFs != null; - mergeRegionFs.commitMergedRegion(); + mergeRegionFs.commitMergedRegion(mergedFiles, env); // Prepare to create merged regions env.getAssignmentManager().getRegionStates(). getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW); } - private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs, + private List mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs, HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException { final TableDescriptor htd = env.getMasterServices().getTableDescriptors() .get(mergedRegion.getTable()); + List mergedFiles = new ArrayList<>(); for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) { String family = hcd.getNameAsString(); - final Collection storeFiles = regionFs.getStoreFiles(family); + Configuration trackerConfig = + StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd); + StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true, + family, regionFs); + final Collection storeFiles = tracker.load(); if (storeFiles != null && storeFiles.size() > 0) { final Configuration storeConfiguration = StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd); @@ -622,11 +628,13 @@ private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs, // is running in a regionserver's Store context, or we might not be able // to read the hfiles. storeFileInfo.setConf(storeConfiguration); - mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family, + Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family, new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED)); + mergedFiles.add(refFile); } } } + return mergedFiles; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index fbd87290d8c2..ff16dc5514b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -66,6 +65,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -621,21 +622,20 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti final FileSystem fs = mfs.getFileSystem(); HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false); - regionFs.createSplitsDir(daughterOneRI, daughterTwoRI); - Pair expectedReferences = splitStoreFiles(env, regionFs); + Pair, List> expectedReferences = splitStoreFiles(env, regionFs); - assertSplitResultFilesCount(fs, expectedReferences.getFirst(), + assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(), regionFs.getSplitsDir(daughterOneRI)); - regionFs.commitDaughterRegion(daughterOneRI); - assertSplitResultFilesCount(fs, expectedReferences.getFirst(), + regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env); + assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(), new Path(tabledir, daughterOneRI.getEncodedName())); - assertSplitResultFilesCount(fs, expectedReferences.getSecond(), + assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(), regionFs.getSplitsDir(daughterTwoRI)); - regionFs.commitDaughterRegion(daughterTwoRI); - assertSplitResultFilesCount(fs, expectedReferences.getSecond(), + regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env); + assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(), new Path(tabledir, daughterTwoRI.getEncodedName())); } @@ -652,7 +652,7 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept * Create Split directory * @param env MasterProcedureEnv */ - private Pair splitStoreFiles(final MasterProcedureEnv env, + private Pair, List> splitStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs) throws IOException { final Configuration conf = env.getMasterConfiguration(); TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); @@ -668,7 +668,11 @@ private Pair splitStoreFiles(final MasterProcedureEnv env, new HashMap>(htd.getColumnFamilyCount()); for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) { String family = cfd.getNameAsString(); - Collection sfis = regionFs.getStoreFiles(family); + Configuration trackerConfig = StoreFileTrackerFactory. + mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName())); + StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true, + family, regionFs); + Collection sfis = tracker.load(); if (sfis == null) { continue; } @@ -694,7 +698,7 @@ private Pair splitStoreFiles(final MasterProcedureEnv env, } if (nbFiles == 0) { // no file needs to be splitted. - return new Pair(0, 0); + return new Pair<>(Collections.emptyList(), Collections.emptyList()); } // Max #threads is the smaller of the number of storefiles or the default max determined above. int maxThreads = Math.min( @@ -752,14 +756,18 @@ private Pair splitStoreFiles(final MasterProcedureEnv env, throw (InterruptedIOException) new InterruptedIOException().initCause(e); } - int daughterA = 0; - int daughterB = 0; + List daughterA = new ArrayList<>(); + List daughterB = new ArrayList<>(); // Look for any exception for (Future> future : futures) { try { Pair p = future.get(); - daughterA += p.getFirst() != null ? 1 : 0; - daughterB += p.getSecond() != null ? 1 : 0; + if(p.getFirst() != null){ + daughterA.add(p.getFirst()); + } + if(p.getSecond() != null){ + daughterB.add(p.getSecond()); + } } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException().initCause(e); } catch (ExecutionException e) { @@ -772,7 +780,7 @@ private Pair splitStoreFiles(final MasterProcedureEnv env, getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA + " storefiles, Daughter B: " + daughterB + " storefiles."); } - return new Pair(daughterA, daughterB); + return new Pair<>(daughterA, daughterB); } private void assertSplitResultFilesCount(final FileSystem fs, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 2f5f8d7e34cd..cb30432c38b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -24,7 +24,9 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -49,6 +51,9 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; @@ -595,19 +600,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException { * @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo} * @throws IOException */ - public Path commitDaughterRegion(final RegionInfo regionInfo) - throws IOException { + public Path commitDaughterRegion(final RegionInfo regionInfo, List allRegionFiles, + MasterProcedureEnv env) throws IOException { Path regionDir = this.getSplitsDir(regionInfo); if (fs.exists(regionDir)) { // Write HRI to a file in case we need to recover hbase:meta Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE); byte[] regionInfoContent = getRegionInfoFileContent(regionInfo); writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false); + insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs); } - return regionDir; } + private void insertRegionFilesIntoStoreTracker(List allFiles, MasterProcedureEnv env, + HRegionFileSystem regionFs) throws IOException { + TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors(). + get(regionInfo.getTable()); + //we need to map trackers per store + Map trackerMap = new HashMap<>(); + //we need to map store files per store + Map> fileInfoMap = new HashMap<>(); + for(Path file : allFiles) { + String familyName = file.getParent().getName(); + trackerMap.computeIfAbsent(familyName, t -> { + Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc, + tblDesc.getColumnFamily(Bytes.toBytes(familyName))); + return StoreFileTrackerFactory. + create(config, true, familyName, regionFs); + }); + fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>()); + List infos = fileInfoMap.get(familyName); + infos.add(new StoreFileInfo(conf, fs, file, true)); + } + for(Map.Entry entry : trackerMap.entrySet()) { + entry.getValue().add(fileInfoMap.get(entry.getKey())); + } + } + /** * Creates region split daughter directories under the table dir. If the daughter regions already * exist, for example, in the case of a recovery from a previous failed split procedure, this @@ -795,13 +827,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFi * Commit a merged region, making it ready for use. * @throws IOException */ - public void commitMergedRegion() throws IOException { + public void commitMergedRegion(List allMergedFiles, MasterProcedureEnv env) + throws IOException { Path regionDir = getMergesDir(regionInfoForFs); if (regionDir != null && fs.exists(regionDir)) { // Write HRI to a file in case we need to recover hbase:meta Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE); byte[] regionInfoContent = getRegionInfoFileContent(regionInfo); writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent); + insertRegionFilesIntoStoreTracker(allMergedFiles, env, this); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java index fa044818336f..22e05132bf91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.regionserver.StoreContext; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.yetus.audience.InterfaceAudience; @@ -32,8 +33,7 @@ @InterfaceAudience.Private class DefaultStoreFileTracker extends StoreFileTrackerBase { - public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, - StoreContext ctx) { + public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { super(conf, isPrimaryReplica, ctx); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java index aadedc8ef727..0a85abb00a4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java @@ -48,7 +48,6 @@ */ @InterfaceAudience.Private public interface StoreFileTracker { - /** * Load the store files list when opening a region. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java index 6cdfaf4a0fe1..c446d5ae9a31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java @@ -18,22 +18,51 @@ package org.apache.hadoop.hbase.regionserver.storefiletracker; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Factory method for creating store file tracker. */ @InterfaceAudience.Private public final class StoreFileTrackerFactory { - public static final String TRACK_IMPL = "hbase.store.file-tracker.impl"; + private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class); public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, - StoreContext ctx) { + StoreContext ctx) { Class tracker = conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class); + LOG.info("instantiating StoreFileTracker impl {}", tracker.getName()); return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx); } + + public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family, + HRegionFileSystem regionFs) { + ColumnFamilyDescriptorBuilder fDescBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)); + StoreContext ctx = StoreContext.getBuilder(). + withColumnFamilyDescriptor(fDescBuilder.build()). + withRegionFileSystem(regionFs). + build(); + return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx); + } + + public static Configuration mergeConfigurations(Configuration global, + TableDescriptor table, ColumnFamilyDescriptor family) { + return new CompoundConfiguration() + .add(global) + .addBytesMap(table.getValues()) + .addStringMap(family.getConfiguration()) + .addBytesMap(family.getValues()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java index 3784876a59f3..523f27782362 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java @@ -67,6 +67,7 @@ public void testCustomParts() throws Exception { DummyStoreFlusher.class.getName()); HRegion mockRegion = Mockito.mock(HRegion.class); HStore mockStore = Mockito.mock(HStore.class); + mockStore.conf = conf; Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO); Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion); StoreEngine se = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java index bd24f1b22293..0eba8aa541ce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDirectStoreSplitsMerges.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -139,7 +141,9 @@ public void testCommitDaughterRegionNoFiles() throws Exception { setRegionId(region.getRegionInfo().getRegionId() + EnvironmentEdgeManager.currentTime()).build(); Path splitDir = regionFS.getSplitsDir(daughterA); - Path result = regionFS.commitDaughterRegion(daughterA); + MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getMasterProcedureExecutor().getEnvironment(); + Path result = regionFS.commitDaughterRegion(daughterA, new ArrayList<>(), env); assertEquals(splitDir, result); } @@ -162,14 +166,18 @@ public void testCommitDaughterRegionWithFiles() throws Exception { Path splitDirA = regionFS.getSplitsDir(daughterA); Path splitDirB = regionFS.getSplitsDir(daughterB); HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0]; - regionFS + List filesA = new ArrayList<>(); + filesA.add(regionFS .splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file, - Bytes.toBytes("002"), false, region.getSplitPolicy()); - regionFS + Bytes.toBytes("002"), false, region.getSplitPolicy())); + List filesB = new ArrayList<>(); + filesB.add(regionFS .splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file, - Bytes.toBytes("002"), true, region.getSplitPolicy()); - Path resultA = regionFS.commitDaughterRegion(daughterA); - Path resultB = regionFS.commitDaughterRegion(daughterB); + Bytes.toBytes("002"), true, region.getSplitPolicy())); + MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getMasterProcedureExecutor().getEnvironment(); + Path resultA = regionFS.commitDaughterRegion(daughterA, filesA, env); + Path resultB = regionFS.commitDaughterRegion(daughterB, filesB, env); assertEquals(splitDirA, resultA); assertEquals(splitDirB, resultB); } @@ -203,8 +211,11 @@ public void testCommitMergedRegion() throws Exception { mergeFileFromRegion(mergeRegionFs, first, file); //merge file from second region file = (HStoreFile) second.getStore(FAMILY_NAME).getStorefiles().toArray()[0]; - mergeFileFromRegion(mergeRegionFs, second, file); - mergeRegionFs.commitMergedRegion(); + List mergedFiles = new ArrayList<>(); + mergedFiles.add(mergeFileFromRegion(mergeRegionFs, second, file)); + MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getMasterProcedureExecutor().getEnvironment(); + mergeRegionFs.commitMergedRegion(mergedFiles, env); } private void waitForSplitProcComplete(int attempts, int waitTime) throws Exception { @@ -223,11 +234,12 @@ private void waitForSplitProcComplete(int attempts, int waitTime) throws Excepti } } - private void mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge, + private Path mergeFileFromRegion(HRegionFileSystem regionFS, HRegion regionToMerge, HStoreFile file) throws IOException { Path mergedFile = regionFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file); validateResultingFile(regionToMerge.getRegionInfo().getEncodedName(), mergedFile); + return mergedFile; } private void validateResultingFile(String originalRegion, Path result){ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java index cdef341965b8..394e62d556f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,12 +50,14 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; @@ -69,6 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.BloomFilterFactory; @@ -1060,7 +1065,19 @@ private Path splitStoreFile(final HRegionFileSystem regionFs, final RegionInfo h if (null == path) { return null; } - Path regionDir = regionFs.commitDaughterRegion(hri); + List splitFiles = new ArrayList<>(); + splitFiles.add(path); + MasterProcedureEnv mockEnv = mock(MasterProcedureEnv.class); + MasterServices mockServices = mock(MasterServices.class); + when(mockEnv.getMasterServices()).thenReturn(mockServices); + when(mockEnv.getMasterConfiguration()).thenReturn(new Configuration()); + TableDescriptors mockTblDescs = mock(TableDescriptors.class); + when(mockServices.getTableDescriptors()).thenReturn(mockTblDescs); + TableDescriptor mockTblDesc = mock(TableDescriptor.class); + when(mockTblDescs.get(any())).thenReturn(mockTblDesc); + ColumnFamilyDescriptor mockCfDesc = mock(ColumnFamilyDescriptor.class); + when(mockTblDesc.getColumnFamily(any())).thenReturn(mockCfDesc); + Path regionDir = regionFs.commitDaughterRegion(hri, splitFiles, mockEnv); return new Path(new Path(regionDir, family), path.getName()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java new file mode 100644 index 000000000000..c6205cb18493 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory. + TRACK_IMPL; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + + +@Category({RegionServerTests.class, LargeTests.class}) +public class TestMergesSplitsAddToTracker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMergesSplitsAddToTracker.class); + + private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setupClass() throws Exception { + TEST_UTIL.getConfiguration().set(TRACK_IMPL, TestStoreFileTracker.class.getName()); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setup(){ + TestStoreFileTracker.trackedFiles = new HashMap<>(); + } + + @Test + public void testCommitDaughterRegion() throws Exception { + TableName table = TableName.valueOf(name.getMethodName()); + TEST_UTIL.createTable(table, FAMILY_NAME); + //first put some data in order to have a store file created + putThreeRowsAndFlush(table); + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0); + HRegionFileSystem regionFS = region.getStores().get(0).getRegionFileSystem(); + RegionInfo daughterA = + RegionInfoBuilder.newBuilder(table).setStartKey(region.getRegionInfo().getStartKey()). + setEndKey(Bytes.toBytes("002")).setSplit(false). + setRegionId(region.getRegionInfo().getRegionId() + + EnvironmentEdgeManager.currentTime()). + build(); + RegionInfo daughterB = RegionInfoBuilder.newBuilder(table).setStartKey(Bytes.toBytes("002")) + .setEndKey(region.getRegionInfo().getEndKey()).setSplit(false) + .setRegionId(region.getRegionInfo().getRegionId()).build(); + HStoreFile file = (HStoreFile) region.getStore(FAMILY_NAME).getStorefiles().toArray()[0]; + List splitFilesA = new ArrayList<>(); + splitFilesA.add(regionFS + .splitStoreFile(daughterA, Bytes.toString(FAMILY_NAME), file, + Bytes.toBytes("002"), false, region.getSplitPolicy())); + List splitFilesB = new ArrayList<>(); + splitFilesB.add(regionFS + .splitStoreFile(daughterB, Bytes.toString(FAMILY_NAME), file, + Bytes.toBytes("002"), true, region.getSplitPolicy())); + MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getMasterProcedureExecutor().getEnvironment(); + Path resultA = regionFS.commitDaughterRegion(daughterA, splitFilesA, env); + Path resultB = regionFS.commitDaughterRegion(daughterB, splitFilesB, env); + FileSystem fs = regionFS.getFileSystem(); + verifyFilesAreTracked(resultA, fs); + verifyFilesAreTracked(resultB, fs); + } + + @Test + public void testCommitMergedRegion() throws Exception { + TableName table = TableName.valueOf(name.getMethodName()); + TEST_UTIL.createTable(table, FAMILY_NAME); + //splitting the table first + TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002")); + //Add data and flush to create files in the two different regions + putThreeRowsAndFlush(table); + List regions = TEST_UTIL.getHBaseCluster().getRegions(table); + HRegion first = regions.get(0); + HRegion second = regions.get(1); + HRegionFileSystem regionFS = first.getRegionFileSystem(); + + RegionInfo mergeResult = + RegionInfoBuilder.newBuilder(table).setStartKey(first.getRegionInfo().getStartKey()) + .setEndKey(second.getRegionInfo().getEndKey()).setSplit(false) + .setRegionId(first.getRegionInfo().getRegionId() + + EnvironmentEdgeManager.currentTime()).build(); + + HRegionFileSystem mergeFS = HRegionFileSystem.createRegionOnFileSystem( + TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), + regionFS.getFileSystem(), regionFS.getTableDir(), mergeResult); + + List mergedFiles = new ArrayList<>(); + //merge file from first region + mergedFiles.add(mergeFileFromRegion(first, mergeFS)); + //merge file from second region + mergedFiles.add(mergeFileFromRegion(second, mergeFS)); + MasterProcedureEnv env = TEST_UTIL.getMiniHBaseCluster().getMaster(). + getMasterProcedureExecutor().getEnvironment(); + mergeFS.commitMergedRegion(mergedFiles, env); + //validate + FileSystem fs = first.getRegionFileSystem().getFileSystem(); + Path finalMergeDir = new Path(first.getRegionFileSystem().getTableDir(), + mergeResult.getEncodedName()); + verifyFilesAreTracked(finalMergeDir, fs); + } + + @Test + public void testSplitLoadsFromTracker() throws Exception { + TableName table = TableName.valueOf(name.getMethodName()); + TEST_UTIL.createTable(table, FAMILY_NAME); + //Add data and flush to create files in the two different regions + putThreeRowsAndFlush(table); + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0); + Pair copyResult = copyFileInTheStoreDir(region); + StoreFileInfo fileInfo = copyResult.getFirst(); + String copyName = copyResult.getSecond(); + //Now splits the region + TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002")); + List regions = TEST_UTIL.getHBaseCluster().getRegions(table); + HRegion first = regions.get(0); + validateDaughterRegionsFiles(first, fileInfo.getActiveFileName(), copyName); + HRegion second = regions.get(1); + validateDaughterRegionsFiles(second, fileInfo.getActiveFileName(), copyName); + } + + @Test + public void testMergeLoadsFromTracker() throws Exception { + TableName table = TableName.valueOf(name.getMethodName()); + TEST_UTIL.createTable(table, new byte[][]{FAMILY_NAME}, + new byte[][]{Bytes.toBytes("002")}); + //Add data and flush to create files in the two different regions + putThreeRowsAndFlush(table); + List regions = TEST_UTIL.getHBaseCluster().getRegions(table); + HRegion first = regions.get(0); + Pair copyResult = copyFileInTheStoreDir(first); + StoreFileInfo fileInfo = copyResult.getFirst(); + String copyName = copyResult.getSecond(); + //Now merges the first two regions + TEST_UTIL.getAdmin().mergeRegionsAsync(new byte[][]{ + first.getRegionInfo().getEncodedNameAsBytes(), + regions.get(1).getRegionInfo().getEncodedNameAsBytes() + }, true).get(10, TimeUnit.SECONDS); + regions = TEST_UTIL.getHBaseCluster().getRegions(table); + HRegion merged = regions.get(0); + validateDaughterRegionsFiles(merged, fileInfo.getActiveFileName(), copyName); + } + + private Pair copyFileInTheStoreDir(HRegion region) throws IOException { + Path storeDir = region.getRegionFileSystem().getStoreDir("info"); + //gets the single file + StoreFileInfo fileInfo = region.getRegionFileSystem().getStoreFiles("info").get(0); + //make a copy of the valid file staight into the store dir, so that it's not tracked. + String copyName = UUID.randomUUID().toString().replaceAll("-", ""); + Path copy = new Path(storeDir, copyName); + FileUtil.copy(region.getFilesystem(), fileInfo.getFileStatus(), region.getFilesystem(), + copy , false, false, TEST_UTIL.getConfiguration()); + return new Pair<>(fileInfo, copyName); + } + + private void validateDaughterRegionsFiles(HRegion region, String orignalFileName, + String untrackedFile) throws IOException { + //verify there's no link for the untracked, copied file in first region + List infos = region.getRegionFileSystem().getStoreFiles("info"); + final MutableBoolean foundLink = new MutableBoolean(false); + infos.stream().forEach(i -> { + i.getActiveFileName().contains(orignalFileName); + if(i.getActiveFileName().contains(untrackedFile)){ + fail(); + } + if(i.getActiveFileName().contains(orignalFileName)){ + foundLink.setTrue(); + } + }); + assertTrue(foundLink.booleanValue()); + } + + private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception { + String storeId = regionDir.getName() + "-info"; + for(FileStatus f : fs.listStatus(new Path(regionDir, Bytes.toString(FAMILY_NAME)))){ + assertTrue(TestStoreFileTracker.trackedFiles.get(storeId).stream().filter(s -> + s.getPath().equals(f.getPath())).findFirst().isPresent()); + } + } + + private Path mergeFileFromRegion(HRegion regionToMerge, HRegionFileSystem mergeFS) + throws IOException { + HStoreFile file = (HStoreFile) regionToMerge.getStore(FAMILY_NAME).getStorefiles().toArray()[0]; + return mergeFS.mergeStoreFile(regionToMerge.getRegionInfo(), Bytes.toString(FAMILY_NAME), file); + } + + private void putThreeRowsAndFlush(TableName table) throws IOException { + Table tbl = TEST_UTIL.getConnection().getTable(table); + Put put = new Put(Bytes.toBytes("001")); + byte[] qualifier = Bytes.toBytes("1"); + put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(1)); + tbl.put(put); + put = new Put(Bytes.toBytes("002")); + put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2)); + tbl.put(put); + put = new Put(Bytes.toBytes("003")); + put.addColumn(FAMILY_NAME, qualifier, Bytes.toBytes(2)); + tbl.put(put); + TEST_UTIL.flush(table); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index eb0b1c1ca694..80012dfcd461 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -120,6 +120,7 @@ private static HStoreFile createFile() throws Exception { private static TestStoreEngine createEngine(Configuration conf) throws Exception { HRegion region = mock(HRegion.class); HStore store = mock(HStore.class); + store.conf = conf; when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO); when(store.getHRegion()).thenReturn(region); CellComparatorImpl kvComparator = mock(CellComparatorImpl.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java new file mode 100644 index 000000000000..05ca1fcb419b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestStoreFileTracker extends DefaultStoreFileTracker { + + private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class); + public static Map> trackedFiles = new HashMap<>(); + private String storeId; + + public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { + super(conf, isPrimaryReplica, ctx); + this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString(); + LOG.info("created storeId: {}", storeId); + trackedFiles.computeIfAbsent(storeId, v -> new ArrayList<>()); + } + + @Override + protected void doAddNewStoreFiles(Collection newFiles) throws IOException { + LOG.info("adding to storeId: {}", storeId); + trackedFiles.get(storeId).addAll(newFiles); + } + + @Override + public List load() throws IOException { + return trackedFiles.get(storeId); + } +}