Skip to content

Commit

Permalink
HDDS-11548. Add some logging to the StateMachine (apache#7291)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghuazhu authored Oct 11, 2024
1 parent 523c860 commit e2f2aeb
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public void initialize(
storage.init(raftStorage);
ratisServer.notifyGroupAdd(gid);

LOG.info("{}: initialize {}", server.getId(), id);
loadSnapshot(storage.getLatestSnapshot());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -41,6 +43,8 @@
* operation in DB.
*/
public class SCMHADBTransactionBufferImpl implements SCMHADBTransactionBuffer {

public static final Logger LOG = LoggerFactory.getLogger(SCMHADBTransactionBufferImpl.class);
private final StorageContainerManager scm;
private SCMMetadataStore metadataStore;
private BatchOperation currentBatchOperation;
Expand Down Expand Up @@ -107,6 +111,8 @@ public SnapshotInfo getLatestSnapshot() {

@Override
public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
LOG.info("{}: Set latest Snapshot to {}",
scm.getScmHAManager().getRatisServer().getDivision().getId(), latestSnapshot);
this.latestSnapshot.set(latestSnapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void initialize(RaftServer server, RaftGroupId id,
getLifeCycle().startAndTransition(() -> {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
LOG.info("{}: initialize {}", server.getId(), id);
});
}

Expand All @@ -149,6 +150,9 @@ public CompletableFuture<Message> applyTransaction(
final SCMRatisRequest request = SCMRatisRequest.decode(
Message.valueOf(trx.getStateMachineLogEntry().getLogData()));

if (LOG.isDebugEnabled()) {
LOG.debug("{}: applyTransaction {}", getId(), TermIndex.valueOf(trx.getLogEntry()));
}
try {
applyTransactionFuture.complete(process(request));
} catch (SCMException ex) {
Expand Down Expand Up @@ -389,6 +393,7 @@ public void notifyConfigurationChanged(long term, long index,
@Override
public void pause() {
final LifeCycle lc = getLifeCycle();
LOG.info("{}: Try to pause from current LifeCycle state {}", getId(), lc);
if (lc.getCurrentState() != LifeCycle.State.NEW) {
lc.transition(LifeCycle.State.PAUSING);
lc.transition(LifeCycle.State.PAUSED);
Expand All @@ -414,6 +419,8 @@ public void reinitialize() throws IOException {
throw new IOException(e);
}

LOG.info("{}: SCMStateMachine is reinitializing. newTermIndex = {}", getId(), termIndex);

// re-initialize the DBTransactionBuffer and update the lastAppliedIndex.
try {
transactionBuffer.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.hadoop.ozone.protocolPB.RequestHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
Expand Down Expand Up @@ -88,7 +89,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
new SimpleStateMachineStorage();
private final OzoneManager ozoneManager;
private RequestHandler handler;
private RaftGroupId raftGroupId;
private volatile OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
Expand Down Expand Up @@ -134,17 +134,18 @@ public void initialize(RaftServer server, RaftGroupId id,
RaftStorage raftStorage) throws IOException {
getLifeCycle().startAndTransition(() -> {
super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage);
LOG.info("{}: initialize {} with {}", getId(), id, getLastAppliedTermIndex());
});
}

@Override
public synchronized void reinitialize() throws IOException {
loadSnapshotInfoFromDB();
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
unpause(getLastAppliedTermIndex().getIndex(),
getLastAppliedTermIndex().getTerm());
final TermIndex lastApplied = getLastAppliedTermIndex();
unpause(lastApplied.getIndex(), lastApplied.getTerm());
LOG.info("{}: reinitialize {} with {}", getId(), getGroupId(), lastApplied);
}
}

Expand All @@ -160,6 +161,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId newLeaderId) {
// Initialize OMHAMetrics
ozoneManager.omHAMetricsInit(newLeaderId.toString());
LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId);
}

/** Notified by Ratis for non-StateMachine term-index update. */
Expand Down Expand Up @@ -263,7 +265,7 @@ public TransactionContext startTransaction(
messageContent);

Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(
raftGroupId));
getGroupId()));
try {
handler.validateRequest(omRequest);
} catch (IOException ioe) {
Expand Down Expand Up @@ -293,6 +295,10 @@ public TransactionContext preAppendTransaction(TransactionContext trx)

OzoneManagerPrepareState prepareState = ozoneManager.getPrepareState();

if (LOG.isDebugEnabled()) {
LOG.debug("{}: preAppendTransaction {}", getId(), TermIndex.valueOf(trx.getLogEntry()));
}

if (cmdType == OzoneManagerProtocolProtos.Type.Prepare) {
// Must authenticate prepare requests here, since we must determine
// whether or not to apply the prepare gate before proceeding with the
Expand All @@ -303,8 +309,7 @@ public TransactionContext preAppendTransaction(TransactionContext trx)
if (ozoneManager.getAclsEnabled()
&& !ozoneManager.isAdmin(userGroupInformation)) {
String message = "Access denied for user " + userGroupInformation
+ ". "
+ "Superuser privilege is required to prepare ozone managers.";
+ ". Superuser privilege is required to prepare upgrade/downgrade.";
OMException cause =
new OMException(message, OMException.ResultCodes.ACCESS_DENIED);
// Leader should not step down because of this failure.
Expand Down Expand Up @@ -341,6 +346,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
: OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
final TermIndex termIndex = TermIndex.valueOf(trx.getLogEntry());
LOG.debug("{}: applyTransaction {}", getId(), termIndex);
// In the current approach we have one single global thread executor.
// with single thread. Right now this is being done for correctness, as
// applyTransaction will be run on multiple OM's we want to execute the
Expand Down Expand Up @@ -427,12 +433,14 @@ public synchronized void pause() {
*/
public synchronized void unpause(long newLastAppliedSnaphsotIndex,
long newLastAppliedSnapShotTermIndex) {
LOG.info("OzoneManagerStateMachine is un-pausing");
if (statePausedCount.decrementAndGet() == 0) {
getLifeCycle().startAndTransition(() -> {
this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
this.setLastAppliedTermIndex(TermIndex.valueOf(
newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
LOG.info("{}: OzoneManagerStateMachine un-pause completed. " +
"newLastAppliedSnaphsotIndex: {}, newLastAppliedSnapShotTermIndex: {}",
getId(), newLastAppliedSnaphsotIndex, newLastAppliedSnapShotTermIndex);
});
}
}
Expand Down Expand Up @@ -482,15 +490,15 @@ private synchronized long takeSnapshotImpl() throws IOException {
final TermIndex applied = getLastAppliedTermIndex();
final TermIndex notified = getLastNotifiedTermIndex();
final TermIndex snapshot = applied.compareTo(notified) > 0 ? applied : notified;
LOG.info(" applied = {}", applied);
LOG.info(" skipped = {}", lastSkippedIndex);
LOG.info("notified = {}", notified);
LOG.info("snapshot = {}", snapshot);

long startTime = Time.monotonicNow();
final TransactionInfo transactionInfo = TransactionInfo.valueOf(snapshot);
ozoneManager.setTransactionInfo(transactionInfo);
ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY, transactionInfo);
ozoneManager.getMetadataManager().getStore().flushDB();
LOG.info("{}: taking snapshot. applied = {}, skipped = {}, " +
"notified = {}, current snapshot index = {}, took {} ms",
getId(), applied, lastSkippedIndex, notified, snapshot, Time.monotonicNow() - startTime);
return snapshot.getIndex();
}

Expand Down

0 comments on commit e2f2aeb

Please sign in to comment.