diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java index cd715ef69..8a7dba16b 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java @@ -16,12 +16,21 @@ */ package com.alipay.sofa.jraft.closure; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.NodeImpl; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.timer.Timeout; +import com.alipay.sofa.jraft.util.timer.Timer; +import com.alipay.sofa.jraft.util.timer.TimerTask; /** * Read index closure @@ -29,14 +38,43 @@ * @author dennis */ public abstract class ReadIndexClosure implements Closure { - private static final Logger LOG = LoggerFactory.getLogger(ReadIndexClosure.class); + + private static final Logger LOG = LoggerFactory + .getLogger(ReadIndexClosure.class); + + private static final AtomicIntegerFieldUpdater STATE_UPDATER = AtomicIntegerFieldUpdater + .newUpdater( + ReadIndexClosure.class, + "state"); + + private static final long DEFAULT_TIMEOUT = SystemPropertyUtil.getInt( + "jraft.read-index.timeout", + 2 * 1000); + private static final Timer TIMEOUT_SCANNER = NodeImpl.TIMER_FACTORY + .createTimer("read-index.timeout.scanner"); + + private static final int PENDING = 0; + private static final int COMPLETE = 1; + private static final int TIMEOUT = 2; /** * Invalid log index -1. */ - public static final long INVALID_LOG_INDEX = -1; - private long index = INVALID_LOG_INDEX; - private byte[] requestContext; + public static final long INVALID_LOG_INDEX = -1; + + private long index = INVALID_LOG_INDEX; + private byte[] requestContext; + + private volatile int state; + + public ReadIndexClosure() { + this(DEFAULT_TIMEOUT); + } + + public ReadIndexClosure(long timeoutMs) { + this.state = PENDING; + TIMEOUT_SCANNER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS); + } /** * Called when ReadIndex can be executed. @@ -79,10 +117,38 @@ public byte[] getRequestContext() { @Override public void run(final Status status) { + if (!STATE_UPDATER.compareAndSet(this, PENDING, COMPLETE)) { + LOG.warn("A timeout read-index response finally returned: {}.", status); + return; + } + try { run(status, this.index, this.requestContext); } catch (Throwable t) { LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t); } } + + static class TimeoutTask implements TimerTask { + + private final ReadIndexClosure closure; + + TimeoutTask(ReadIndexClosure closure) { + this.closure = closure; + } + + @Override + public void run(final Timeout timeout) throws Exception { + if (!STATE_UPDATER.compareAndSet(this.closure, PENDING, TIMEOUT)) { + return; + } + + final Status status = new Status(RaftError.ETIMEDOUT, "read-index request timeout"); + try { + this.closure.run(status, INVALID_LOG_INDEX, null); + } catch (final Throwable t) { + LOG.error("[Timeout] fail to run ReadIndexClosure with status: {}.", status, t); + } + } + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 36314e55d..fefe7ffaa 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -153,7 +153,7 @@ public class NodeImpl implements Node, RaftServerService { } } - private final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader + public final static RaftTimerFactory TIMER_FACTORY = JRaftServiceLoader .load( RaftTimerFactory.class) // .first(); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java index df5afd61b..0df91f6ce 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReadOnlyServiceImpl.java @@ -178,8 +178,8 @@ public void run(final Status status) { } else { // Not applied, add it to pending-notify cache. ReadOnlyServiceImpl.this.pendingNotifyStatus - .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) // - .add(readIndexStatus); + .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) // + .add(readIndexStatus); } } finally { if (doUnlock) { @@ -242,21 +242,21 @@ public boolean init(final ReadOnlyServiceOptions opts) { this.raftOptions = opts.getRaftOptions(); this.scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true)); + .newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true)); this.readIndexDisruptor = DisruptorBuilder. newInstance() // - .setEventFactory(new ReadIndexEventFactory()) // - .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) // - .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) // - .setWaitStrategy(new BlockingWaitStrategy()) // - .setProducerType(ProducerType.MULTI) // - .build(); + .setEventFactory(new ReadIndexEventFactory()) // + .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) // + .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true)) // + .setWaitStrategy(new BlockingWaitStrategy()) // + .setProducerType(ProducerType.MULTI) // + .build(); this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler()); this.readIndexDisruptor - .setDefaultExceptionHandler(new LogExceptionHandler(this.getClass().getSimpleName())); + .setDefaultExceptionHandler(new LogExceptionHandler(getClass().getSimpleName())); this.readIndexQueue = this.readIndexDisruptor.start(); if (this.nodeMetrics.getMetricRegistry() != null) { this.nodeMetrics.getMetricRegistry() // - .register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue)); + .register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue)); } // listen on lastAppliedLogIndex change events. this.fsmCaller.addLastAppliedLogIndexListener(this); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java index a4143744c..68ec4feff 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/DefaultRaftTimerFactory.java @@ -95,11 +95,13 @@ public Scheduler getRaftScheduler(final boolean shared, final int workerNum, fin return shared ? SCHEDULER_REF.getRef() : createScheduler(workerNum, name); } - private static Timer createTimer(final String name) { + @Override + public Timer createTimer(final String name) { return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048); } - private static Scheduler createScheduler(final int workerNum, final String name) { + @Override + public Scheduler createScheduler(final int workerNum, final String name) { return new TimerManager(workerNum, name); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java index dcb60171f..fba500d13 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/timer/RaftTimerFactory.java @@ -32,4 +32,8 @@ public interface RaftTimerFactory { Timer getSnapshotTimer(final boolean shared, final String name); Scheduler getRaftScheduler(final boolean shared, final int workerNum, final String name); + + Timer createTimer(final String name); + + Scheduler createScheduler(final int workerNum, final String name); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index be35a7208..7d957ca08 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -1368,6 +1368,55 @@ public void run(final Status status, final long index, final byte[] reqCtx) { cluster.stopAll(); } + @Test + public void testReadIndexTimeout() throws Exception { + final List peers = TestUtils.generatePeers(3); + + final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); + for (final PeerId peer : peers) { + assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); + } + + // elect leader + cluster.waitLeader(); + + // get leader + final Node leader = cluster.getLeader(); + assertNotNull(leader); + assertEquals(3, leader.listPeers().size()); + // apply tasks to leader + sendTestTaskAndWait(leader); + + assertReadIndex(leader, 11); + + // read from follower + for (final Node follower : cluster.getFollowers()) { + assertNotNull(follower); + assertReadIndex(follower, 11); + } + + // read with null request context + final CountDownLatch latch = new CountDownLatch(1); + final long start = System.currentTimeMillis(); + leader.readIndex(null, new ReadIndexClosure(0) { + + @Override + public void run(final Status status, final long index, final byte[] reqCtx) { + assertNull(reqCtx); + if (status.isOk()) { + System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms"); + } else { + assertEquals(status, new Status(RaftError.ETIMEDOUT, "read-index request timeout")); + assertEquals(index, -1); + } + latch.countDown(); + } + }); + latch.await(); + + cluster.stopAll(); + } + @Test public void testReadIndexFromLearner() throws Exception { final List peers = TestUtils.generatePeers(3);