Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27997 Enhance prefetch executor to record region prefetch information #5339

Merged
merged 4 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,10 @@ option optimize_for = SPEED;


// Persisted prefetch state: hFile name -> (region, bytes prefetched).
message PrefetchedHfileName {
  map<string, RegionFileSizeMap> prefetched_files = 1;
}

// Region name together with the total bytes of the hFile prefetched for it.
message RegionFileSizeMap {
  required string region_name = 1;
  required uint64 region_prefetch_size = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public void run() {
block.release();
}
}
String regionName = getRegionName(path);
PrefetchExecutor.complete(regionName, path, offset);
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
Expand All @@ -93,13 +95,21 @@ public void run() {
LOG.warn("Close prefetch stream reader failed, path: " + path, e);
}
}
PrefetchExecutor.complete(path);
}
}
});
}
}

/**
 * Derives the region name for an hFile path. HFiles are always laid out as
 * {@code <region>/<column family>/<hfile>}, so the region is simply the name of the
 * grandparent directory of the file.
 */
private static String getRegionName(Path path) {
  Path columnFamilyDir = path.getParent();
  Path regionDir = columnFamilyDir.getParent();
  return regionDir.getName();
}

/** Renders a path/offset/end triple for use in log and trace messages. */
private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
  StringBuilder sb = new StringBuilder("path=");
  sb.append(path.toString());
  sb.append(", offset=").append(offset);
  sb.append(", end=").append(end);
  return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -38,6 +41,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,7 +57,16 @@ public final class PrefetchExecutor {
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
/** Set of files for which prefetch is completed */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
/**
* Map of region -> total size of the region prefetched on this region server. This is the total
* size of hFiles for this region prefetched on this region server
*/
private static Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();
/**
* Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching or
* evicting individual hFiles.
*/
private static Map<String, Pair<String, Long>> prefetchCompleted = new HashMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
Expand Down Expand Up @@ -120,9 +133,30 @@ public static void request(Path path, Runnable runnable) {
}
}

public static void complete(Path path) {
/**
 * Drops an hFile from the prefetch bookkeeping, subtracting its prefetched size from the
 * owning region's running total. When the region's total reaches zero the region entry is
 * removed entirely.
 */
private static void removeFileFromPrefetch(String hFileName) {
  // Single lookup instead of containsKey + get; also avoids acting on a stale entry.
  Pair<String, Long> regionEntry = prefetchCompleted.get(hFileName);
  if (regionEntry != null) {
    String regionEncodedName = regionEntry.getFirst();
    long filePrefetchedSize = regionEntry.getSecond();
    LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName);
    // Subtract and, when the remaining size is zero, drop the region mapping in one atomic
    // step: computeIfPresent removes the entry when the remapping function returns null.
    regionPrefetchSizeMap.computeIfPresent(regionEncodedName,
      (rn, pf) -> (pf - filePrefetchedSize == 0) ? null : pf - filePrefetchedSize);
  }
  prefetchCompleted.remove(hFileName);
}

/**
 * Marks prefetch as finished for the given file: drops any pending future, records the
 * (region, bytes-prefetched) pair for the file, and adds the file's size into the region's
 * running total.
 */
public static void complete(final String regionName, Path path, long size) {
  prefetchFutures.remove(path);
  prefetchCompleted.put(path.getName(), new Pair<>(regionName, size));
  regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
  LOG.debug("Prefetch completed for {}", path.getName());
}

Expand Down Expand Up @@ -173,11 +207,25 @@ public static void retrieveFromFile(String path) throws IOException {
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
PersistentPrefetchProtos.PrefetchedHfileName proto =
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
prefetchCompleted.putAll(protoPrefetchedFilesMap);
Map<String, PersistentPrefetchProtos.RegionFileSizeMap> protoPrefetchedFilesMap =
proto.getPrefetchedFilesMap();
prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap));
updateRegionSizeMapWhileRetrievingFromFile();
}
}

/**
 * Rebuilds {@code regionPrefetchSizeMap} from the persisted per-file entries after the
 * prefetch list has been reloaded on region server restart.
 */
private static void updateRegionSizeMapWhileRetrievingFromFile() {
  LOG.debug("Updating region size map after retrieving prefetch file list");
  prefetchCompleted.forEach((hFileName, regionAndSize) -> {
    // Each value is a (region encoded name, prefetched bytes) pair.
    String regionEncodedName = regionAndSize.getFirst();
    long filePrefetchSize = regionAndSize.getSecond();
    regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize, Long::sum);
  });
}

