Skip to content

Commit

Permalink
CDPD-45617: Backport HBASE-27474 Evict blocks on split/merge; Avoid c…
Browse files Browse the repository at this point in the history
…aching reference/hlinks if compaction is enabled (apache#4868)

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
(cherry picked from commit 222ec68)

Change-Id: I0d864cbd2a423e8df27e964834c2bcee21a532f9
  • Loading branch information
wchevreuil authored and Wellington Ramos Chevreuil committed Dec 20, 2022
1 parent bbe9d0c commit 2c1affa
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2952,6 +2952,19 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
}

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId) {
return ProtobufUtil.getBuilder(server, regionName, destinationServer, closeProcId).build();
}

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId, boolean evictCache) {
CloseRegionRequest.Builder builder =
getBuilder(server, regionName, destinationServer, closeProcId);
builder.setEvictCache(evictCache);
return builder.build();
}

public static CloseRegionRequest.Builder getBuilder(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region =
Expand All @@ -2964,7 +2977,7 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
builder.setServerStartCode(server.getStartcode());
}
builder.setCloseProcId(closeProcId);
return builder.build();
return builder;
}

public static ProcedureDescription buildProcedureDescription(String signature, String instance,
Expand Down
1 change: 1 addition & 0 deletions hbase-protocol-shaded/src/main/protobuf/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ message CloseRegionRequest {
// the intended server for this RPC.
optional uint64 serverStartCode = 5;
optional int64 close_proc_id = 6 [default = -1];
optional bool evict_cache = 7 [default = false];
}

message CloseRegionResponse {
Expand Down
3 changes: 3 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ message RegionStateTransitionStateData {
required RegionTransitionType type = 1;
optional ServerName assign_candidate = 2;
required bool force_new_plan = 3;
optional bool is_split = 4 [default = false];
optional bool evict_cache = 5 [default = false];
}

enum RegionRemoteProcedureBaseState {
Expand All @@ -525,6 +527,7 @@ message OpenRegionProcedureStateData {

message CloseRegionProcedureStateData {
optional ServerName assign_candidate = 1;
optional bool evict_cache = 2 [default = false];
}

enum SwitchRpcThrottleState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
// Prefetch file blocks upon open if requested
if (cacheConf.shouldPrefetchOnOpen()) {
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -97,7 +97,7 @@ public void close(boolean evictOnClose) throws IOException {
if (evictOnClose) {
int numEvicted = cache.evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
LOG.trace("On close, file=" + name + " evicted=" + numEvicted + " block(s)");
LOG.trace("On close, file= {} evicted= {} block(s)", name, numEvicted);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.DataInput;
Expand All @@ -38,13 +40,15 @@
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
Expand Down Expand Up @@ -1262,6 +1266,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
BlockCacheKey cacheKey =
new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);

boolean cacheable = cacheBlock && cacheIfCompactionsOff();

boolean useLock = false;
IdLock.Entry lockEntry = null;
Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan();
Expand Down Expand Up @@ -1303,7 +1309,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
return cachedBlock;
}

if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
// check cache again with lock
useLock = true;
continue;
Expand All @@ -1322,10 +1328,10 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
LOG.debug("Skipping decompression of block in prefetch");
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
Expand All @@ -1338,7 +1344,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory(), cacheOnly);
Expand Down Expand Up @@ -1663,4 +1669,9 @@ public int getMajorVersion() {
public void unbufferStream() {
fsBlockReader.unbufferStream();
}

protected boolean cacheIfCompactionsOff() {
return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
|| !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static TransitRegionStateProcedure[] createUnassignProceduresForSplitOrMerge(
for (; i < procs.length; i++) {
RegionStateNode regionNode = regionNodes.get(i);
TransitRegionStateProcedure proc =
TransitRegionStateProcedure.unassign(env, regionNode.getRegionInfo());
TransitRegionStateProcedure.unassignSplitMerge(env, regionNode.getRegionInfo());
if (regionNode.getProcedure() != null) {
throw new HBaseIOException(
"The parent region " + regionNode + " is currently in transition, give up");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
// wrong(but do not make it wrong intentionally). The client can handle this error.
private ServerName assignCandidate;

private boolean evictCache;

public CloseRegionProcedure() {
super();
}

public CloseRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
ServerName targetServer, ServerName assignCandidate) {
ServerName targetServer, ServerName assignCandidate, boolean evictCache) {
super(parent, region, targetServer);
this.assignCandidate = assignCandidate;
this.evictCache = evictCache;
}

@Override
Expand All @@ -62,7 +65,7 @@ public TableOperationType getTableOperationType() {

@Override
public RemoteOperation newRemoteOperation() {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
return new RegionCloseOperation(this, region, getProcId(), assignCandidate, evictCache);
}

@Override
Expand All @@ -72,6 +75,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
if (assignCandidate != null) {
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
}
builder.setEvictCache(evictCache);
serializer.serialize(builder.build());
}

Expand All @@ -83,6 +87,7 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
if (data.hasAssignCandidate()) {
assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
}
evictCache = data.getEvictCache();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.master.assignment;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME;

import edu.umd.cs.findbugs.annotations.Nullable;
Expand Down Expand Up @@ -120,6 +122,10 @@ public class TransitRegionStateProcedure

private RegionRemoteProcedureBase remoteProc;

private boolean evictCache;

private boolean isSplit;

public TransitRegionStateProcedure() {
}

Expand Down Expand Up @@ -155,6 +161,14 @@ protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
if (type == TransitionType.REOPEN) {
this.assignCandidate = getRegionStateNode(env).getRegionLocation();
}
evictCache =
env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
}

protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
ServerName assignCandidate, boolean forceNewPlan, TransitionType type, boolean isSplit) {
this(env, hri, assignCandidate, forceNewPlan, type);
this.isSplit = isSplit;
}

@Override
Expand Down Expand Up @@ -264,8 +278,12 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) thr
if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) {
// this is the normal case
env.getAssignmentManager().regionClosing(regionNode);
addChildProcedure(new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
assignCandidate));
CloseRegionProcedure closeProc = isSplit
? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
assignCandidate, true)
: new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
assignCandidate, evictCache);
addChildProcedure(closeProc);
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
} else {
forceNewPlan = true;
Expand Down Expand Up @@ -504,8 +522,9 @@ private static RegionTransitionType convert(TransitionType type) {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
.setType(convert(type)).setForceNewPlan(forceNewPlan);
RegionStateTransitionStateData.Builder builder =
RegionStateTransitionStateData.newBuilder().setType(convert(type))
.setForceNewPlan(forceNewPlan).setEvictCache(evictCache).setIsSplit(isSplit);
if (assignCandidate != null) {
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
}
Expand All @@ -523,6 +542,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
if (data.hasAssignCandidate()) {
assignCandidate = ProtobufUtil.toServerName(data.getAssignCandidate());
}
evictCache = data.getEvictCache();
isSplit = data.getIsSplit();
}

@Override
Expand Down Expand Up @@ -586,6 +607,12 @@ public static TransitRegionStateProcedure unassign(MasterProcedureEnv env, Regio
new TransitRegionStateProcedure(env, region, null, false, TransitionType.UNASSIGN));
}

public static TransitRegionStateProcedure unassignSplitMerge(MasterProcedureEnv env,
RegionInfo region) {
return setOwner(env,
new TransitRegionStateProcedure(env, region, null, false, TransitionType.UNASSIGN, true));
}

public static TransitRegionStateProcedure reopen(MasterProcedureEnv env, RegionInfo region) {
return setOwner(env,
new TransitRegionStateProcedure(env, region, null, false, TransitionType.REOPEN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,13 @@ public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInf

public static class RegionCloseOperation extends RegionOperation {
private final ServerName destinationServer;
private boolean evictCache;

public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
ServerName destinationServer) {
ServerName destinationServer, boolean evictCache) {
super(remoteProcedure, regionInfo, procId);
this.destinationServer = destinationServer;
this.evictCache = evictCache;
}

public ServerName getDestinationServer() {
Expand All @@ -485,7 +487,8 @@ public ServerName getDestinationServer() {

public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
getDestinationServer(), procId);
getDestinationServer(), procId, evictCache);

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ public boolean isCompactionsEnabled() {

public void setCompactionsEnabled(boolean compactionsEnabled) {
this.compactionsEnabled = compactionsEnabled;
this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION, String.valueOf(compactionsEnabled));
this.conf.setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, compactionsEnabled);
}

/** Returns the longCompactions thread pool executor */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3267,6 +3267,9 @@ private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
* <p>
* If a close was in progress, this new request will be ignored, and an exception thrown.
* </p>
* <p>
* Provides additional flag to indicate if this region blocks should be evicted from the cache.
* </p>
* @param encodedName Region to close
* @param abort True if we are aborting
* @param destination Where the Region is being moved too... maybe null if unknown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3858,9 +3858,10 @@ private void executeCloseRegionProcedures(CloseRegionRequest request) {
? ProtobufUtil.toServerName(request.getDestinationServer())
: null;
long procId = request.getCloseProcId();
boolean evictCache = request.getEvictCache();
if (regionServer.submitRegionProcedure(procId)) {
regionServer.executorService.submit(
UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination));
regionServer.getExecutorService().submit(
UnassignRegionHandler.create(regionServer, encodedName, procId, false, destination, evictCache));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.handler;

import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -27,6 +29,7 @@
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
Expand Down Expand Up @@ -277,6 +280,10 @@ Throwable getException() {
/** Returns Instance of HRegion if successful open else null. */
private HRegion openRegion() {
HRegion region = null;
boolean compactionEnabled =
((HRegionServer) server).getCompactSplitThread().isCompactionsEnabled();
this.server.getConfiguration().setBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,
compactionEnabled);
try {
// Instantiate the region. This also periodically tickles OPENING
// state so master doesn't timeout this region in transition.
Expand Down
Loading

0 comments on commit 2c1affa

Please sign in to comment.