Skip to content

Commit

Permalink
(fix) Iterator#setErrorAndRollback may break ReadIndex promise, #317 (#…
Browse files Browse the repository at this point in the history
…361)

* (fix) Iterator#setErrorAndRollback may break ReadIndex promise, #317

* (feat) Adds comment

* fix hang with CI

* (fix) FSMCallerTest#testOnCommittedError

* feat/release 1.3.0 (#363)

* minor change

* release 1.3.0

* release 1.3.0

* fix hang with ut
  • Loading branch information
killme2008 authored and fengjiachun committed Nov 30, 2019
1 parent 8fdde14 commit f8d84b6
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 45 deletions.
2 changes: 1 addition & 1 deletion jraft-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.2.6</version>
<version>1.3.0</version>
</parent>
<artifactId>jraft-core</artifactId>
<packaging>jar</packaging>
Expand Down
5 changes: 3 additions & 2 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Iterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public interface Iterator extends java.util.Iterator<ByteBuffer> {
* |ntail| tasks (starting from the last iterated one) as not applied. After
* this point, no further changes on the StateMachine as well as the Node
* would be allowed and you should try to repair this replica or just drop it.
*
* If |statInfo| is not NULL, it should describe the detail of the error.
*
* @param ntail the number of tasks (starting from the last iterated one) considered as not to be applied.
* @param st Status to describe the detail of the error.
*/
void setErrorAndRollback(final long ntail, final Status st);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.jraft;

import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;

/**
Expand All @@ -43,4 +44,10 @@ public interface ReadOnlyService extends Lifecycle<ReadOnlyServiceOptions> {
*/
void join() throws InterruptedException;

/**
* Called when the node is turned into error state.
* @param error error with raft info
*/
void setError(final RaftException error);

}
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,10 @@ private void doCommitted(final long committedIndex) {
final long lastIndex = iterImpl.getIndex() - 1;
final long lastTerm = this.logManager.getTerm(lastIndex);
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
this.lastAppliedIndex.set(committedIndex);
this.lastAppliedIndex.set(lastIndex);
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(committedIndex);
notifyLastAppliedIndexUpdated(lastIndex);
} finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2373,6 +2373,9 @@ public void onError(final RaftException error) {
// onError of fsmCaller is guaranteed to be executed once.
this.fsmCaller.onError(error);
}
if (this.readOnlyService != null) {
this.readOnlyService.setError(error);
}
this.writeLock.lock();
try {
// If it is leader, need to wake up a new one;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.alipay.sofa.jraft.entity.ReadIndexState;
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyServiceOptions;
import com.alipay.sofa.jraft.rpc.RpcRequests.ReadIndexRequest;
Expand Down Expand Up @@ -82,6 +83,8 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex

private NodeMetrics nodeMetrics;

private volatile RaftException error;

// <logIndex, statusList>
private final TreeMap<Long, List<ReadIndexStatus>> pendingNotifyStatus = new TreeMap<>();

Expand Down Expand Up @@ -217,6 +220,20 @@ private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

private void resetPendingStatusError(final Status st) {
this.lock.lock();
try {
for (final List<ReadIndexStatus> statuses : this.pendingNotifyStatus.values()) {
for (final ReadIndexStatus status : statuses) {
reportError(status, st);
}
}
this.pendingNotifyStatus.clear();
} finally {
this.lock.unlock();
}
}

@Override
public boolean init(final ReadOnlyServiceOptions opts) {
this.node = opts.getNode();
Expand Down Expand Up @@ -250,6 +267,13 @@ public boolean init(final ReadOnlyServiceOptions opts) {
return true;
}

@Override
public synchronized void setError(final RaftException error) {
if (this.error == null) {
this.error = error;
}
}

@Override
public synchronized void shutdown() {
if (this.shutdownLatch != null) {
Expand All @@ -267,6 +291,7 @@ public void join() throws InterruptedException {
this.shutdownLatch.await();
}
this.readIndexDisruptor.shutdown();
resetPendingStatusError(new Status(RaftError.ESTOP, "Node is quit."));
this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -331,6 +356,16 @@ public void onApplied(final long appliedIndex) {
}

}

/*
* Remaining pending statuses are notified by error if it is presented.
* When the node is in error state, consider following situations:
* 1. If commitIndex > appliedIndex, then all pending statuses should be notified by error status.
* 2. When commitIndex == appliedIndex, there will be no more pending statuses.
*/
if (this.error != null) {
resetPendingStatusError(this.error.getStatus());
}
} finally {
this.lock.unlock();
if (pendingStatuses != null && !pendingStatuses.isEmpty()) {
Expand All @@ -356,6 +391,20 @@ TreeMap<Long, List<ReadIndexStatus>> getPendingNotifyStatus() {
return this.pendingNotifyStatus;
}

private void reportError(final ReadIndexStatus status, final Status st) {
final long nowMs = Utils.monotonicMs();
final List<ReadIndexState> states = status.getStates();
final int taskCount = states.size();
for (int i = 0; i < taskCount; i++) {
final ReadIndexState task = states.get(i);
final ReadIndexClosure done = task.getDone();
if (done != null) {
this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
done.run(st);
}
}
}

private void notifySuccess(final ReadIndexStatus status) {
final long nowMs = Utils.monotonicMs();
final List<ReadIndexState> states = status.getStates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ boolean prepareEntry(final long nextSendingIndex, final int offset, final RaftOu
}
emb.setTerm(entry.getId().getTerm());
if (entry.hasChecksum()) {
emb.setChecksum(entry.getChecksum()); //since 1.2.6
emb.setChecksum(entry.getChecksum()); // since 1.2.6
}
emb.setType(entry.getType());
if (entry.getPeers() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
*/
public class ReadIndexStatus {

private final ReadIndexRequest request; //raw request
private final List<ReadIndexState> states; //read index requests in batch.
private final long index; //committed log index.
private final ReadIndexRequest request; // raw request
private final List<ReadIndexState> states; // read index requests in batch.
private final long index; // committed log index.

public ReadIndexStatus(List<ReadIndexState> states, ReadIndexRequest request, long index) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.jraft.core;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;

import org.junit.After;
Expand Down Expand Up @@ -47,10 +51,6 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.test.TestUtils;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(value = MockitoJUnitRunner.class)
public class FSMCallerTest {
private FSMCallerImpl fsmCaller;
Expand All @@ -67,9 +67,9 @@ public void setup() {
this.fsmCaller = new FSMCallerImpl();
this.closureQueue = new ClosureQueueImpl();
final FSMCallerOptions opts = new FSMCallerOptions();
Mockito.when(node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
opts.setNode(node);
opts.setFsm(fsm);
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
opts.setNode(this.node);
opts.setFsm(this.fsm);
opts.setLogManager(this.logManager);
opts.setBootstrapId(new LogId(10, 1));
opts.setClosureQueue(this.closureQueue);
Expand All @@ -93,13 +93,13 @@ public void testShutdownJoin() throws Exception {

@Test
public void testOnCommittedError() throws Exception {
Mockito.when(logManager.getTerm(10)).thenReturn(1L);
Mockito.when(logManager.getEntry(11)).thenReturn(null);
Mockito.when(this.logManager.getTerm(10)).thenReturn(1L);
Mockito.when(this.logManager.getEntry(11)).thenReturn(null);

assertTrue(this.fsmCaller.onCommitted(11));

this.fsmCaller.flush();
assertEquals(this.fsmCaller.getLastAppliedIndex(), 11);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
Mockito.verify(this.logManager).setAppliedId(new LogId(10, 1));
assertFalse(this.fsmCaller.getError().getStatus().isOk());
assertEquals("Fail to get entry at index=11 while committed_index=11", this.fsmCaller.getError().getStatus()
Expand All @@ -111,8 +111,8 @@ public void testOnCommitted() throws Exception {
final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
log.getId().setIndex(11);
log.getId().setTerm(1);
Mockito.when(logManager.getTerm(11)).thenReturn(1L);
Mockito.when(logManager.getEntry(11)).thenReturn(log);
Mockito.when(this.logManager.getTerm(11)).thenReturn(1L);
Mockito.when(this.logManager.getEntry(11)).thenReturn(log);
final ArgumentCaptor<Iterator> itArg = ArgumentCaptor.forClass(Iterator.class);

assertTrue(this.fsmCaller.onCommitted(11));
Expand All @@ -138,7 +138,7 @@ public void testOnSnapshotLoad() throws Exception {
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {

@Override
public void run(Status status) {
public void run(final Status status) {
assertTrue(status.isOk());
latch.countDown();
}
Expand All @@ -164,7 +164,7 @@ public void testOnSnapshotLoadFSMError() throws Exception {
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {

@Override
public void run(Status status) {
public void run(final Status status) {
assertFalse(status.isOk());
assertEquals(-1, status.getCode());
assertEquals("StateMachine onSnapshotLoad failed", status.getErrorMsg());
Expand All @@ -186,14 +186,14 @@ public void testOnSnapshotSaveEmptyConf() throws Exception {
this.fsmCaller.onSnapshotSave(new SaveSnapshotClosure() {

@Override
public void run(Status status) {
public void run(final Status status) {
assertFalse(status.isOk());
assertEquals("Empty conf entry for lastAppliedIndex=10", status.getErrorMsg());
latch.countDown();
}

@Override
public SnapshotWriter start(SnapshotMeta meta) {
public SnapshotWriter start(final SnapshotMeta meta) {
// TODO Auto-generated method stub
return null;
}
Expand All @@ -209,12 +209,12 @@ public void testOnSnapshotSave() throws Exception {
final SaveSnapshotClosure done = new SaveSnapshotClosure() {

@Override
public void run(Status status) {
public void run(final Status status) {

}

@Override
public SnapshotWriter start(SnapshotMeta meta) {
public SnapshotWriter start(final SnapshotMeta meta) {
assertEquals(10, meta.getLastIncludedIndex());
return writer;
}
Expand Down Expand Up @@ -269,7 +269,7 @@ public void testOnSnapshotLoadStale() throws Exception {
this.fsmCaller.onSnapshotLoad(new LoadSnapshotClosure() {

@Override
public void run(Status status) {
public void run(final Status status) {
assertFalse(status.isOk());
assertEquals(RaftError.ESTALE, status.getRaftError());
latch.countDown();
Expand Down
Loading

0 comments on commit f8d84b6

Please sign in to comment.