Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28690 added masterStartCode as fencing token for remote procedures #1

Merged
merged 7 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3097,10 +3097,12 @@ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte
}

public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
ServerName destinationServer, long closeProcId, boolean evictCache) {
ServerName destinationServer, long closeProcId, boolean evictCache,
long initiatingMasterActiveTime) {
CloseRegionRequest.Builder builder =
getBuilder(server, regionName, destinationServer, closeProcId);
builder.setEvictCache(evictCache);
builder.setInitiatingMasterActiveTime(initiatingMasterActiveTime);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ message RegionStateTransition {
optional uint64 open_seq_num = 3;

repeated int64 proc_id = 4;

// Master active time as fencing token
optional int64 initiating_master_active_time = 5;
enum TransitionCode {
OPENED = 0;
FAILED_OPEN = 1;
Expand Down Expand Up @@ -155,6 +158,8 @@ message RemoteProcedureResult {
}
required Status status = 2;
optional ForeignExceptionMessage error = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}
message ReportProcedureDoneRequest {
repeated RemoteProcedureResult result = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ message OpenRegionRequest {
repeated RegionOpenInfo open_info = 1;
// the intended server for this RPC.
optional uint64 serverStartCode = 2;
// Master active time as fencing token
optional int64 initiating_master_active_time = 3;
// wall clock time from master
optional uint64 master_system_time = 5;

Expand Down Expand Up @@ -123,6 +125,8 @@ message CloseRegionRequest {
optional uint64 serverStartCode = 5;
optional int64 close_proc_id = 6 [default = -1];
optional bool evict_cache = 7 [default = false];
// Master active time as fencing token
optional int64 initiating_master_active_time = 8;
}

message CloseRegionResponse {
Expand Down Expand Up @@ -272,6 +276,8 @@ message RemoteProcedureRequest {
required uint64 proc_id = 1;
required string proc_class = 2;
optional bytes proc_data = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
}

message ExecuteProceduresRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseRpcServicesBase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerMetrics;
Expand All @@ -64,7 +65,6 @@
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
Expand Down Expand Up @@ -396,6 +396,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RSGroupAdminProtos.UpdateRSGroupConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
Expand Down Expand Up @@ -1854,6 +1855,15 @@ public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcContro
ReportRegionStateTransitionRequest req) throws ServiceException {
try {
server.checkServiceStarted();
for (RegionServerStatusProtos.RegionStateTransition transition : req.getTransitionList()) {
long procId =
transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
// -1 is less than any possible MasterActiveCode
long initiatingMasterActiveTime = transition.hasInitiatingMasterActiveTime()
? transition.getInitiatingMasterActiveTime()
: -1;
throwOnOldMasterStartCode(procId, initiatingMasterActiveTime);
}
return server.getAssignmentManager().reportRegionStateTransition(req);
} catch (IOException ioe) {
throw new ServiceException(ioe);
Expand Down Expand Up @@ -2553,8 +2563,14 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
// Check Masters is up and ready for duty before progressing. Remote side will keep trying.
try {
this.server.checkServiceStarted();
} catch (ServerNotRunningYetException snrye) {
throw new ServiceException(snrye);
for (RemoteProcedureResult result : request.getResultList()) {
// -1 is less than any possible MasterActiveCode
long initiatingMasterActiveTime =
result.hasInitiatingMasterActiveTime() ? result.getInitiatingMasterActiveTime() : -1;
throwOnOldMasterStartCode(result.getProcId(), initiatingMasterActiveTime);
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
Expand All @@ -2567,6 +2583,18 @@ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
return ReportProcedureDoneResponse.getDefaultInstance();
}

private void throwOnOldMasterStartCode(long procId, long initiatingMasterActiveTime)
throws MasterNotRunningException {
if (initiatingMasterActiveTime > server.getMasterActiveTime()) {
// procedure is initiated by new active master but report received on master with older active
// time
LOG.warn(
"Report for procId: {} and initiatingMasterAT {} received on master with activeTime {}",
procId, initiatingMasterActiveTime, server.getMasterActiveTime());
throw new MasterNotRunningException("Another master is active");
}
}

@Override
public FileArchiveNotificationResponse reportFileArchival(RpcController controller,
FileArchiveNotificationRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcConnectionConstants;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
Expand Down Expand Up @@ -416,13 +417,16 @@ public void dispatchOpenRequests(final MasterProcedureEnv env,
public void dispatchCloseRequests(final MasterProcedureEnv env,
final List<RegionCloseOperation> operations) {
for (RegionCloseOperation op : operations) {
request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
request.addCloseRegion(op.buildCloseRegionRequest(getServerName(),
((HMaster) env.getMasterServices()).getMasterActiveTime()));
}
}

@Override
public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc);
operations.stream()
.map(o -> o.buildRequest(((HMaster) env.getMasterServices()).getMasterActiveTime()))
.forEachOrdered(request::addProc);
}

// will be overridden in test.
Expand All @@ -441,7 +445,9 @@ protected final void remoteCallFailed(final MasterProcedureEnv env, final IOExce
private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
final ServerName serverName, final List<RegionOpenOperation> operations) {
final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.setServerStartCode(serverName.getStartcode());
builder.setServerStartCode(serverName.getStartCode());
builder
.setInitiatingMasterActiveTime(((HMaster) env.getMasterServices()).getMasterActiveTime());
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
for (RegionOpenOperation op : operations) {
builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
Expand Down Expand Up @@ -471,9 +477,10 @@ public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rs
this.rsProcData = rsProcData;
}

public RemoteProcedureRequest buildRequest() {
public RemoteProcedureRequest buildRequest(long initiatingMasterActiveTime) {
return RemoteProcedureRequest.newBuilder().setProcId(procId)
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
.setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData))
.setInitiatingMasterActiveTime(initiatingMasterActiveTime).build();
}
}

Expand Down Expand Up @@ -517,9 +524,10 @@ public ServerName getDestinationServer() {
return destinationServer;
}

public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName,
long initiatingMasterActiveTime) {
return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
getDestinationServer(), procId, evictCache);
getDestinationServer(), procId, evictCache, initiatingMasterActiveTime);

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2231,6 +2231,7 @@ public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOEx
HRegion r = context.getRegion();
long openProcId = context.getOpenProcId();
long masterSystemTime = context.getMasterSystemTime();
long initiatingMasterActiveTime = context.getInitiatingMasterActiveTime();
rpcServices.checkOpen();
LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
Expand All @@ -2254,7 +2255,7 @@ public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOEx
// Notify master
if (
!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
openSeqNum, openProcId, masterSystemTime, r.getRegionInfo(), initiatingMasterActiveTime))
) {
throw new IOException(
"Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
Expand Down Expand Up @@ -2315,6 +2316,7 @@ private boolean skipReportingTransition(final RegionStateTransitionContext conte
for (long procId : procIds) {
transition.addProcId(procId);
}
transition.setInitiatingMasterActiveTime(context.getInitiatingMasterActiveTime());

return builder.build();
}
Expand Down Expand Up @@ -3533,12 +3535,15 @@ public boolean reportFileArchivalForQuotas(TableName tableName,
return true;
}

void executeProcedure(long procId, RSProcedureCallable callable) {
executorService.submit(new RSProcedureHandler(this, procId, callable));
void executeProcedure(long procId, long initiatingMasterActiveTime,
RSProcedureCallable callable) {
executorService
.submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable));
}

public void remoteProcedureComplete(long procId, Throwable error) {
procedureResultReporter.complete(procId, error);
public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime,
Throwable error) {
procedureResultReporter.complete(procId, initiatingMasterActiveTime, error);
}

void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,8 @@ public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController control
private void executeOpenRegionProcedures(OpenRegionRequest request,
Map<TableName, TableDescriptor> tdCache) {
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
long initiatingMasterActiveTime =
request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
RegionInfo regionInfo = ProtobufUtil.toRegionInfo(regionOpenInfo.getRegion());
TableName tableName = regionInfo.getTable();
Expand All @@ -3884,14 +3886,16 @@ private void executeOpenRegionProcedures(OpenRegionRequest request,
}
long procId = regionOpenInfo.getOpenProcId();
if (server.submitRegionProcedure(procId)) {
server.getExecutorService().submit(
AssignRegionHandler.create(server, regionInfo, procId, tableDesc, masterSystemTime));
server.getExecutorService().submit(AssignRegionHandler.create(server, regionInfo, procId,
tableDesc, masterSystemTime, initiatingMasterActiveTime));
}
}
}

private void executeCloseRegionProcedures(CloseRegionRequest request) {
String encodedName;
long initiatingMasterActiveTime =
request.hasInitiatingMasterActiveTime() ? request.getInitiatingMasterActiveTime() : -1;
try {
encodedName = ProtobufUtil.getRegionEncodedName(request.getRegion());
} catch (DoNotRetryIOException e) {
Expand All @@ -3903,8 +3907,8 @@ private void executeCloseRegionProcedures(CloseRegionRequest request) {
long procId = request.getCloseProcId();
boolean evictCache = request.getEvictCache();
if (server.submitRegionProcedure(procId)) {
server.getExecutorService().submit(
UnassignRegionHandler.create(server, encodedName, procId, false, destination, evictCache));
server.getExecutorService().submit(UnassignRegionHandler.create(server, encodedName, procId,
false, destination, evictCache, initiatingMasterActiveTime));
}
}

Expand All @@ -3916,12 +3920,13 @@ private void executeProcedures(RemoteProcedureRequest request) {
} catch (Exception e) {
LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(),
request.getProcId(), e);
server.remoteProcedureComplete(request.getProcId(), e);
server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(),
e);
return;
}
callable.init(request.getProcData().toByteArray(), server);
LOG.debug("Executing remote procedure {}, pid={}", callable.getClass(), request.getProcId());
server.executeProcedure(request.getProcId(), callable);
server.executeProcedure(request.getProcId(), request.getInitiatingMasterActiveTime(), callable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ class PostOpenDeployContext {
private final HRegion region;
private final long openProcId;
private final long masterSystemTime;
private final long initiatingMasterActiveTime;

public PostOpenDeployContext(HRegion region, long openProcId, long masterSystemTime) {
public PostOpenDeployContext(HRegion region, long openProcId, long masterSystemTime,
long initiatingMasterActiveTime) {
this.region = region;
this.openProcId = openProcId;
this.masterSystemTime = masterSystemTime;
this.initiatingMasterActiveTime = initiatingMasterActiveTime;
}

public HRegion getRegion() {
Expand All @@ -111,6 +114,10 @@ public long getOpenProcId() {
public long getMasterSystemTime() {
return masterSystemTime;
}

public long getInitiatingMasterActiveTime() {
return initiatingMasterActiveTime;
}
}

/**
Expand All @@ -123,23 +130,26 @@ class RegionStateTransitionContext {
private final TransitionCode code;
private final long openSeqNum;
private final long masterSystemTime;
private final long initiatingMasterActiveTime;
private final long[] procIds;
private final RegionInfo[] hris;

public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime,
RegionInfo... hris) {
long initiatingMasterActiveTime, RegionInfo... hris) {
this.code = code;
this.openSeqNum = openSeqNum;
this.masterSystemTime = masterSystemTime;
this.initiatingMasterActiveTime = initiatingMasterActiveTime;
this.hris = hris;
this.procIds = new long[hris.length];
}

public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long procId,
long masterSystemTime, RegionInfo hri) {
long masterSystemTime, RegionInfo hri, long initiatingMasterActiveTime) {
this.code = code;
this.openSeqNum = openSeqNum;
this.masterSystemTime = masterSystemTime;
this.initiatingMasterActiveTime = initiatingMasterActiveTime;
this.hris = new RegionInfo[] { hri };
this.procIds = new long[] { procId };
}
Expand All @@ -163,6 +173,10 @@ public RegionInfo[] getHris() {
public long[] getProcIds() {
return procIds;
}

public long getInitiatingMasterActiveTime() {
return initiatingMasterActiveTime;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public RemoteProcedureResultReporter(HRegionServer server) {
this.server = server;
}

public void complete(long procId, Throwable error) {
RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId);
public void complete(long procId, long initiatingMasterActiveTime, Throwable error) {
RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId)
.setInitiatingMasterActiveTime(initiatingMasterActiveTime);
if (error != null) {
LOG.debug("Failed to complete execution of pid={}", procId, error);
builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void requestRegionSplit() {
// are created just to pass the information to the reportRegionStateTransition().
if (
!server.reportRegionStateTransition(new RegionStateTransitionContext(
TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, parent, hri_a, hri_b))
TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, -1, parent, hri_a, hri_b))
) {
LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
}
Expand Down
Loading