Skip to content

Commit

Permalink
[fix] add read-index timeout scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jun 12, 2020
1 parent 55d215e commit c700ab1
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,65 @@
*/
package com.alipay.sofa.jraft.closure;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.timer.Timeout;
import com.alipay.sofa.jraft.util.timer.Timer;
import com.alipay.sofa.jraft.util.timer.TimerTask;

/**
* Read index closure
*
* @author dennis
*/
public abstract class ReadIndexClosure implements Closure {
private static final Logger LOG = LoggerFactory.getLogger(ReadIndexClosure.class);

private static final Logger LOG = LoggerFactory
.getLogger(ReadIndexClosure.class);

// Atomic updater for the volatile "state" field. run(Status) and TimeoutTask both
// compareAndSet from PENDING through it, so only one of them gets to invoke the callback.
private static final AtomicIntegerFieldUpdater<ReadIndexClosure> STATE_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(
ReadIndexClosure.class,
"state");

// Timeout used by the no-arg constructor, in milliseconds. Read from the
// "jraft.read-index.timeout" system property with 2 * 1000 as the fallback argument.
private static final long DEFAULT_TIMEOUT = SystemPropertyUtil.getInt(
"jraft.read-index.timeout",
2 * 1000);
// Single static timer on which every closure schedules its TimeoutTask.
private static final Timer TIMEOUT_SCANNER = NodeImpl.TIMER_FACTORY
.createTimer("read-index.timeout.scanner");

// Values of the "state" field.
// PENDING: neither a response nor the timeout has claimed the closure yet.
private static final int PENDING = 0;
// COMPLETE: run(Status) claimed the closure.
private static final int COMPLETE = 1;
// TIMEOUT: TimeoutTask claimed the closure.
private static final int TIMEOUT = 2;

/**
* Invalid log index -1.
*/
public static final long INVALID_LOG_INDEX = -1;
private long index = INVALID_LOG_INDEX;
private byte[] requestContext;
public static final long INVALID_LOG_INDEX = -1;

// Log index handed to run(Status, long, byte[]) on completion; starts as INVALID_LOG_INDEX.
private long index = INVALID_LOG_INDEX;
// Request context handed to run(Status, long, byte[]) on completion.
private byte[] requestContext;

// One of PENDING / COMPLETE / TIMEOUT. Must stay a volatile int named "state":
// STATE_UPDATER looks the field up by that name.
private volatile int state;

/**
* Creates a closure whose timeout is {@code DEFAULT_TIMEOUT} milliseconds
* by delegating to {@link #ReadIndexClosure(long)}.
*/
public ReadIndexClosure() {
this(DEFAULT_TIMEOUT);
}

public ReadIndexClosure(long timeoutMs) {
this.state = PENDING;
TIMEOUT_SCANNER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS);
}

/**
* Called when ReadIndex can be executed.
Expand Down Expand Up @@ -79,10 +117,38 @@ public byte[] getRequestContext() {

/**
 * Delivers the read-index outcome to {@code run(Status, long, byte[])} unless the
 * closure has already left the PENDING state, in which case the late response is
 * only logged.
 *
 * @param status outcome of the read-index request
 */
@Override
public void run(final Status status) {
    final boolean claimed = STATE_UPDATER.compareAndSet(this, PENDING, COMPLETE);
    if (claimed) {
        try {
            run(status, this.index, this.requestContext);
        } catch (Throwable t) {
            // User callback failures are logged rather than propagated to the caller.
            LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t);
        }
    } else {
        LOG.warn("A timeout read-index response finally returned: {}.", status);
    }
}

/**
 * Timer task that completes a still-pending closure with an ETIMEDOUT status.
 */
static class TimeoutTask implements TimerTask {

    private final ReadIndexClosure closure;

    TimeoutTask(ReadIndexClosure closure) {
        this.closure = closure;
    }

