Skip to content

Commit

Permalink
feat: add methods to Node for getting last log,last committed log and…
Browse files Browse the repository at this point in the history
… last applied log index #879
  • Loading branch information
killme2008 committed Sep 17, 2022
1 parent 41421c1 commit 2bbb1b5
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 0 deletions.
6 changes: 6 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ interface LastAppliedLogIndexListener {
*/
long getLastAppliedIndex();

/**
* Returns the last log entry that was committed in raft group.
* @return
*/
long getLastCommittedIndex();

/**
* Called after shutdown to wait it terminates.
*
Expand Down
21 changes: 21 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,25 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* @since 1.3.8
*/
State getNodeState();

/**
* Retrieve the last log index in log storage. Note: the last log may not be committed to raft group, and may be truncated in some cases.
* @return the last log index
* @since 1.3.12
*/
long getLastLogIndex();

/**
* Retrieve the last log index that committed to the raft group.
* @return the last committed log index
* @since 1.3.12
*/
long getLastCommittedIndex();

/**
* Retrieve the last log index that applied to state machine.
* @return the last applied log index
* @since 1.3.12
*/
long getLastAppliedLogIndex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ private void setFsmThread() {
private StateMachine fsm;
private ClosureQueue closureQueue;
private final AtomicLong lastAppliedIndex;
private final AtomicLong lastCommittedIndex;
private long lastAppliedTerm;
private Closure afterShutdown;
private NodeImpl node;
Expand All @@ -180,6 +181,7 @@ public FSMCallerImpl() {
this.currTask = TaskType.IDLE;
this.lastAppliedIndex = new AtomicLong(0);
this.applyingIndex = new AtomicLong(0);
this.lastCommittedIndex = new AtomicLong(0);
}

@SuppressWarnings("unchecked")
Expand All @@ -191,6 +193,7 @@ public boolean init(final FSMCallerOptions opts) {
this.afterShutdown = opts.getAfterShutdown();
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.lastCommittedIndex.set(opts.getBootstrapId().getIndex());
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
Expand Down Expand Up @@ -357,6 +360,11 @@ public boolean onError(final RaftException error) {
});
}

@Override
public long getLastCommittedIndex() {
return lastCommittedIndex.get();
}

@Override
public long getLastAppliedIndex() {
return this.lastAppliedIndex.get();
Expand Down Expand Up @@ -509,6 +517,7 @@ private void doCommitted(final long committedIndex) {
if (lastAppliedIndex >= committedIndex) {
return;
}
this.lastCommittedIndex.set(committedIndex);
final long startMs = Utils.monotonicMs();
try {
final List<Closure> closures = new ArrayList<>();
Expand Down Expand Up @@ -707,6 +716,7 @@ private void doSnapshotLoad(final LoadSnapshotClosure done) {
}
this.fsm.onConfigurationCommitted(conf);
}
this.lastCommittedIndex.set(meta.getLastIncludedIndex());
this.lastAppliedIndex.set(meta.getLastIncludedIndex());
this.lastAppliedTerm = meta.getLastIncludedTerm();
done.run(Status.OK());
Expand Down
39 changes: 39 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3458,6 +3458,45 @@ public State getNodeState() {
return this.state;
}

@Override
public long getLastLogIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.logManager.getLastLogIndex();
}
throw new IllegalStateException("The node is not actived");
} finally {
this.readLock.unlock();
}
}

@Override
public long getLastCommittedIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.fsmCaller.getLastCommittedIndex();
}
throw new IllegalStateException("The node is not actived");
} finally {
this.readLock.unlock();
}
}

@Override
public long getLastAppliedLogIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.fsmCaller.getLastAppliedIndex();
}
throw new IllegalStateException("The node is not actived");
} finally {
this.readLock.unlock();
}
}

@Override
public void describe(final Printer out) {
// node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void testOnCommittedError() throws Exception {
assertTrue(this.fsmCaller.onCommitted(11));

this.fsmCaller.flush();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 11);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
Mockito.verify(this.logManager).setAppliedId(new LogId(10, 1));
assertFalse(this.fsmCaller.getError().getStatus().isOk());
Expand All @@ -122,6 +123,7 @@ public void testOnCommitted() throws Exception {
assertTrue(this.fsmCaller.onCommitted(11));

this.fsmCaller.flush();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 11);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 11);
Mockito.verify(this.fsm).onApply(itArg.capture());
final Iterator it = itArg.getValue();
Expand Down Expand Up @@ -153,6 +155,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 12);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 12);
Mockito.verify(this.fsm).onConfigurationCommitted(Mockito.any());
}
Expand Down Expand Up @@ -181,6 +184,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 10);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
}

Expand Down Expand Up @@ -316,6 +320,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 10);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
}

Expand Down
18 changes: 18 additions & 0 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,19 @@ public void testTripleNodes() throws Exception {

// get leader
final Node leader = cluster.getLeader();
assertEquals(1, leader.getLastAppliedLogIndex());
assertEquals(1, leader.getLastCommittedIndex());
assertEquals(1, leader.getLastLogIndex());
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
this.sendTestTaskAndWait(leader);

assertEquals(11, leader.getLastCommittedIndex());
assertEquals(11, leader.getLastLogIndex());
Thread.sleep(500);
assertEquals(11, leader.getLastAppliedLogIndex());

{
final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
final Task task = new Task(data, null);
Expand Down Expand Up @@ -633,8 +641,18 @@ public void onCommitted() {
assertEquals("apply", cbs.get(1));
}

assertEquals(13, leader.getLastCommittedIndex());
assertEquals(13, leader.getLastLogIndex());
Thread.sleep(500);
assertEquals(13, leader.getLastAppliedLogIndex());

cluster.ensureSame(-1);
assertEquals(2, cluster.getFollowers().size());
for (Node follower : cluster.getFollowers()) {
assertEquals(13, follower.getLastCommittedIndex());
assertEquals(13, follower.getLastLogIndex());
assertEquals(13, follower.getLastAppliedLogIndex());
}
cluster.stopAll();
}

Expand Down

0 comments on commit 2bbb1b5

Please sign in to comment.