From b069f1feaba36069f40b1cb4da81635338245c44 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Fri, 28 Jul 2023 19:44:27 +0530 Subject: [PATCH 1/4] HBASE-27997 Enhance prefetch executor to record region prefetch information along with the list of hfiles prefetched --- .../main/protobuf/PrefetchPersistence.proto | 6 +- .../hbase/io/hfile/HFilePreadReader.java | 12 +++- .../hbase/io/hfile/PrefetchExecutor.java | 69 +++++++++++++++++-- .../hbase/io/hfile/PrefetchProtoUtils.java | 23 ++++++- .../bucket/TestBucketCachePersister.java | 34 +++++++-- .../hfile/bucket/TestPrefetchPersistence.java | 31 +++++++-- 6 files changed, 153 insertions(+), 22 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto index d1a2b4cfd1b7..1237e27187f6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -27,5 +27,9 @@ option optimize_for = SPEED; message PrefetchedHfileName { - map prefetched_files = 1; + map prefetched_files = 1; +} + +message RegionFileSizeMap { + map region_file_size = 1; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 2c71ce9f4842..91fe3066c1e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -77,6 +77,8 @@ public void run() { block.release(); } } + String regionName = getRegionName(path); + PrefetchExecutor.complete(regionName, path, offset); } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) if (LOG.isTraceEnabled()) { @@ -93,13 +95,21 @@ public void run() { LOG.warn("Close prefetch stream reader failed, path: " + path, e); } } - PrefetchExecutor.complete(path); } } }); } } + /* + * Get the region name for the given file path. A HFile is always kept under the //. To find the region for a given hFile, just find the name of the grandparent + * directory. + */ + private static String getRegionName(Path path) { + return path.getParent().getParent().getName(); + } + private static String getPathOffsetEndStr(final Path path, final long offset, final long end) { return "path=" + path.toString() + ", offset=" + offset + ", end=" + end; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index d3064e066a12..0af208a5b994 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hbase.io.hfile; +import com.google.errorprone.annotations.RestrictedApi; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -53,7 +55,8 @@ public final class PrefetchExecutor { private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); /** Set of files for which prefetch is completed */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") - private static HashMap prefetchCompleted = new HashMap<>(); + private static ConcurrentHashMap regionPrefetchSizeMap = new ConcurrentHashMap<>(); + private static HashMap> prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ @@ -120,9 +123,35 @@ public static void request(Path path, Runnable runnable) { } } - public static void complete(Path path) { + private static void removeFileFromPrefetch(String hFileName) { + // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted + if (prefetchCompleted.containsKey(hFileName)) { + Map.Entry regionEntry = + prefetchCompleted.get(hFileName).entrySet().iterator().next(); + String regionEncodedName = regionEntry.getKey(); + long filePrefetchedSize = regionEntry.getValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName); + } + regionPrefetchSizeMap.computeIfPresent(regionEncodedName, + (rn, pf) -> pf - filePrefetchedSize); + // If all the blocks for a region are evicted from the cache, remove the entry for that region + if ( + regionPrefetchSizeMap.containsKey(regionEncodedName) + && regionPrefetchSizeMap.get(regionEncodedName) == 0 + ) { + regionPrefetchSizeMap.remove(regionEncodedName); + } + } + prefetchCompleted.remove(hFileName); + } + + public static void complete(final String regionName, Path path, long size) { prefetchFutures.remove(path); - prefetchCompleted.put(path.getName(), true); + Map tmpMap = new HashMap<>(); + tmpMap.put(regionName, size); + prefetchCompleted.put(path.getName(), tmpMap); + regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); LOG.debug("Prefetch completed for {}", path.getName()); } @@ -173,11 +202,28 @@ public static void retrieveFromFile(String path) throws IOException { try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) { PersistentPrefetchProtos.PrefetchedHfileName proto = PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis); - Map protoPrefetchedFilesMap = proto.getPrefetchedFilesMap(); - prefetchCompleted.putAll(protoPrefetchedFilesMap); + Map protoPrefetchedFilesMap = + proto.getPrefetchedFilesMap(); + prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap)); + updateRegionSizeMapWhileRetrievingFromFile(); } } + private static void updateRegionSizeMapWhileRetrievingFromFile() { + // Update the regionPrefetchedSizeMap with the region size while restarting the region server + if (LOG.isDebugEnabled()) { + LOG.debug("Updating region size map after retrieving prefetch file list"); + } + prefetchCompleted.forEach((hFileName, hFileSize) -> { + // Get the region name for each file + Map.Entry regionEntry = hFileSize.entrySet().iterator().next(); + String regionEncodedName = regionEntry.getKey(); + long filePrefetchSize = regionEntry.getValue(); + regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize, + (oldpf, fileSize) -> oldpf + fileSize); + }); + } + private static FileInputStream deleteFileOnClose(final File file) throws IOException { return new FileInputStream(file) { private File myFile; @@ -203,13 +249,24 @@ public void close() throws IOException { } public static void removePrefetchedFileWhileEvict(String hfileName) { - prefetchCompleted.remove(hfileName); + removeFileFromPrefetch(hfileName); } public static boolean isFilePrefetched(String hfileName) { return prefetchCompleted.containsKey(hfileName); } + public static Map getRegionPrefetchInfo() { + return regionPrefetchSizeMap; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java") + public static void reset() { + prefetchCompleted = new HashMap<>(); + regionPrefetchSizeMap = new ConcurrentHashMap<>(); + } + private PrefetchExecutor() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java index e75e8a6a6522..815bd91bfc5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile; +import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; @@ -26,8 +27,24 @@ private PrefetchProtoUtils() { } static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map prefetchedHfileNames) { - return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder() - .putAllPrefetchedFiles(prefetchedHfileNames).build(); + toPB(Map> prefetchedHfileNames) { + Map tmpMap = new HashMap<>(); + prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { + PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = + PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() + .putAllRegionFileSize(regionPrefetchMap).build(); + tmpMap.put(hFileName, tmpRegionFileSize); + }); + return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) + .build(); + } + + static Map> + fromPB(Map prefetchHFileNames) { + Map> hFileMap = new HashMap<>(); + prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { + hFileMap.put(hFileName, regionPrefetchMap.getRegionFileSizeMap()); + }); + return hFileMap; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index dbd3d7f86646..bf44aff16431 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs); - Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs); + Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache); Thread.sleep(bucketCachePersistInterval); @@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache); FileSystem fs = HFileSystem.get(conf); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertFalse(new File(testDir + "/bucket.persistence").exists()); @@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception { CacheConfig cacheConf = new CacheConfig(conf, bucketCache1); FileSystem fs = HFileSystem.get(conf); // Load Blocks in cache - Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs); + Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs); readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1); Thread.sleep(500); // Evict Blocks from cache BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey(); assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize > 0); bucketCache1.evictBlock(bucketCacheKey); assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName())); + int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1); } public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf, @@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC } } - public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs) - throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + public Path writeStoreFile(String regionName, String fname, Configuration conf, + CacheConfig cacheConf, FileSystem fs) throws IOException { + // Create store files as per the following directory structure + // // + Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName); + Path storeFileParentDir = new Path(regionDir, fname); HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(storeFileParentDir).withFileContext(meta).build(); @@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo } sfw.close(); + + // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region + // name to be added to the prefetch file list + Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE); + File regionInfoFile = new File(regionInfoFilePath.toString()); + try { + if (!regionInfoFile.createNewFile()) { + assertFalse("Unable to create .regioninfo file", true); + } + } catch (IOException e) { + e.printStackTrace(); + } return sfw.getPath(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 771ab0158f61..843cf8000890 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception { assertEquals(0, usedSize); assertTrue(new File(testDir + "/bucket.cache").exists()); // Load Cache - Path storeFile = writeStoreFile("TestPrefetch0"); - Path storeFile2 = writeStoreFile("TestPrefetch1"); + Path storeFile = writeStoreFile("Region0", "TestPrefetch0"); + Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1"); readStoreFile(storeFile, 0); readStoreFile(storeFile2, 0); usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); bucketCache.shutdown(); + // Reset the info maintained in PrefetchExecutor + PrefetchExecutor.reset(); assertTrue(new File(testDir + "/bucket.persistence").exists()); assertTrue(new File(testDir + "/prefetch.persistence").exists()); bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, @@ -149,8 +152,12 @@ public void testPrefetchPersistence() throws Exception { public void closeStoreFile(Path path) throws Exception { HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf); assertTrue(PrefetchExecutor.isFilePrefetched(path.getName())); + int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize > 0); reader.close(true); assertFalse(PrefetchExecutor.isFilePrefetched(path.getName())); + int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size(); + assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1); } public void readStoreFile(Path storeFilePath, long offset) throws Exception { @@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception { } } - public Path writeStoreFile(String fname) throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + public Path writeStoreFile(String regionName, String fname) throws IOException { + // Create store files as per the following directory structure + // // + Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName); + Path storeFileParentDir = new Path(regionDir, fname); HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(storeFileParentDir).withFileContext(meta).build(); @@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException { } sfw.close(); + + // Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name + // to be added to the prefetch file list + Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE); + File regionInfoFile = new File(regionInfoFilePath.toString()); + LOG.info("Create file: {}", regionInfoFilePath); + try { + if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) { + assertFalse("Unable to create .regioninfo file", true); + } + } catch (IOException e) { + e.printStackTrace(); + } return sfw.getPath(); } From 90d1df6c9d4836b063b372bcd624ea104545e9c4 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Wed, 2 Aug 2023 13:22:33 +0530 Subject: [PATCH 2/4] HBASE-27997 Enhance prefetch executor to record region prefetch information along with the list of hfiles prefetched --- .../org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 0af208a5b994..1d00cc5cde11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -22,6 +22,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -257,7 +258,7 @@ public static boolean isFilePrefetched(String hfileName) { } public static Map getRegionPrefetchInfo() { - return regionPrefetchSizeMap; + return Collections.unmodifiableMap(regionPrefetchSizeMap); } @RestrictedApi(explanation = "Should only be called in tests", link = "", From eb6b1558da5a108a530a561cfa6567c17d88af48 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Wed, 2 Aug 2023 15:27:03 +0530 Subject: [PATCH 3/4] HBASE-27997 Enhance prefetch executor to record region prefetch information along with the list of hfiles prefetched --- .../hbase/io/hfile/PrefetchExecutor.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 1d00cc5cde11..6e7145512551 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -56,8 +56,16 @@ public final class PrefetchExecutor { private static final Map> prefetchFutures = new ConcurrentSkipListMap<>(); /** Set of files for which prefetch is completed */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") - private static ConcurrentHashMap regionPrefetchSizeMap = new ConcurrentHashMap<>(); - private static HashMap> prefetchCompleted = new HashMap<>(); + /** + * Map of region -> total size of the region prefetched on this region server. + * This is the total size of hFiles for this region prefetched on this region server + */ + private static Map regionPrefetchSizeMap = new ConcurrentHashMap<>(); + /** + * Map of hFile -> Region -> File size. + * This map is used by the prefetch executor while caching or evicting individual hFiles. + */ + private static Map> prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ @@ -131,9 +139,7 @@ private static void removeFileFromPrefetch(String hFileName) { prefetchCompleted.get(hFileName).entrySet().iterator().next(); String regionEncodedName = regionEntry.getKey(); long filePrefetchedSize = regionEntry.getValue(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName); - } + LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName); regionPrefetchSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchedSize); // If all the blocks for a region are evicted from the cache, remove the entry for that region @@ -212,9 +218,7 @@ public static void retrieveFromFile(String path) throws IOException { private static void updateRegionSizeMapWhileRetrievingFromFile() { // Update the regionPrefetchedSizeMap with the region size while restarting the region server - if (LOG.isDebugEnabled()) { - LOG.debug("Updating region size map after retrieving prefetch file list"); - } + LOG.debug("Updating region size map after retrieving prefetch file list"); prefetchCompleted.forEach((hFileName, hFileSize) -> { // Get the region name for each file Map.Entry regionEntry = hFileSize.entrySet().iterator().next(); From 28cc475081feb0f8d92a158eff6acbd723511863 Mon Sep 17 00:00:00 2001 From: Rahul Agarkar Date: Wed, 2 Aug 2023 17:08:41 +0530 Subject: [PATCH 4/4] HBASE-27997 Enhance prefetch executor to record region prefetch information along with the list of hfiles prefetched --- .../main/protobuf/PrefetchPersistence.proto | 3 ++- .../hbase/io/hfile/PrefetchExecutor.java | 27 +++++++++---------- .../hbase/io/hfile/PrefetchProtoUtils.java | 13 +++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto index 1237e27187f6..a024b94baa62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto +++ b/hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto @@ -31,5 +31,6 @@ message PrefetchedHfileName { } message RegionFileSizeMap { - map region_file_size = 1; + required string region_name = 1; + required uint64 region_prefetch_size = 2; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 6e7145512551..3a0629a59c06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,15 +58,15 @@ public final class PrefetchExecutor { /** Set of files for which prefetch is completed */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL") /** - * Map of region -> total size of the region prefetched on this region server. - * This is the total size of hFiles for this region prefetched on this region server + * Map of region -> total size of the region prefetched on this region server. This is the total + * size of hFiles for this region prefetched on this region server */ private static Map regionPrefetchSizeMap = new ConcurrentHashMap<>(); /** - * Map of hFile -> Region -> File size. - * This map is used by the prefetch executor while caching or evicting individual hFiles. + * Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching or + * evicting individual hFiles. */ - private static Map> prefetchCompleted = new HashMap<>(); + private static Map> prefetchCompleted = new HashMap<>(); /** Executor pool shared among all HFiles for block prefetch */ private static final ScheduledExecutorService prefetchExecutorPool; /** Delay before beginning prefetch */ @@ -135,10 +136,9 @@ public static void request(Path path, Runnable runnable) { private static void removeFileFromPrefetch(String hFileName) { // Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted if (prefetchCompleted.containsKey(hFileName)) { - Map.Entry regionEntry = - prefetchCompleted.get(hFileName).entrySet().iterator().next(); - String regionEncodedName = regionEntry.getKey(); - long filePrefetchedSize = regionEntry.getValue(); + Pair regionEntry = prefetchCompleted.get(hFileName); + String regionEncodedName = regionEntry.getFirst(); + long filePrefetchedSize = regionEntry.getSecond(); LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName); regionPrefetchSizeMap.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchedSize); @@ -155,9 +155,7 @@ private static void removeFileFromPrefetch(String hFileName) { public static void complete(final String regionName, Path path, long size) { prefetchFutures.remove(path); - Map tmpMap = new HashMap<>(); - tmpMap.put(regionName, size); - prefetchCompleted.put(path.getName(), tmpMap); + prefetchCompleted.put(path.getName(), new Pair<>(regionName, size)); regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize); LOG.debug("Prefetch completed for {}", path.getName()); } @@ -221,9 +219,8 @@ private static void updateRegionSizeMapWhileRetrievingFromFile() { LOG.debug("Updating region size map after retrieving prefetch file list"); prefetchCompleted.forEach((hFileName, hFileSize) -> { // Get the region name for each file - Map.Entry regionEntry = hFileSize.entrySet().iterator().next(); - String regionEncodedName = regionEntry.getKey(); - long filePrefetchSize = regionEntry.getValue(); + String regionEncodedName = hFileSize.getFirst(); + long filePrefetchSize = hFileSize.getSecond(); regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize, (oldpf, fileSize) -> oldpf + fileSize); }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java index 815bd91bfc5a..df67e4429a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos; @@ -27,23 +28,25 @@ private PrefetchProtoUtils() { } static PersistentPrefetchProtos.PrefetchedHfileName - toPB(Map> prefetchedHfileNames) { + toPB(Map> prefetchedHfileNames) { Map tmpMap = new HashMap<>(); prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> { PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize = PersistentPrefetchProtos.RegionFileSizeMap.newBuilder() - .putAllRegionFileSize(regionPrefetchMap).build(); + .setRegionName(regionPrefetchMap.getFirst()) + .setRegionPrefetchSize(regionPrefetchMap.getSecond()).build(); tmpMap.put(hFileName, tmpRegionFileSize); }); return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap) .build(); } - static Map> + static Map> fromPB(Map prefetchHFileNames) { - Map> hFileMap = new HashMap<>(); + Map> hFileMap = new HashMap<>(); prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> { - hFileMap.put(hFileName, regionPrefetchMap.getRegionFileSizeMap()); + hFileMap.put(hFileName, + new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize())); }); return hFileMap; }