HDDS-11581. Remove duplicate ContainerStateMachine#RaftGroupId #7312

Merged 1 commit on Oct 14, 2024.
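Summary of the change: ContainerStateMachine cached the Raft group id in its own final field, gid, even though its Ratis base state machine already tracks the same value and exposes it through getGroupId() once initialize(...) has run. The patch deletes the field and its constructor assignment and routes every remaining use through the inherited accessor, so the id is stored in exactly one place. Below is a minimal sketch of the pattern; RaftGroupId and BaseStateMachine here are simplified stand-in stubs, not the real Apache Ratis types.

// Stand-in sketch of the refactoring in HDDS-11581; these are stub types,
// not Apache Ratis's API.
final class RaftGroupId {
  private final String uuid;
  RaftGroupId(String uuid) { this.uuid = uuid; }
  @Override public String toString() { return uuid; }
}

abstract class BaseStateMachine {
  private RaftGroupId groupId;
  // The base class records the group id during initialization...
  void initialize(RaftGroupId id) { this.groupId = id; }
  // ...and every subclass can read it back through one accessor.
  RaftGroupId getGroupId() { return groupId; }
}

class ContainerStateMachineSketch extends BaseStateMachine {
  // Before the patch the subclass also kept:
  //   private final RaftGroupId gid;   // duplicate of the base class's copy
  // and logged/dispatched with 'gid'. After the patch it reads the single
  // copy owned by the base class instead:
  void takeSnapshot() {
    System.out.println(getGroupId() + ": taking a snapshot");
  }

  public static void main(String[] args) {
    ContainerStateMachineSketch sm = new ContainerStateMachineSketch();
    sm.initialize(new RaftGroupId("group-7312"));
    sm.takeSnapshot();   // prints "group-7312: taking a snapshot"
  }
}

With the duplicate gone, callers can no longer observe a stale or divergent cached copy. Note one call site in the diff below: inside initialize itself, the patch passes the id parameter straight to notifyGroupAdd rather than reading it back through the accessor.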
@@ -183,7 +183,6 @@ long getStartTime() {

private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
- private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
@@ -218,7 +217,6 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupId
ConfigurationSource conf,
String threadNamePrefix) {
this.datanodeService = hddsDatanodeService;
- this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
this.ratisServer = ratisServer;
@@ -282,7 +280,7 @@ public void initialize(
throws IOException {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
- ratisServer.notifyGroupAdd(gid);
+ ratisServer.notifyGroupAdd(id);

LOG.info("{}: initialize {}", server.getId(), id);
loadSnapshot(storage.getLatestSnapshot());
@@ -293,15 +291,15 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
if (snapshot == null) {
TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
LOG.info("{}: The snapshot info is null. Setting the last applied index " +
"to:{}", gid, empty);
"to:{}", getGroupId(), empty);
setLastAppliedTermIndex(empty);
return empty.getIndex();
}

final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
LOG.info("{}: Setting the last applied index to {}", gid, last);
LOG.info("{}: Setting the last applied index to {}", getGroupId(), last);
setLastAppliedTermIndex(last);

// initialize the dispatcher with snapshot so that it build the missing
@@ -351,7 +349,7 @@ public long takeSnapshot() throws IOException {
long startTime = Time.monotonicNow();
if (!isStateMachineHealthy()) {
String msg =
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
"Failed to take snapshot " + " for " + getGroupId() + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
StateMachineException sme = new StateMachineException(msg);
LOG.error(msg);
@@ -360,19 +358,19 @@ public long takeSnapshot() throws IOException {
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
LOG.info("{}: Taking a snapshot at:{} file {}", getGroupId(), ti, snapshotFile);
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
persistContainerSet(fos);
fos.flush();
// make sure the snapshot file is synced
fos.getFD().sync();
} catch (IOException ioe) {
LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
LOG.error("{}: Failed to write snapshot at:{} file {}", getGroupId(), ti,
snapshotFile);
throw ioe;
}
LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms",
- gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+ getGroupId(), ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@@ -386,7 +384,7 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole role)
final StateMachineLogEntryProto stateMachineLogEntry = entry.getStateMachineLogEntry();
final ContainerCommandRequestProto logProto;
try {
- logProto = getContainerCommandRequestProto(gid, stateMachineLogEntry.getLogData());
+ logProto = getContainerCommandRequestProto(getGroupId(), stateMachineLogEntry.getLogData());
} catch (InvalidProtocolBufferException e) {
trx.setException(e);
return trx;
@@ -413,7 +411,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
- Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+ Preconditions.checkArgument(request.getRaftGroupId().equals(getGroupId()));

final TransactionContext.Builder builder = TransactionContext.newBuilder()
.setClientRequest(request)
@@ -449,7 +447,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
final WriteChunkRequestProto.Builder commitWriteChunkProto = WriteChunkRequestProto.newBuilder(write)
.clearData();
protoBuilder.setWriteChunk(commitWriteChunkProto)
- .setPipelineID(gid.getUuid().toString())
+ .setPipelineID(getGroupId().getUuid().toString())
.setTraceID(proto.getTraceID());

builder.setStateMachineData(write.getData());
@@ -491,20 +489,20 @@ private static ContainerCommandRequestProto getContainerCommandRequestProto(

private ContainerCommandRequestProto message2ContainerCommandRequestProto(
Message message) throws InvalidProtocolBufferException {
- return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+ return ContainerCommandRequestMessage.toProto(message.getContent(), getGroupId());
}

private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", getGroupId(),
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
if (LOG.isTraceEnabled()) {
LOG.trace("{}: response {}", gid, response);
LOG.trace("{}: response {}", getGroupId(), response);
}
return response;
}
@@ -531,7 +529,7 @@ private CompletableFuture<Message> writeStateMachineData(
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
try {
- if (server.getDivision(gid).getInfo().isLeader()) {
+ if (server.getDivision(getGroupId()).getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
} catch (InterruptedException ioe) {
@@ -559,7 +557,7 @@ private CompletableFuture<Message> writeStateMachineData(
return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", gid, write.getBlockID(),
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName(), e);
metrics.incNumWriteDataFails();
// write chunks go in parallel. It's possible that one write chunk
@@ -573,7 +571,7 @@ private CompletableFuture<Message> writeStateMachineData(
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", gid, write.getBlockID(),
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
// Remove the future once it finishes execution from the
@@ -587,7 +585,7 @@ private CompletableFuture<Message> writeStateMachineData(
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
@@ -601,7 +599,7 @@ private CompletableFuture<Message> writeStateMachineData(
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
- LOG.debug(gid +
+ LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName());
@@ -622,7 +620,7 @@ private StateMachine.DataChannel getStreamDataChannel(
DispatcherContext context) throws StorageContainerException {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
"traceID={}", gid, requestProto.getCmdType(),
"traceID={}", getGroupId(), requestProto.getCmdType(),
requestProto.getContainerID(), requestProto.getPipelineID(),
requestProto.getTraceID());
}
@@ -781,7 +779,7 @@ private ByteString readStateMachineData(
new StorageContainerException(response.getMessage(),
response.getResult());
LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, response.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), response.getCmdType(), index,
response.getMessage(), response.getResult());
stateMachineHealthy.set(false);
throw sce;
@@ -856,7 +854,7 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContext
.map(TransactionContext::getStateMachineContext)
.orElse(null);
final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto()
- : getContainerCommandRequestProto(gid, entry.getStateMachineLogEntry().getLogData());
+ : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData());

if (requestProto.getCmdType() != Type.WriteChunk) {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
@@ -874,7 +872,7 @@ public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContext
return future;
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
LOG.error("{} unable to read stateMachineData:", gid, e);
LOG.error("{} unable to read stateMachineData:", getGroupId(), e);
return completeExceptionally(e);
}
}
@@ -920,7 +918,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allServer)
// from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` originates from ratis.
if (allServer) {
if (datanodeService != null && !datanodeService.isStopped()) {
LOG.info("{} is closed by ratis", gid);
LOG.info("{} is closed by ratis", getGroupId());
if (semaphore.tryAcquire()) {
// run with a different thread, so this raft group can be closed
Runnable runnable = () -> {
@@ -952,7 +950,7 @@ public void notifyServerShutdown(RaftProtos.RoleInfoProto roleInfo, boolean allServer)
CompletableFuture.runAsync(runnable);
}
} else {
LOG.info("{} is closed by HddsDatanodeService", gid);
LOG.info("{} is closed by HddsDatanodeService", getGroupId());
}
}
}
@@ -983,7 +981,7 @@ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
private void removeStateMachineDataIfNeeded(long index) {
if (waitOnBothFollowers) {
try {
- RaftServer.Division division = ratisServer.getServer().getDivision(gid);
+ RaftServer.Division division = ratisServer.getServer().getDivision(getGroupId());
if (division.getInfo().isLeader()) {
long minIndex = Arrays.stream(division.getInfo()
.getFollowerNextIndices()).min().getAsLong();
@@ -1041,7 +1039,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
final Consumer<Throwable> exceptionHandler = e -> {
LOG.error(gid + ": failed to applyTransaction at logIndex " + index
LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
+ " for " + requestProto.getCmdType(), e);
stateMachineHealthy.compareAndSet(true, false);
metrics.incNumApplyTransactionsFails();
@@ -1069,7 +1067,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
"gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, r.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
metrics.incNumApplyTransactionsFails();
// Since the applyTransaction now is completed exceptionally,
@@ -1078,12 +1076,12 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
stateMachineHealthy.compareAndSet(true, false);
- ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+ ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", gid, r.getCmdType(), index,
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
@@ -1161,25 +1159,25 @@ public void evictStateMachineCache() {

@Override
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
- ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
+ ratisServer.handleFollowerSlowness(getGroupId(), roleInfoProto, follower);
}

@Override
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
- ratisServer.handleNoLeader(gid, roleInfoProto);
+ ratisServer.handleNoLeader(getGroupId(), roleInfoProto);
}

@Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
LOG.error("{}: {} {}", getGroupId(), TermIndex.valueOf(failedEntry),
toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), t);
- ratisServer.handleNodeLogFailure(gid, t);
+ ratisServer.handleNodeLogFailure(getGroupId(), t);
}

@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
- ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
+ ratisServer.handleInstallSnapshotFromLeader(getGroupId(), roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
@@ -1188,15 +1186,15 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(

@Override
public void notifyGroupRemove() {
- ratisServer.notifyGroupRemove(gid);
+ ratisServer.notifyGroupRemove(getGroupId());
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
for (Long cid : container2BCSIDMap.keySet()) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid,
"Ratis group removed. Group id: " + gid);
"Ratis group removed. Group id: " + getGroupId());
} catch (IOException e) {
LOG.debug("Failed to quasi-close container {}", cid);
}
@@ -1218,7 +1216,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,

@Override
public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
- return smProtoToString(gid, containerController, proto);
+ return smProtoToString(getGroupId(), containerController, proto);
}

public static String smProtoToString(RaftGroupId gid,