Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/some changes #358

Merged
merged 7 commits into from
Nov 28, 2019
Merged
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,5 @@ cache:
install:
- mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
- sh ./tools/check_format.sh

script:
- travis_retry mvn --projects $TESTFOLDER test

- MAVEN_SKIP_RC=true MAVEN_OPTS="-Xmx1g -server -XX:+UseG1GC" travis_retry mvn --projects $TESTFOLDER test
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.conf.ConfigurationEntry;
import com.alipay.sofa.jraft.entity.EnumOutter;
import com.alipay.sofa.jraft.entity.EnumOutter.ErrorType;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.LogEntry;
import com.alipay.sofa.jraft.entity.LogId;
Expand Down Expand Up @@ -231,7 +232,11 @@ private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
this.taskQueue.publishEvent(tpl);
if (!this.taskQueue.tryPublishEvent(tpl)) {
onError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
"FSMCaller is overload.")));
return false;
}
return true;
}

Expand Down
49 changes: 43 additions & 6 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ private void decayTargetPriority() {
/**
* Check and set configuration for node.At the same time, if configuration is changed,
* then compute and update the target priority value.
*
*
* @param inLock whether the writeLock has already been locked in other place.
*
*/
Expand Down Expand Up @@ -905,10 +905,27 @@ protected void onTrigger() {
this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer-" + suffix,
this.options.getSnapshotIntervalSecs() * 1000) {

private volatile boolean firstSchedule = true;

@Override
protected void onTrigger() {
handleSnapshotTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
if (!this.firstSchedule) {
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
return timeoutMs;
}

// Randomize the first snapshot trigger timeout
this.firstSchedule = false;
if (timeoutMs > 0) {
return ThreadLocalRandom.current().nextInt(timeoutMs) + 1;
} else {
return timeoutMs;
}
}
};

this.configManager = new ConfigurationManager();
Expand Down Expand Up @@ -1047,6 +1064,13 @@ protected void onTrigger() {
return true;
}

@OnlyForTest
void tryElectSelf() {
this.writeLock.lock();
// unlock in electSelf
electSelf();
}

// should be in writeLock
private void electSelf() {
long oldTerm;
Expand Down Expand Up @@ -2554,11 +2578,23 @@ private void preVote() {

private void handleVoteTimeout() {
this.writeLock.lock();
if (this.state == State.STATE_CANDIDATE) {
LOG.debug("Node {} term {} retry elect.", getNodeId(), this.currTerm);
electSelf();
} else {
if (this.state != State.STATE_CANDIDATE) {
this.writeLock.unlock();
return;
}

if (this.raftOptions.isStepDownWhenVoteTimedout()) {
LOG.warn(
"Candidate node {} term {} steps down when election reaching vote timeout: fail to get quorum vote-granted.",
this.nodeId, this.currTerm);
stepDown(this.currTerm, false, new Status(RaftError.ETIMEDOUT,
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
"Vote timeout: fail to get quorum vote-granted."));
// unlock in preVote
preVote();
} else {
LOG.debug("Node {} term {} retry to vote self.", getNodeId(), this.currTerm);
// unlock in electSelf
electSelf();
}
}

Expand Down Expand Up @@ -2613,7 +2649,8 @@ public void shutdown(final Closure done) {
if (this.applyQueue != null) {
final CountDownLatch latch = new CountDownLatch(1);
this.shutdownLatch = latch;
Utils.runInThread(() -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
Utils.runInThread(
() -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
} else {
final int num = GLOBAL_NUM_NODES.decrementAndGet();
LOG.info("The number of active nodes decrement to {}.", num);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ public void run(final Status status) {
} else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
}
} finally {
if (doUnlock) {
Expand Down Expand Up @@ -225,32 +225,28 @@ public boolean init(final ReadOnlyServiceOptions opts) {
this.raftOptions = opts.getRaftOptions();

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
this.readIndexQueue = this.readIndexDisruptor.start();
if(this.nodeMetrics.getMetricRegistry() != null) {
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry() //
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
}
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);

// start scanner
this.scheduledExecutorService.scheduleAtFixedRate(
() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(),
this.raftOptions.getMaxElectionDelayMs(),
TimeUnit.MILLISECONDS
);
this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
}

Expand All @@ -260,7 +256,9 @@ public synchronized void shutdown() {
return;
}
this.shutdownLatch = new CountDownLatch(1);
this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch);
Utils.runInThread(() -> {
this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch);
});
this.scheduledExecutorService.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ public class RaftOptions implements Copiable<RaftOptions> {
* in that case.
*/
private ReadOnlyOption readOnlyOptions = ReadOnlyOption.ReadOnlySafe;
/**
* Candidate steps down when election reaching timeout, default is true(enabled).
* @since 1.3.0
*/
private boolean stepDownWhenVoteTimedout = true;
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

public boolean isStepDownWhenVoteTimedout() {
return this.stepDownWhenVoteTimedout;
}

public void setStepDownWhenVoteTimedout(final boolean stepDownWhenVoteTimeout) {
this.stepDownWhenVoteTimedout = stepDownWhenVoteTimeout;
}

public int getDisruptorPublishEventWaitTimeoutSecs() {
return this.disruptorPublishEventWaitTimeoutSecs;
Expand Down Expand Up @@ -210,10 +223,10 @@ public void setSyncMeta(final boolean syncMeta) {
}

public boolean isOpenStatistics() {
return openStatistics;
return this.openStatistics;
}

public void setOpenStatistics(boolean openStatistics) {
public void setOpenStatistics(final boolean openStatistics) {
this.openStatistics = openStatistics;
}

Expand Down Expand Up @@ -242,13 +255,15 @@ public RaftOptions copy() {

@Override
public String toString() {
return "RaftOptions{" + "maxByteCountPerRpc=" + maxByteCountPerRpc + ", fileCheckHole=" + fileCheckHole
+ ", maxEntriesSize=" + maxEntriesSize + ", maxBodySize=" + maxBodySize + ", maxAppendBufferSize="
+ maxAppendBufferSize + ", maxElectionDelayMs=" + maxElectionDelayMs + ", electionHeartbeatFactor="
+ electionHeartbeatFactor + ", applyBatch=" + applyBatch + ", sync=" + sync + ", syncMeta=" + syncMeta
+ ", openStatistics=" + openStatistics + ", replicatorPipeline=" + replicatorPipeline
+ ", maxReplicatorInflightMsgs=" + maxReplicatorInflightMsgs + ", disruptorBufferSize="
+ disruptorBufferSize + ", disruptorPublishEventWaitTimeoutSecs=" + disruptorPublishEventWaitTimeoutSecs
+ ", enableLogEntryChecksum=" + enableLogEntryChecksum + ", readOnlyOptions=" + readOnlyOptions + '}';
return "RaftOptions{" + "maxByteCountPerRpc=" + this.maxByteCountPerRpc + ", fileCheckHole="
+ this.fileCheckHole + ", maxEntriesSize=" + this.maxEntriesSize + ", maxBodySize=" + this.maxBodySize
+ ", maxAppendBufferSize=" + this.maxAppendBufferSize + ", maxElectionDelayMs="
+ this.maxElectionDelayMs + ", electionHeartbeatFactor=" + this.electionHeartbeatFactor
+ ", applyBatch=" + this.applyBatch + ", sync=" + this.sync + ", syncMeta=" + this.syncMeta
+ ", openStatistics=" + this.openStatistics + ", replicatorPipeline=" + this.replicatorPipeline
+ ", maxReplicatorInflightMsgs=" + this.maxReplicatorInflightMsgs + ", disruptorBufferSize="
+ this.disruptorBufferSize + ", disruptorPublishEventWaitTimeoutSecs="
+ this.disruptorPublishEventWaitTimeoutSecs + ", enableLogEntryChecksum=" + this.enableLogEntryChecksum
+ ", readOnlyOptions=" + this.readOnlyOptions + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,14 @@ private void offerEvent(final StableClosure done, final EventType type) {
Utils.runClosureInThread(done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return;
}
this.diskQueue.publishEvent((event, sequence) -> {
if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
event.reset();
event.type = type;
event.done = done;
});
})) {
reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Log manager is overload."));
}
}

private boolean tryOfferEvent(final StableClosure done, final EventTranslator<StableClosureEvent> translator) {
Expand Down Expand Up @@ -752,14 +755,14 @@ public long getTerm(final long index) {
}
this.readLock.lock();
try {
// out of range, direct return NULL
if (index > this.lastLogIndex) {
return 0;
}
// check index equal snapshot_index, return snapshot_term
if (index == this.lastSnapshotId.getIndex()) {
return this.lastSnapshotId.getTerm();
}
// out of range, direct return 0
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();
Expand Down Expand Up @@ -832,13 +835,14 @@ private long unsafeGetTerm(final long index) {
if (index == 0) {
return 0;
}
if (index > this.lastLogIndex) {
return 0;
}

final LogId lss = this.lastSnapshotId;
if (index == lss.getIndex()) {
return lss.getTerm();
}
if (index > this.lastLogIndex || index < this.firstLogIndex) {
return 0;
}
final LogEntry entry = getEntryFromMemory(index);
if (entry != null) {
return entry.getId().getTerm();
Expand Down
Loading