Skip to content

Commit

Permalink
HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
wchevreuil authored and joshelser committed Dec 22, 2021

Verified

This commit was signed with the committer’s verified signature.
chris-rock Christoph Hartmann
1 parent 43b40e9 commit 6e05376
Showing 12 changed files with 471 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,6 +55,8 @@
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -587,30 +588,35 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();

List<Path> mergedFiles = new ArrayList<>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem.createRegionOnFileSystem(
env.getMasterConfiguration(), fs, tableDir, mergedRegion);

for (RegionInfo ri: this.regionsToMerge) {
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tableDir, ri, false);
mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion);
mergedFiles.addAll(mergeStoreFiles(env, regionFs, mergeRegionFs, mergedRegion));
}
assert mergeRegionFs != null;
mergeRegionFs.commitMergedRegion();
mergeRegionFs.commitMergedRegion(mergedFiles, env);

// Prepare to create merged regions
env.getAssignmentManager().getRegionStates().
getOrCreateRegionStateNode(mergedRegion).setState(State.MERGING_NEW);
}

private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd = env.getMasterServices().getTableDescriptors()
.get(mergedRegion.getTable());
List<Path> mergedFiles = new ArrayList<>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
Configuration trackerConfig =
StoreFileTrackerFactory.mergeConfigurations(env.getMasterConfiguration(), htd, hcd);
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
family, regionFs);
final Collection<StoreFileInfo> storeFiles = tracker.load();
if (storeFiles != null && storeFiles.size() > 0) {
final Configuration storeConfiguration =
StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
@@ -622,11 +628,13 @@ private void mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
mergedFiles.add(refFile);
}
}
}
return mergedFiles;
}

/**
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -66,6 +65,8 @@
import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -621,21 +622,20 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
final FileSystem fs = mfs.getFileSystem();
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);

regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);

Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);

assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
regionFs.getSplitsDir(daughterOneRI));
regionFs.commitDaughterRegion(daughterOneRI);
assertSplitResultFilesCount(fs, expectedReferences.getFirst(),
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
new Path(tabledir, daughterOneRI.getEncodedName()));

assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
regionFs.getSplitsDir(daughterTwoRI));
regionFs.commitDaughterRegion(daughterTwoRI);
assertSplitResultFilesCount(fs, expectedReferences.getSecond(),
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
}

@@ -652,7 +652,7 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
* Create Split directory
* @param env MasterProcedureEnv
*/
private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
@@ -668,7 +668,11 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
String family = cfd.getNameAsString();
Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
Configuration trackerConfig = StoreFileTrackerFactory.
mergeConfigurations(env.getMasterConfiguration(), htd, htd.getColumnFamily(cfd.getName()));
StoreFileTracker tracker = StoreFileTrackerFactory.create(trackerConfig, true,
family, regionFs);
Collection<StoreFileInfo> sfis = tracker.load();
if (sfis == null) {
continue;
}
@@ -694,7 +698,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
}
if (nbFiles == 0) {
// no file needs to be splitted.
return new Pair<Integer, Integer>(0, 0);
return new Pair<>(Collections.emptyList(), Collections.emptyList());
}
// Max #threads is the smaller of the number of storefiles or the default max determined above.
int maxThreads = Math.min(
@@ -752,14 +756,18 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

int daughterA = 0;
int daughterB = 0;
List<Path> daughterA = new ArrayList<>();
List<Path> daughterB = new ArrayList<>();
// Look for any exception
for (Future<Pair<Path, Path>> future : futures) {
try {
Pair<Path, Path> p = future.get();
daughterA += p.getFirst() != null ? 1 : 0;
daughterB += p.getSecond() != null ? 1 : 0;
if(p.getFirst() != null){
daughterA.add(p.getFirst());
}
if(p.getSecond() != null){
daughterB.add(p.getSecond());
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
@@ -772,7 +780,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
" storefiles, Daughter B: " + daughterB + " storefiles.");
}
return new Pair<Integer, Integer>(daughterA, daughterB);
return new Pair<>(daughterA, daughterB);
}

private void assertSplitResultFilesCount(final FileSystem fs,
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
@@ -49,6 +51,9 @@
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -595,19 +600,46 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
* @throws IOException
*/
public Path commitDaughterRegion(final RegionInfo regionInfo)
throws IOException {
public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
MasterProcedureEnv env) throws IOException {
Path regionDir = this.getSplitsDir(regionInfo);
if (fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}

return regionDir;
}

private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
HRegionFileSystem regionFs) throws IOException {
TableDescriptor tblDesc = env.getMasterServices().getTableDescriptors().
get(regionInfo.getTable());
//we need to map trackers per store
Map<String, StoreFileTracker> trackerMap = new HashMap<>();
//we need to map store files per store
Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
for(Path file : allFiles) {
String familyName = file.getParent().getName();
trackerMap.computeIfAbsent(familyName, t -> {
Configuration config = StoreFileTrackerFactory.mergeConfigurations(conf, tblDesc,
tblDesc.getColumnFamily(Bytes.toBytes(familyName)));
return StoreFileTrackerFactory.
create(config, true, familyName, regionFs);
});
fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
List<StoreFileInfo> infos = fileInfoMap.get(familyName);
infos.add(new StoreFileInfo(conf, fs, file, true));
}
for(Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
entry.getValue().add(fileInfoMap.get(entry.getKey()));
}
}

/**
* Creates region split daughter directories under the table dir. If the daughter regions already
* exist, for example, in the case of a recovery from a previous failed split procedure, this
@@ -795,13 +827,15 @@ public Path mergeStoreFile(RegionInfo mergingRegion, String familyName, HStoreFi
* Commit a merged region, making it ready for use.
* @throws IOException
*/
public void commitMergedRegion() throws IOException {
public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env)
throws IOException {
Path regionDir = getMergesDir(regionInfoForFs);
if (regionDir != null && fs.exists(regionDir)) {
// Write HRI to a file in case we need to recover hbase:meta
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
}
}

Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -32,8 +33,7 @@
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {

public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
}

Original file line number Diff line number Diff line change
@@ -48,7 +48,6 @@
*/
@InterfaceAudience.Private
public interface StoreFileTracker {

/**
* Load the store files list when opening a region.
*/
Original file line number Diff line number Diff line change
@@ -18,22 +18,51 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Factory method for creating store file tracker.
*/
@InterfaceAudience.Private
public final class StoreFileTrackerFactory {

public static final String TRACK_IMPL = "hbase.store.file-tracker.impl";
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
HRegionFileSystem regionFs) {
ColumnFamilyDescriptorBuilder fDescBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
StoreContext ctx = StoreContext.getBuilder().
withColumnFamilyDescriptor(fDescBuilder.build()).
withRegionFileSystem(regionFs).
build();
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
}

public static Configuration mergeConfigurations(Configuration global,
TableDescriptor table, ColumnFamilyDescriptor family) {
return new CompoundConfiguration()
.add(global)
.addBytesMap(table.getValues())
.addStringMap(family.getConfiguration())
.addBytesMap(family.getValues());
}
}
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@ public void testCustomParts() throws Exception {
DummyStoreFlusher.class.getName());
HRegion mockRegion = Mockito.mock(HRegion.class);
HStore mockStore = Mockito.mock(HStore.class);
mockStore.conf = conf;
Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
StoreEngine<?, ?, ?, ?> se =
Loading

0 comments on commit 6e05376

Please sign in to comment.