Skip to content

Commit

Permalink
HBASE-28690 Aborting Active HMaster is not rejecting reportRegionStat…
Browse files Browse the repository at this point in the history
…eTransition if procedure is initialised by next Active master (#6136)

Added masterActiveTime as fencing token for remote procedures

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Reviewed-by: Aman Poonia <aman.poonia.29@gmail.com>
  • Loading branch information
Umeshkumar9414 authored Aug 25, 2024
1 parent 18ddef1 commit 9df62ab
Show file tree
Hide file tree
Showing 30 changed files with 190 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3054,10 +3054,12 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
}

/**
 * Creates a {@link CloseRegionRequest}: obtains a builder from
 * {@code getBuilder(server, regionName, destinationServer, closeProcId)}, then sets the
 * evict-cache flag and the initiating master's active time before building.
 * <p>
 * NOTE(review): two parameter-list tails appear below; the first looks like the pre-change line
 * kept by the diff view, the second is the one that declares {@code initiatingMasterActiveTime}.
 * @param server                     forwarded to {@code getBuilder}
 * @param regionName                 forwarded to {@code getBuilder}
 * @param destinationServer          forwarded to {@code getBuilder}
 * @param closeProcId                forwarded to {@code getBuilder}
 * @param evictCache                 value stored via {@code setEvictCache}
 * @param initiatingMasterActiveTime value stored via {@code setInitiatingMasterActiveTime}; the
 *                                   commit message describes it as a fencing token
 * @return the built request
 */