private static FileInputStream deleteFileOnClose(final File file) throws IOException {
return new FileInputStream(file) {
private File myFile;
Expand All @@ -203,13 +251,24 @@ public void close() throws IOException {
}

/**
 * Called on block eviction. Delegates to {@link #removeFileFromPrefetch} so the per-region
 * prefetched-size accounting is updated before the file entry is dropped; removing the file
 * from {@code prefetchCompleted} directly first would make that accounting a no-op.
 */
public static void removePrefetchedFileWhileEvict(String hfileName) {
  removeFileFromPrefetch(hfileName);
}

/** Returns whether prefetch has completed (and not been evicted) for the named hFile. */
public static boolean isFilePrefetched(String hfileName) {
  boolean prefetched = prefetchCompleted.containsKey(hfileName);
  return prefetched;
}

/**
 * Exposes the region -> total-prefetched-bytes map as a read-only live view (not a snapshot;
 * it reflects subsequent prefetch completions and evictions).
 */
public static Map<String, Long> getRegionPrefetchInfo() {
  Map<String, Long> readOnlyView = Collections.unmodifiableMap(regionPrefetchSizeMap);
  return readOnlyView;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java")
public static void reset() {
  // Discard all bookkeeping by swapping in fresh maps of the same concrete types.
  regionPrefetchSizeMap = new ConcurrentHashMap<>();
  prefetchCompleted = new HashMap<>();
}

// Utility class: static members only; private constructor prevents instantiation.
private PrefetchExecutor() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.util.Pair;

import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

Expand All @@ -26,8 +28,26 @@ private PrefetchProtoUtils() {
}

static PersistentPrefetchProtos.PrefetchedHfileName
toPB(Map<String, Boolean> prefetchedHfileNames) {
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
.putAllPrefetchedFiles(prefetchedHfileNames).build();
toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> {
PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize =
PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
.setRegionName(regionPrefetchMap.getFirst())
.setRegionPrefetchSize(regionPrefetchMap.getSecond()).build();
tmpMap.put(hFileName, tmpRegionFileSize);
});
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap)
.build();
}

/**
 * Inverse of {@code toPB}: rebuilds the in-memory hFile -> (region, prefetched size) map
 * from its persisted protobuf form.
 */
static Map<String, Pair<String, Long>>
  fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) {
  Map<String, Pair<String, Long>> hFileMap = new HashMap<>(prefetchHFileNames.size());
  for (Map.Entry<String, PersistentPrefetchProtos.RegionFileSizeMap> entry : prefetchHFileNames
    .entrySet()) {
    PersistentPrefetchProtos.RegionFileSizeMap value = entry.getValue();
    hFileMap.put(entry.getKey(),
      new Pair<>(value.getRegionName(), value.getRegionPrefetchSize()));
  }
  return hFileMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception {
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs);
Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs);
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
Thread.sleep(bucketCachePersistInterval);
Expand All @@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs);
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
Expand All @@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception {
CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs);
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
Thread.sleep(500);
// Evict Blocks from cache
BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
assertTrue(initialRegionPrefetchInfoSize > 0);
bucketCache1.evictBlock(bucketCacheKey);
assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
}

public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
Expand All @@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC
}
}

public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
public Path writeStoreFile(String regionName, String fname, Configuration conf,
CacheConfig cacheConf, FileSystem fs) throws IOException {
// Create store files as per the following directory structure
// <region name>/<column family>/<hFile>
Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
Path storeFileParentDir = new Path(regionDir, fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
Expand All @@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo
}

sfw.close();

// Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region
// name to be added to the prefetch file list
Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
File regionInfoFile = new File(regionInfoFilePath.toString());
try {
if (!regionInfoFile.createNewFile()) {
assertFalse("Unable to create .regioninfo file", true);
}
} catch (IOException e) {
e.printStackTrace();
}
return sfw.getPath();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception {
assertEquals(0, usedSize);
assertTrue(new File(testDir + "/bucket.cache").exists());
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch0");
Path storeFile2 = writeStoreFile("TestPrefetch1");
Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);

bucketCache.shutdown();
// Reset the info maintained in PrefetchExecutor
PrefetchExecutor.reset();
assertTrue(new File(testDir + "/bucket.persistence").exists());
assertTrue(new File(testDir + "/prefetch.persistence").exists());
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
Expand All @@ -149,8 +152,12 @@ public void testPrefetchPersistence() throws Exception {
/**
 * Opens then closes (with eviction) a reader on the given store file, asserting that the
 * file leaves the prefetch list and that exactly one region disappears from the region
 * prefetch info as a result.
 */
public void closeStoreFile(Path path) throws Exception {
  HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
  assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
  int regionCountBefore = PrefetchExecutor.getRegionPrefetchInfo().size();
  assertTrue(regionCountBefore > 0);
  // Closing with evictOnClose=true removes the file's blocks and its prefetch entry.
  reader.close(true);
  assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
  int regionCountAfter = PrefetchExecutor.getRegionPrefetchInfo().size();
  assertTrue(regionCountBefore - regionCountAfter == 1);
}

public void readStoreFile(Path storeFilePath, long offset) throws Exception {
Expand All @@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception {
}
}

public Path writeStoreFile(String fname) throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
public Path writeStoreFile(String regionName, String fname) throws IOException {
// Create store files as per the following directory structure
// <region name>/<column family>/<hFile>
Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
Path storeFileParentDir = new Path(regionDir, fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
Expand All @@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException {
}

sfw.close();

// Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name
// to be added to the prefetch file list
Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
File regionInfoFile = new File(regionInfoFilePath.toString());
LOG.info("Create file: {}", regionInfoFilePath);
try {
if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) {
assertFalse("Unable to create .regioninfo file", true);
}
} catch (IOException e) {
e.printStackTrace();
}
return sfw.getPath();
}

Expand Down