Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't block heartbeat response when fsm is busy #969

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ interface LastAppliedLogIndexListener {
*/
boolean onCommitted(final long committedIndex);

/**
 * Determines whether the given {@code requiredCapacity} amount of space is
 * available to submit new tasks to the fsm.
 *
 * @param requiredCapacity the amount of space needed for the tasks to be submitted
 * @return {@code true} when that amount of space is available
 */
public boolean hasAvailableCapacity(final int requiredCapacity);

/**
* Called when loading snapshot.
*
Expand Down
20 changes: 14 additions & 6 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer {
private long lastCommittedIndex = 0;
private long pendingIndex;
private final SegmentList<Ballot> pendingMetaQueue = new SegmentList<>(false);
private BallotBoxOptions opts;

@OnlyForTest
long getPendingIndex() {
Expand Down Expand Up @@ -84,6 +85,7 @@ public boolean init(final BallotBoxOptions opts) {
LOG.error("waiter or closure queue is null.");
return false;
}
this.opts = opts;
this.waiter = opts.getWaiter();
this.closureQueue = opts.getClosureQueue();
return true;
Expand Down Expand Up @@ -128,7 +130,8 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final
// removal request, we think it's safe to commit all the uncommitted
// previous logs, which is not well proved right now
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
LOG.debug("Node {} committed log fromIndex={}, toIndex={}.", this.opts.getNodeId(), this.pendingIndex,
lastCommittedIndex);
this.pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex = lastCommittedIndex;
} finally {
Expand Down Expand Up @@ -168,13 +171,13 @@ public boolean resetPendingIndex(final long newPendingIndex) {
final long stamp = this.stampedLock.writeLock();
try {
if (!(this.pendingIndex == 0 && this.pendingMetaQueue.isEmpty())) {
LOG.error("resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}.", this.pendingIndex,
this.pendingMetaQueue.size());
LOG.error("Node {} resetPendingIndex fail, pendingIndex={}, pendingMetaQueueSize={}.",
this.opts.getNodeId(), this.pendingIndex, this.pendingMetaQueue.size());
return false;
}
if (newPendingIndex <= this.lastCommittedIndex) {
LOG.error("resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}.", newPendingIndex,
this.lastCommittedIndex);
LOG.error("Node {} resetPendingIndex fail, newPendingIndex={}, lastCommittedIndex={}.",
this.opts.getNodeId(), newPendingIndex, this.lastCommittedIndex);
return false;
}
this.pendingIndex = newPendingIndex;
Expand Down Expand Up @@ -203,7 +206,7 @@ public boolean appendPendingTask(final Configuration conf, final Configuration o
final long stamp = this.stampedLock.writeLock();
try {
if (this.pendingIndex <= 0) {
LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
LOG.error("Node {} fail to appendingTask, pendingIndex={}.", this.opts.getNodeId(), this.pendingIndex);
return false;
}
this.pendingMetaQueue.add(bl);
Expand Down Expand Up @@ -231,6 +234,11 @@ public boolean setLastCommittedIndex(final long lastCommittedIndex) {
lastCommittedIndex);
return false;
}
if (!this.waiter.hasAvailableCapacity(1)) {
LOG.warn("Node {} fsm is busy, can't set lastCommittedIndex to be {}, lastCommittedIndex={}.",
this.opts.getNodeId(), lastCommittedIndex, this.lastCommittedIndex);
return false;
}
if (lastCommittedIndex < this.lastCommittedIndex) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
return true;
}

/**
 * Reports whether the task queue can accept {@code requiredCapacity} more tasks.
 * Always answers {@code false} once {@code shutdownLatch} has been set
 * (presumably this marks a shutdown in progress; confirm against the shutdown path).
 */
@Override
public boolean hasAvailableCapacity(int requiredCapacity) {
    // Short-circuit: the queue is consulted only while no shutdown latch exists.
    final boolean latchPresent = this.shutdownLatch != null;
    return !latchPresent && this.taskQueue.hasAvailableCapacity(requiredCapacity);
}

@Override
public boolean onCommitted(final long committedIndex) {
return enqueueTask((task, sequence) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ protected int adjustTimeout(final int timeoutMs) {
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
ballotBoxOpts.setNodeId(getNodeId());
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private void handleReadIndex(final ReadOnlyOption option, final List<ReadIndexEv
final List<ReadIndexState> states = events.stream()
.filter(it -> option.equals(it.readOnlyOptions))
.map(it -> {
rb.addEntries(ZeroByteStringHelper.wrap(it.requestContext.get()));
byte [] bytes = it.requestContext.get();
rb.addEntries(ZeroByteStringHelper.wrap(bytes == null? new byte[0]: bytes));
return new ReadIndexState(it.requestContext, it.done, it.startTime);
})
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.jraft.FSMCaller;
import com.alipay.sofa.jraft.closure.ClosureQueue;
import com.alipay.sofa.jraft.entity.NodeId;

/**
* Ballot box options.
Expand All @@ -30,6 +31,15 @@ public class BallotBoxOptions {

private FSMCaller waiter;
private ClosureQueue closureQueue;
private NodeId nodeId;

/**
 * Returns the node id held by these options.
 *
 * @return the configured node id, or {@code null} if none has been set
 */
public NodeId getNodeId() {
    return this.nodeId;
}

/**
 * Stores the node id in these options.
 *
 * @param nodeId the node id to keep; returned later by {@link #getNodeId()}
 */
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}

public FSMCaller getWaiter() {
return this.waiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// Describe a specific SnapshotStorage in format ${type}://${parameters}
private String snapshotUri;

// Snapshot temp directory for writing. Default is null (not present), in which case jraft will use a `temp` dir under #{snapshotUri}
private String snapshotTempUri;

// If enable, we will filter duplicate files before copy remote snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ public void testSetLastCommittedIndexLessThan() {

/**
 * Verifies that setting the last committed index on an empty box succeeds,
 * updates the stored index and notifies the waiter exactly once.
 */
@Test
public void testSetLastCommittedIndex() {
    // The box asks the waiter for capacity before committing, so report it as available.
    Mockito.when(this.waiter.hasAvailableCapacity(1)).thenReturn(true);
    assertEquals(0, this.box.getLastCommittedIndex());
    assertTrue(this.box.setLastCommittedIndex(1));
    assertEquals(1, this.box.getLastCommittedIndex());
    // Use times(1), not only(): hasAvailableCapacity(1) is now a second interaction
    // on the waiter mock, which only() would reject.
    Mockito.verify(this.waiter, Mockito.times(1)).onCommitted(1);
}
}