Skip to content

Commit

Permalink
HBASE-27794: Tooling for parsing/reading the prefetch files list file (
Browse files Browse the repository at this point in the history
…#5468) (#5491)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>

(cherry picked from commit fa4c896)
  • Loading branch information
Kota-SH authored Nov 2, 2023
1 parent 0d04a60 commit 8b1f690
Show file tree
Hide file tree
Showing 17 changed files with 143 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3317,4 +3317,9 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* @throws IOException if a remote or network exception occurs
*/
Future<Void> truncateRegionAsync(byte[] regionName) throws IOException;

/**
 * Get the list of names of the files that are currently fully cached on the given region server.
 * @param serverName the region server to query
 * @return list of cached file names reported by that server
 * @throws IOException if a remote or network exception occurs
 */
List<String> getCachedFilesList(ServerName serverName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1702,4 +1702,9 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();

/**
 * Get the list of names of the files that are currently fully cached on the given region server.
 * @param serverName the region server to query
 * @return a {@link CompletableFuture} completing with the list of cached file names
 */
CompletableFuture<List<String>> getCachedFilesList(ServerName serverName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -911,4 +911,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}

// Thin delegate: forwards to the raw async admin and re-wraps the returned future
// onto this admin's executor (same pattern as the other methods in this class).
@Override
public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
return wrap(rawAdmin.getCachedFilesList(serverName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2078,14 +2078,23 @@ public Future<Void> truncateRegionAsync(byte[] regionName) throws IOException {
throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
}

TableName tableName = hri.getTable();
TableName tableName = (hri != null) ? hri.getTable() : null;

MasterProtos.TruncateRegionResponse response =
executeCallable(getTruncateRegionCallable(tableName, hri));

return new TruncateRegionFuture(this, tableName, response);
}

/**
 * Get the list of names of the files that are currently fully cached on the given region server.
 * Issues a blocking admin RPC directly against that server.
 * @param serverName the region server to query
 * @return list of cached file names reported by that server
 * @throws IOException if a remote or network exception occurs
 */
@Override
public List<String> getCachedFilesList(ServerName serverName) throws IOException {
return ProtobufUtil.getCachedFilesList(rpcControllerFactory.newController(),
this.connection.getAdmin(serverName));
}

private MasterCallable<MasterProtos.TruncateRegionResponse>
getTruncateRegionCallable(TableName tableName, RegionInfo hri) {
return new MasterCallable<MasterProtos.TruncateRegionResponse>(getConnection(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -4264,4 +4266,15 @@ Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
.call();
}

/**
 * Get the list of names of the files that are currently fully cached on the given region server.
 * Sends an empty {@code GetCachedFilesListRequest} to the target server's admin service and
 * unpacks the repeated string field from the response.
 */
@Override
public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
GetCachedFilesListRequest.Builder request = GetCachedFilesListRequest.newBuilder();
return this.<List<String>> newAdminCaller()
.action((controller, stub) -> this.<GetCachedFilesListRequest, GetCachedFilesListResponse,
List<String>> adminCall(controller, stub, request.build(),
(s, c, req, done) -> s.getCachedFilesList(c, req, done),
resp -> resp.getCachedFilesList()))
.serverName(serverName).call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -1746,6 +1748,21 @@ public static List<org.apache.hadoop.hbase.client.RegionInfo> getOnlineRegions(
return getRegionInfos(response);
}

/**
 * Fetch the list of names of files fully cached on a region server via a blocking admin RPC.
 * @param controller RPC controller for the call (may carry deadline/cancellation state)
 * @param admin blocking admin stub of the target region server
 * @return a mutable copy of the file names returned by the server
 * @throws IOException the unwrapped remote exception, if the RPC fails
 */
public static List<String> getCachedFilesList(final RpcController controller,
  final AdminService.BlockingInterface admin) throws IOException {
  final GetCachedFilesListRequest request = GetCachedFilesListRequest.newBuilder().build();
  final GetCachedFilesListResponse response;
  try {
    response = admin.getCachedFilesList(controller, request);
  } catch (ServiceException e) {
    // Unwrap the shaded ServiceException into the underlying remote IOException,
    // matching the error-handling convention of the other helpers in this class.
    throw getRemoteException(e);
  }
  // Defensive copy: the protobuf list is immutable; callers get a mutable ArrayList.
  return new ArrayList<>(response.getCachedFilesList());
}

/**
* Get the list of region info from a GetOnlineRegionResponse
* @param proto the GetOnlineRegionResponse
Expand Down
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ message ExecuteProceduresRequest {
message ExecuteProceduresResponse {
}

/** Request for the list of files fully cached on a region server. No parameters needed. */
message GetCachedFilesListRequest {
}

/** Response carrying the names of the files currently fully cached on the server. */
message GetCachedFilesListResponse {
repeated string cached_files = 1;
}

/**
* Slow/Large log (LogRequest) use-case specific RPC request. This request payload will be
* converted in bytes and sent to generic RPC API: GetLogEntries
Expand Down Expand Up @@ -406,4 +413,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc GetCachedFilesList(GetCachedFilesListRequest)
returns(GetCachedFilesListResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.io.hfile;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -146,4 +148,11 @@ Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
default boolean isMetaBlock(BlockType blockType) {
return blockType != null && blockType.getCategory() != BlockType.BlockCategory.DATA;
}

/**
 * Returns a map keyed by file name whose values flag per-file cache state (presumably
 * whether the file is fully cached — confirm against the implementing cache), or
 * {@code Optional.empty()} when this cache implementation does not track per-file caching.
 * The default implementation returns empty.
 */
default Optional<Map<String, Boolean>> getFullyCachedFiles() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.io.hfile;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -369,6 +371,14 @@ public BlockCache[] getBlockCaches() {
return new BlockCache[] { this.l1Cache, this.l2Cache };
}

/**
 * Returns the map of fully cached file names tracked by the L2 cache. Only the L2 cache is
 * consulted because per-file cache tracking lives there (the L1 cache does not override the
 * empty default).
 */
@Override
public Optional<Map<String, Boolean>> getFullyCachedFiles() {
return this.l2Cache.getFullyCachedFiles();
}

@Override
public void setMaxSize(long size) {
this.l1Cache.setMaxSize(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
Optional<BucketCache> bucketCacheOptional =
BucketCache.getBucketCacheFromCacheConfig(cacheConf);
bucketCacheOptional.ifPresent(bc -> fileAlreadyCached
.setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
bucketCacheOptional.flatMap(BucketCache::getFullyCachedFiles).ifPresent(fcf -> {
fileAlreadyCached.setValue(fcf.get(path.getName()) == null ? false : true);
});
// Prefetch file blocks upon open if requested
if (
cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1950,8 +1950,9 @@ public AtomicBoolean getBackingMapValidated() {
return backingMapValidated;
}

public Map<String, Boolean> getFullyCachedFiles() {
return fullyCachedFiles;
// BucketCache always maintains the fullyCachedFiles map, so the Optional is always present.
@Override
public Optional<Map<String, Boolean>> getFullyCachedFiles() {
return Optional.of(fullyCachedFiles);
}

public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
Expand Down Expand Up @@ -3065,6 +3067,12 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
throw new ServiceException("Invalid request params");
}

// The master does not host a block cache for user data, so this admin RPC is only
// meaningful on region servers. Reject with DoNotRetryIOException so clients fail fast
// instead of retrying against the master.
@Override
public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
GetCachedFilesListRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}

private MasterProtos.BalancerDecisionsResponse
getBalancerDecisions(MasterProtos.BalancerDecisionsRequest request) {
final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -4033,6 +4035,17 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
throw new ServiceException("Invalid request params");
}

/**
 * Serve the GetCachedFilesList admin RPC: report the names of the files the block cache
 * tracks as fully cached. Returns an empty list when no block cache is configured or the
 * cache does not track per-file state.
 */
@Override
public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
  GetCachedFilesListRequest request) throws ServiceException {
  final List<String> cachedFileNames = new ArrayList<>();
  // Both the block cache itself and its per-file tracking are optional.
  regionServer.getBlockCache().flatMap(BlockCache::getFullyCachedFiles)
    .ifPresent(cached -> cachedFileNames.addAll(cached.keySet()));
  return GetCachedFilesListResponse.newBuilder().addAllCachedFiles(cachedFileNames).build();
}

public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -111,7 +115,17 @@ public void testPrefetchPersistence() throws Exception {

// Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
// should exist.
assertTrue(new File(testDir + "/bucket.persistence").exists());

HRegionServer regionServingRS = cluster.getRegionServer(1).getRegions(tableName).size() == 1
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);

Admin admin = TEST_UTIL.getAdmin();
List<String> cachedFilesList = admin.getCachedFilesList(regionServingRS.getServerName());
assertEquals(1, cachedFilesList.size());
for (HStoreFile h : regionServingRS.getRegions().get(0).getStores().get(0).getStorefiles()) {
assertTrue(cachedFilesList.contains(h.getPath().getName()));
}

// Stop the RS
cluster.stopRegionServer(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testPrefetchDoesntOverwork() throws Exception {
BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
LOG.debug("removing block {}", key);
bc.getBackingMap().remove(key);
bc.getFullyCachedFiles().remove(storeFile.getName());
bc.getFullyCachedFiles().ifPresent(fcf -> fcf.remove(storeFile.getName()));
assertTrue(snapshot.size() > bc.getBackingMap().size());
LOG.debug("Third read should prefetch again, as we removed one block for the file.");
readStoreFile(storeFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetCachedFilesListResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
Expand Down Expand Up @@ -694,6 +696,12 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
return null;
}

// Test stub: satisfies the AdminService interface with a no-op, mirroring the other
// unimplemented methods in this mock (e.g. getLogEntries above, which also returns null).
@Override
public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
GetCachedFilesListRequest request) throws ServiceException {
return null;
}

@Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,14 @@ public Future<Void> truncateRegionAsync(byte[] regionName) {
throw new NotImplementedException("Truncate Region Async not supported in ThriftAdmin");
}

/**
 * Not supported through the Thrift gateway.
 * @throws NotImplementedException always, like the other unsupported admin operations here
 */
@Override
public List<String> getCachedFilesList(ServerName serverName) {
throw new NotImplementedException("getCachedFilesList not supported in ThriftAdmin");
}

@Override
public void splitRegion(byte[] regionName, byte[] splitPoint) {
throw new NotImplementedException("splitRegion not supported in ThriftAdmin");
Expand Down

0 comments on commit 8b1f690

Please sign in to comment.