Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iterator):Auto commit mode for applying log iterator #962

Merged
merged 7 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Iterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
*/
public interface Iterator extends java.util.Iterator<ByteBuffer> {

/**
* When calling #{Iterator#next}, whether to commit the state machine. Don't by default.
* @param status enables auto-commit mode or not, true means enable.
*/
void setAutoCommitPerLog(boolean status);

/**
* Return the data whose content is the same as what was passed to
* Node#apply(Task) in the leader node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ public class IteratorImpl {
private final long firstClosureIndex;
private long currentIndex;
private final long committedIndex;
private long fsmCommittedIndex; // fsm commit index
private LogEntry currEntry = new LogEntry(); // blank entry
private long fsmCommittedIndex; // fsm commit index
private LogEntry currEntry = new LogEntry(); // blank entry
private final AtomicLong applyingIndex;
private RaftException error;
private boolean autoCommitPerLog = false; // Default enabled

public IteratorImpl(final FSMCallerImpl fsmCaller, final LogManager logManager, final List<Closure> closures,
final long firstClosureIndex, final long lastAppliedIndex, final long committedIndex,
Expand Down Expand Up @@ -90,6 +91,14 @@ public boolean hasError() {
return this.error != null;
}

public void setAutoCommitPerLog(boolean status) {
this.autoCommitPerLog = status;
}

public boolean getAutoCommitPerLog() {
return this.autoCommitPerLog;
}

/**
* Move to next
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,22 @@ public boolean hasNext() {

@Override
public ByteBuffer next() {
// commit log if auto-commit mode is enabled and no errors occur before accessing the next log
if (impl.getAutoCommitPerLog() && !impl.hasError()) {
commit();
}
final ByteBuffer data = getData();
if (hasNext()) {
this.impl.next();
}
return data;
}

@Override
public void setAutoCommitPerLog(boolean status) {
impl.setAutoCommitPerLog(status);
}

@Override
public ByteBuffer getData() {
final LogEntry entry = this.impl.entry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,31 @@ public void testSetErrorAndRollback() {
iterImpl.getError().getStatus().getErrorMsg());
assertEquals(6, iter.getIndex());
}

@Test
public void testAutoCommitPerLog() {
iter.setAutoCommitPerLog(true);
int i = 1;
while (iter.hasNext()) {
assertEquals(i, iter.getIndex());
assertNotNull(iter.done());
assertEquals(i, iter.getIndex());
assertEquals(1, iter.getTerm());
assertEquals(i, iter.getData().remaining());
iter.next();
i++;
}
assertFalse(iter.commit());
assertEquals(i, 11);
assertFalse(iterImpl.hasError());
this.iter.setErrorAndRollback(10, new Status(-1, "test"));
assertTrue(iterImpl.hasError());
Assert.assertEquals(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, iterImpl.getError().getType());
Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iterImpl.getError().getStatus().getCode());
Assert
.assertEquals(
"StateMachine meet critical error when applying one or more tasks since index=11, Status[UNKNOWN<-1>: test]",
iterImpl.getError().getStatus().getErrorMsg());
assertEquals(11, iter.getIndex());
}
}