public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId, boolean evictCache) {
ServerName destinationServer, long closeProcId, boolean evictCache,
long initiatingMasterActiveTime) {
CloseRegionRequest.Builder builder =
getBuilder(server, regionName, destinationServer, closeProcId);
builder.setEvictCache(evictCache);
// carry the sending master's active time along with the request
builder.setInitiatingMasterActiveTime(initiatingMasterActiveTime);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,22 @@ protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
*/
public static abstract class RemoteOperation {
// the procedure this operation was created for
private final RemoteProcedure remoteProcedure;
// active time of the master that sent this request, used for fencing
private final long initiatingMasterActiveTime;

// NOTE(review): two constructor headers follow; the one-argument header looks like the
// pre-change line kept by the diff view, the two-argument header matches the body below.
/**
 * Stores the owning procedure and the active time of the master creating this operation.
 * @param remoteProcedure            procedure returned later by {@link #getRemoteProcedure()}
 * @param initiatingMasterActiveTime value returned later by
 *                                   {@link #getInitiatingMasterActiveTime()}
 */
protected RemoteOperation(final RemoteProcedure remoteProcedure) {
protected RemoteOperation(final RemoteProcedure remoteProcedure,
long initiatingMasterActiveTime) {
this.remoteProcedure = remoteProcedure;
this.initiatingMasterActiveTime = initiatingMasterActiveTime;
}

/** Returns the procedure supplied at construction. */
public RemoteProcedure getRemoteProcedure() {
return remoteProcedure;
}

/** Returns the master active time supplied at construction. */
public long getInitiatingMasterActiveTime() {
return initiatingMasterActiveTime;
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ message OpenRegionRequest {
repeated RegionOpenInfo open_info = 1;
// the intended server for this RPC.
optional uint64 serverStartCode = 2;
// Master active time as fencing token
optional int64 initiating_master_active_time = 3;
// wall clock time from master
optional uint64 master_system_time = 5;

Expand Down Expand Up @@ -123,6 +125,8 @@ message CloseRegionRequest {
optional uint64 serverStartCode = 5;
optional int64 close_proc_id = 6 [default = -1];
optional bool evict_cache = 7 [default = false];
// Master active time as fencing token
optional int64 initiating_master_active_time = 8;
}

message CloseRegionResponse {
Expand Down Expand Up @@ -272,6 +276,8 @@ message RemoteProcedureRequest {
required uint64 proc_id = 1;
required string proc_class = 2;
optional bytes proc_data = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}

message ExecuteProceduresRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ message RegionStateTransition {
optional uint64 open_seq_num = 3;

repeated int64 proc_id = 4;

// Master active time as fencing token
optional int64 initiating_master_active_time = 5;
enum TransitionCode {
OPENED = 0;
FAILED_OPEN = 1;
Expand Down Expand Up @@ -155,6 +158,8 @@ message RemoteProcedureResult {
}
required Status status = 2;
optional ForeignExceptionMessage error = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}
message ReportProcedureDoneRequest {
repeated RemoteProcedureResult result = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3097,6 +3097,7 @@ public long getMasterStartTime() {
}

/**
 * Reports the moment this HMaster became the active master.
 * @return the activation timestamp, in milliseconds
 */
@Override
public long getMasterActiveTime() {
  return this.masterActiveTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.Server;
Expand Down Expand Up @@ -72,7 +73,6 @@
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
Expand Down Expand Up @@ -338,6 +338,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
Expand Down Expand Up @@ -1794,6 +1795,15 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcContro
ReportRegionStateTransitionRequest req) throws ServiceException {
try {
master.checkServiceStarted();
for (RegionServerStatusProtos.RegionStateTransition transition : req.getTransitionList()) {
long procId =
transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
// -1 is less than any possible master active time, so a report without the field is accepted
long initiatingMasterActiveTime = transition.hasInitiatingMasterActiveTime()
? transition.getInitiatingMasterActiveTime()
: -1;
throwOnOldMaster(procId, initiatingMasterActiveTime);
}
return master.getAssignmentManager().reportRegionStateTransition(req);
} catch (IOException ioe) {
throw new ServiceException(ioe);
Expand Down Expand Up @@ -2544,8 +2554,14 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
// Check Masters is up and ready for duty before progressing. Remote side will keep trying.
try {
this.master.checkServiceStarted();
} catch (ServerNotRunningYetException snrye) {
throw new ServiceException(snrye);
for (RemoteProcedureResult result : request.getResultList()) {
// -1 is less than any possible master active time, so a report without the field is accepted
long initiatingMasterActiveTime =
result.hasInitiatingMasterActiveTime() ? result.getInitiatingMasterActiveTime() : -1;
throwOnOldMaster(result.getProcId(), initiatingMasterActiveTime);
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
Expand All @@ -2558,6 +2574,18 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
return ReportProcedureDoneResponse.getDefaultInstance();
}

/**
 * Fencing check for reports sent back to the master. A report is refused when the procedure it
 * belongs to was initiated by a master whose active time is greater than this master's own
 * active time.
 * @param procId                     id of the reported procedure, used only for logging
 * @param initiatingMasterActiveTime active time of the master that initiated the procedure
 * @throws MasterNotRunningException if {@code initiatingMasterActiveTime} is greater than this
 *                                   master's active time
 */
private void throwOnOldMaster(long procId, long initiatingMasterActiveTime)
  throws MasterNotRunningException {
  if (initiatingMasterActiveTime <= master.getMasterActiveTime()) {
    // initiated by this master (or an earlier one): nothing to fence
    return;
  }
  // the initiator became active after this master did, so this master must not handle the report
  LOG.warn(
    "Report for procId: {} and initiatingMasterAT {} received on master with activeTime {}",
    procId, initiatingMasterActiveTime, master.getMasterActiveTime());
  throw new MasterNotRunningException("Another master is active");
}

// HBCK Services

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ long splitRegion(final RegionInfo regionInfo, final byte[] splitRow, final long
/** Returns true if master is the active one */
boolean isActiveMaster();

/** Returns timestamp in millis when this master became the active one. */
long getMasterActiveTime();

/** Returns true if master is initialized */
boolean isInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ public TableOperationType getTableOperationType() {
}

/**
 * Creates the {@code RegionCloseOperation} for this procedure from its region, procedure id,
 * assign candidate and evict-cache flag.
 * <p>
 * NOTE(review): the no-argument header and its return line look like the pre-change lines kept
 * by the diff view; the {@code MasterProcedureEnv} variant additionally passes the master's
 * active time read from {@code env.getMasterServices()}.
 */
@Override
public RemoteOperation newRemoteOperation() {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate, evictCache);
public RemoteOperation newRemoteOperation(MasterProcedureEnv env) {
return new RegionCloseOperation(this, region, getProcId(), assignCandidate, evictCache,
env.getMasterServices().getMasterActiveTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public TableOperationType getTableOperationType() {
}

/**
 * Creates the {@code RegionOpenOperation} for this procedure from its region and procedure id.
 * <p>
 * NOTE(review): the no-argument header and its return line look like the pre-change lines kept
 * by the diff view; the {@code MasterProcedureEnv} variant additionally passes the master's
 * active time read from {@code env.getMasterServices()}.
 */
@Override
public RemoteOperation newRemoteOperation() {
return new RegionOpenOperation(this, region, getProcId());
public RemoteOperation newRemoteOperation(MasterProcedureEnv env) {
return new RegionOpenOperation(this, region, getProcId(),
env.getMasterServices().getMasterActiveTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
return Optional.empty();
}
return Optional.of(newRemoteOperation());
return Optional.of(newRemoteOperation(env));
}

// Factory hook implemented by subclasses to create the remote operation to dispatch.
// NOTE(review): the no-argument declaration looks like the pre-change line kept by the diff
// view; the declaration taking MasterProcedureEnv is the one used by the visible callers.
protected abstract RemoteProcedureDispatcher.RemoteOperation newRemoteOperation();
protected abstract RemoteProcedureDispatcher.RemoteOperation
newRemoteOperation(MasterProcedureEnv env);

@Override
public void remoteOperationCompleted(MasterProcedureEnv env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
}
}
}
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
FlushRegionCallable.class, builder.build().toByteArray()));
return Optional
.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), FlushRegionCallable.class,
builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void dispatchCloseRequests(final MasterProcedureEnv env,

/**
 * Builds a request from every server operation and adds each one, in encounter order, to
 * {@code request} via {@code addProc}.
 * <p>
 * NOTE(review): the two stream statements are equivalent (lambda vs. method reference); the
 * first looks like the pre-change line kept by the diff view.
 */
@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
operations.stream().map(ServerOperation::buildRequest).forEachOrdered(request::addProc);
}

// will be overridden in test.
Expand All @@ -450,7 +450,9 @@ protected final void remoteCallFailed(final MasterProcedureEnv env, final IOExce
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
final ServerName serverName, final List<RegionOpenOperation> operations) {
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.setServerStartCode(serverName.getStartcode());
builder.setServerStartCode(serverName.getStartCode());
operations.stream().map(RemoteOperation::getInitiatingMasterActiveTime).findAny()
.ifPresent(builder::setInitiatingMasterActiveTime);
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
for (RegionOpenOperation op : operations) {
builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
Expand All @@ -473,35 +475,37 @@ public static final class ServerOperation extends RemoteOperation {
private final byte[] rsProcData;

/**
 * Stores the procedure id, the region-server-side procedure class and its serialized data, and
 * hands the procedure plus the master active time to the superclass constructor.
 * <p>
 * NOTE(review): the shorter parameter tail and the one-argument {@code super(...)} call look
 * like pre-change lines kept by the diff view.
 */
public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
byte[] rsProcData) {
super(remoteProcedure);
byte[] rsProcData, long initiatingMasterActiveTime) {
super(remoteProcedure, initiatingMasterActiveTime);
this.procId = procId;
this.rsProcClass = rsProcClass;
this.rsProcData = rsProcData;
}

/**
 * Builds the {@code RemoteProcedureRequest} for this operation: procedure id, procedure class
 * name, a copy of the serialized procedure data and the initiating master's active time.
 * <p>
 * NOTE(review): the first builder-chain tail (ending in {@code .build();} without the active
 * time) looks like the pre-change line kept by the diff view.
 */
public RemoteProcedureRequest buildRequest() {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData))
.setInitiatingMasterActiveTime(getInitiatingMasterActiveTime()).build();
}
}

// Base class for remote operations that target a single region.
public static abstract class RegionOperation extends RemoteOperation {
// region this operation targets
protected final RegionInfo regionInfo;
// id of the procedure this operation was created for
protected final long procId;

// NOTE(review): the three-argument header and the one-argument super(...) call look like
// pre-change lines kept by the diff view; the four-argument form forwards the master active
// time to RemoteOperation.
protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
super(remoteProcedure);
protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
long initiatingMasterActiveTime) {
super(remoteProcedure, initiatingMasterActiveTime);
this.regionInfo = regionInfo;
this.procId = procId;
}
}

public static class RegionOpenOperation extends RegionOperation {

public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
long procId) {
super(remoteProcedure, regionInfo, procId);
public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
long initiatingMasterActiveTime) {
super(remoteProcedure, regionInfo, procId, initiatingMasterActiveTime);
}

public OpenRegionRequest.RegionOpenInfo
Expand All @@ -516,8 +520,8 @@ public static class RegionCloseOperation extends RegionOperation {
private boolean evictCache;

/**
 * Stores the destination server and evict-cache flag and passes the remaining arguments,
 * including the initiating master's active time, to the superclass constructor.
 * <p>
 * NOTE(review): the shorter parameter tail and the three-argument {@code super(...)} call look
 * like pre-change lines kept by the diff view.
 */
public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
ServerName destinationServer, boolean evictCache) {
super(remoteProcedure, regionInfo, procId);
ServerName destinationServer, boolean evictCache, long initiatingMasterActiveTime) {
super(remoteProcedure, regionInfo, procId, initiatingMasterActiveTime);
this.destinationServer = destinationServer;
this.evictCache = evictCache;
}
Expand All @@ -528,8 +532,7 @@ public ServerName getDestinationServer() {

/**
 * Delegates to {@code ProtobufUtil.buildCloseRegionRequest} with this operation's region name,
 * destination server, procedure id, evict-cache flag and initiating master active time.
 * <p>
 * NOTE(review): the first argument tail (without the active time) looks like the pre-change
 * line kept by the diff view.
 * @param serverName server passed as the first argument of the delegate call
 * @return the request produced by {@code ProtobufUtil.buildCloseRegionRequest}
 */
public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
getDestinationServer(), procId, evictCache);

getDestinationServer(), procId, evictCache, getInitiatingMasterActiveTime());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ protected boolean holdLock(MasterProcedureEnv env) {

/**
 * Wraps a {@code SnapshotRegionCallable} server operation whose payload is a serialized
 * {@code SnapshotRegionParameter} carrying this procedure's region and snapshot.
 * <p>
 * NOTE(review): the first {@code return} statement looks like the pre-change code kept by the
 * diff view; the second also passes the master active time from {@code env.getMasterServices()}.
 */
@Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SnapshotRegionCallable.class, MasterProcedureProtos.SnapshotRegionParameter.newBuilder()
.setRegion(ProtobufUtil.toRegionInfo(region)).setSnapshot(snapshot).build().toByteArray()));
return Optional
.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), SnapshotRegionCallable.class,
MasterProcedureProtos.SnapshotRegionParameter.newBuilder()
.setRegion(ProtobufUtil.toRegionInfo(region)).setSnapshot(snapshot).build().toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,9 @@ protected void toStringClassDetails(StringBuilder builder) {
/**
 * Wraps a {@code SnapshotVerifyCallable} server operation whose payload is a serialized
 * {@code SnapshotVerifyParameter} carrying this procedure's snapshot and region.
 * <p>
 * NOTE(review): the first {@code return} statement looks like the pre-change code kept by the
 * diff view; the second also passes the master active time from {@code env.getMasterServices()}.
 */
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
SnapshotVerifyParameter.Builder builder = SnapshotVerifyParameter.newBuilder();
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SnapshotVerifyCallable.class, builder.build().toByteArray()));
return Optional
.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), SnapshotVerifyCallable.class,
builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
/**
 * Wraps a {@code SplitWALCallable} server operation whose payload is a serialized
 * {@code SplitWALParameter} carrying this procedure's WAL path.
 * <p>
 * NOTE(review): the first {@code return} statement looks like the pre-change code kept by the
 * diff view; the second also passes the master active time from {@code env.getMasterServices()}.
 */
@Override
public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName) {
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
SplitWALCallable.class, MasterProcedureProtos.SplitWALParameter.newBuilder()
.setWalPath(walPath).build().toByteArray()));
return Optional.of(new RSProcedureDispatcher.ServerOperation(
this, getProcId(), SplitWALCallable.class, MasterProcedureProtos.SplitWALParameter
.newBuilder().setWalPath(walPath).build().toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
SwitchRpcThrottleRemoteCallable.class,
SwitchRpcThrottleRemoteStateData.newBuilder()
.setTargetServer(ProtobufUtil.toServerName(remote))
.setRpcThrottleEnabled(rpcThrottleEnabled).build().toByteArray()));
.setRpcThrottleEnabled(rpcThrottleEnabled).build().toByteArray(),
masterProcedureEnv.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
ClaimReplicationQueueRemoteParameter.newBuilder()
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build()
.toByteArray()));
.toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
assert targetServer.equals(remote);
return Optional.of(new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()));
.setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray(),
env.getMasterServices().getMasterActiveTime()));
}

@Override
Expand Down
Loading

0 comments on commit 9df62ab

Please sign in to comment.