    /**
     * Moves the closure from PENDING to TIMEOUT and, only if that transition succeeds,
     * invokes its callback with ETIMEDOUT, INVALID_LOG_INDEX and a null request context.
     */
    @Override
    public void run(final Timeout timeout) throws Exception {
        final ReadIndexClosure target = this.closure;
        final boolean expired = STATE_UPDATER.compareAndSet(target, PENDING, TIMEOUT);
        if (expired) {
            final Status status = new Status(RaftError.ETIMEDOUT, "read-index request timeout");
            try {
                target.run(status, INVALID_LOG_INDEX, null);
            } catch (final Throwable t) {
                // Callback failures are logged here rather than propagated to the timer.
                LOG.error("[Timeout] fail to run ReadIndexClosure with status: {}.", status, t);
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public class NodeImpl implements Node, RaftServerService {
}
}

private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader
public final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader
.load(
RaftTimerFactory.class) //
.first();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public void run(final Status status) {
} else {
// Not applied, add it to pending-notify cache.
ReadOnlyServiceImpl.this.pendingNotifyStatus
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
.add(readIndexStatus);
}
} finally {
if (doUnlock) {
Expand Down Expand Up @@ -242,21 +242,21 @@ public boolean init(final ReadOnlyServiceOptions opts) {
this.raftOptions = opts.getRaftOptions();

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent> newInstance() //
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
.setEventFactory(new ReadIndexEventFactory()) //
.setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.setProducerType(ProducerType.MULTI) //
.build();
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.readIndexQueue = this.readIndexDisruptor.start();
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry() //
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
}
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ public Scheduler getRaftScheduler(final boolean shared, final int workerNum, fin
return shared ? SCHEDULER_REF.getRef() : createScheduler(workerNum, name);
}

private static Timer createTimer(final String name) {
/**
 * Builds a new {@code HashedWheelTimer} whose thread factory is created from {@code name}.
 *
 * @param name name passed to the timer's {@code NamedThreadFactory}
 * @return a newly constructed timer (never a shared instance)
 */
@Override
public Timer createTimer(final String name) {
    // NOTE(review): the 'true' flag presumably marks the threads as daemon, and
    // 1 ms / 2048 are presumably tick duration and wheel size - confirm.
    final NamedThreadFactory threadFactory = new NamedThreadFactory(name, true);
    return new HashedWheelTimer(threadFactory, 1, TimeUnit.MILLISECONDS, 2048);
}

private static Scheduler createScheduler(final int workerNum, final String name) {
/**
* Builds a new {@code TimerManager} from the given worker count and name.
* This is the non-shared path that {@code getRaftScheduler} falls back to.
*
* @param workerNum worker count passed to the {@code TimerManager} constructor
* @param name name passed to the {@code TimerManager} constructor
* @return a newly constructed scheduler
*/
@Override
public Scheduler createScheduler(final int workerNum, final String name) {
return new TimerManager(workerNum, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public interface RaftTimerFactory {
Timer getSnapshotTimer(final boolean shared, final String name);

Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name);

/**
* Creates a timer identified by {@code name}.
* NOTE(review): presumably always a fresh, unshared instance - confirm per implementation.
*/
Timer createTimer(final String name);

/**
* Creates a scheduler identified by {@code name} with {@code workerNum} workers.
* NOTE(review): presumably always a fresh, unshared instance - confirm per implementation.
*/
Scheduler createScheduler(final int workerNum, final String name);
}
49 changes: 49 additions & 0 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,55 @@ public void run(final Status status, final long index, final byte[] reqCtx) {
cluster.stopAll();
}

/**
 * Verifies that a read-index closure created with a zero timeout is completed exactly
 * once, either by the real response or by the timeout scanner with ETIMEDOUT.
 *
 * <p>The callback runs on another thread and its callers catch and log every Throwable,
 * so a failure is recorded and rethrown on the test thread. The latch is released in a
 * {@code finally} block and awaited with a bound, so a failing assertion cannot hang the test.
 */
@Test
public void testReadIndexTimeout() throws Exception {
    final List<PeerId> peers = TestUtils.generatePeers(3);

    final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers);
    for (final PeerId peer : peers) {
        assertTrue(cluster.start(peer.getEndpoint(), false, 300, true));
    }

    // elect leader
    cluster.waitLeader();

    // get leader
    final Node leader = cluster.getLeader();
    assertNotNull(leader);
    assertEquals(3, leader.listPeers().size());
    // apply tasks to leader
    sendTestTaskAndWait(leader);

    assertReadIndex(leader, 11);

    // read from follower
    for (final Node follower : cluster.getFollowers()) {
        assertNotNull(follower);
        assertReadIndex(follower, 11);
    }

    // Read with a null request context and a zero timeout: the timeout scanner races
    // the real response, and either outcome is acceptable.
    final CountDownLatch latch = new CountDownLatch(1);
    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
        new java.util.concurrent.atomic.AtomicReference<>();
    final long start = System.currentTimeMillis();
    leader.readIndex(null, new ReadIndexClosure(0) {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            try {
                assertNull(reqCtx);
                if (status.isOk()) {
                    System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms");
                } else {
                    assertEquals(new Status(RaftError.ETIMEDOUT, "read-index request timeout"), status);
                    assertEquals(INVALID_LOG_INDEX, index);
                }
            } catch (final Throwable t) {
                // Hand the failure to the test thread; the caller would only log it.
                failure.set(t);
            } finally {
                latch.countDown();
            }
        }
    });
    assertTrue(latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
    if (failure.get() != null) {
        throw new AssertionError("read-index callback assertion failed", failure.get());
    }

    cluster.stopAll();
}

@Test
public void testReadIndexFromLearner() throws Exception {
final List<PeerId> peers = TestUtils.generatePeers(3);
Expand Down

0 comments on commit c700ab1

Please sign in to comment.