From fa23d592acb6adab8227bbc9281dc3f03390b11d Mon Sep 17 00:00:00 2001
From: block
Date: Mon, 1 Apr 2019 10:55:32 +0800
Subject: [PATCH] fixbug/install snapshot bug (#80)

* (fix) required eof
* (fix) typo
* (fix) read file bug
* (fix) format
* (fix) code format
* (fix) NodeTest's code format
* (fix) FileService unit test
* (fix) large snapshot unit test
* (fix) add install snapshot rpc timeout, default is 5 min
* (fix) code format
* (fix) an error log is needed on copy failed
* (fix) code format
* (fix) typo
* (fix) add unit test: testInstallLargeSnapshot()
* (fix) minor fix
* (fix) add more detailed error log on copy failed
---
 .../alipay/sofa/jraft/option/RpcOptions.java  |  19 +-
 .../rpc/impl/core/BoltRaftClientService.java  |   2 +-
 .../sofa/jraft/storage/FileService.java       |  35 +-
 .../sofa/jraft/storage/io/FileReader.java     |  17 +-
 .../sofa/jraft/storage/io/LocalDirReader.java |  31 +-
 .../snapshot/local/SnapshotFileReader.java    |  16 +-
 .../storage/snapshot/remote/BoltSession.java  |  58 +-
 .../snapshot/remote/RemoteFileCopier.java     |  12 +-
 .../sofa/jraft/util/ByteBufferCollector.java  |  33 +-
 .../com/alipay/sofa/jraft/util/Utils.java     |   2 +-
 .../com/alipay/sofa/jraft/core/NodeTest.java  | 494 +++++++++++-------
 .../alipay/sofa/jraft/core/TestCluster.java   |  11 +-
 .../sofa/jraft/storage/FileServiceTest.java   |  46 +-
 13 files changed, 488 insertions(+), 288 deletions(-)

diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java
index 6870069d0..e14fc7a85 100644
--- a/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java
+++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/option/RpcOptions.java
@@ -32,6 +32,12 @@ public class RpcOptions {
      */
     private int rpcDefaultTimeout = 5000;
 
+    /**
+     * Install snapshot RPC request default timeout in milliseconds
+     * Default: 5 * 60 * 1000(5min)
+     */
+    private int rpcInstallSnapshotTimeout = 5 * 60 * 1000;
+
     /**
      * Rpc process thread pool size
      * Default: 80
@@ -75,10 +81,19 @@ public void setRpcDefaultTimeout(int rpcDefaultTimeout) {
         this.rpcDefaultTimeout = rpcDefaultTimeout;
     }
 
+    public int getRpcInstallSnapshotTimeout() {
+        return rpcInstallSnapshotTimeout;
+    }
+
+    public void setRpcInstallSnapshotTimeout(int rpcInstallSnapshotTimeout) {
+        this.rpcInstallSnapshotTimeout = rpcInstallSnapshotTimeout;
+    }
+
     @Override
     public String toString() {
         return "RpcOptions{" + "rpcConnectTimeoutMs=" + rpcConnectTimeoutMs + ", rpcDefaultTimeout="
-               + rpcDefaultTimeout + ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry="
-               + metricRegistry + '}';
+               + rpcDefaultTimeout + ", rpcInstallSnapshotTimeout=" + rpcInstallSnapshotTimeout
+               + ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry=" + metricRegistry
+               + '}';
     }
 }
diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java
index a9ed03893..bafde4c2c 100644
--- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java
+++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/BoltRaftClientService.java
@@ -100,7 +100,7 @@ public Future getFile(Endpoint endpoint, GetFileRequest request, int ti
     @Override
     public Future installSnapshot(Endpoint endpoint, InstallSnapshotRequest request,
                                   RpcResponseClosure done) {
-        return invokeWithDone(endpoint, request, done, rpcOptions.getRpcDefaultTimeout());
+        return invokeWithDone(endpoint, request, done, rpcOptions.getRpcInstallSnapshotTimeout());
     }
 
     @Override
diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/FileService.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
index 753761aef..ac3d5f1a9 100644
--- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
+++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/FileService.java
@@ -55,7 +55,7 @@ public final class FileService {
     private static final FileService INSTANCE = new FileService();
 
     private final ConcurrentMap fileReaderMap = new ConcurrentHashMap<>();
-    private final AtomicLong nextId;
+    private final AtomicLong nextId = new AtomicLong();
 
     /**
      * Retrieve the singleton instance of FileService.
@@ -72,18 +72,16 @@ void clear() {
     }
 
     private FileService() {
-        this.nextId = new AtomicLong();
-        final long initValue = Math.abs(Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000,
-            Integer.MAX_VALUE)) << 45
-                               | System.nanoTime() << 17 >> 17);
-        this.nextId.set(initValue);
-        LOG.info("Initial file reader id in FileService is {}", this.nextId);
+        final long processId = Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000, Integer.MAX_VALUE));
+        final long initialValue = Math.abs(processId << 45 | System.nanoTime() << 17 >> 17);
+        this.nextId.set(initialValue);
+        LOG.info("Initial file reader id in FileService is {}", initialValue);
     }
 
     /**
-     * Handle GetFileRequest ,run the response or set the response with done.
+     * Handle GetFileRequest, run the response or set the response with done.
      */
-    public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
+    public Message handleGetFile(final GetFileRequest request, final RpcRequestClosure done) {
         if (request.getCount() <= 0 || request.getOffset() < 0) {
             return RpcResponseFactory.newResponse(RaftError.EREQUEST, "Invalid request: %s", request);
         }
@@ -92,8 +90,11 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
             return RpcResponseFactory.newResponse(RaftError.ENOENT, "Fail to find reader=%d", request.getReaderId());
         }
 
-        LOG.debug("GetFile from {} path={} filename={} offset={} count={}", done.getBizContext().getRemoteAddress(),
-            reader.getPath(), request.getFilename(), request.getOffset(), request.getCount());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("GetFile from {} path={} filename={} offset={} count={}",
+                done.getBizContext().getRemoteAddress(), reader.getPath(), request.getFilename(), request.getOffset(),
+                request.getCount());
+        }
 
         final ByteBufferCollector dataBuffer = ByteBufferCollector.allocate();
         final GetFileResponse.Builder responseBuilder = GetFileResponse.newBuilder();
@@ -101,14 +102,12 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
             final int read = reader
                 .readFile(dataBuffer, request.getFilename(), request.getOffset(), request.getCount());
             responseBuilder.setReadSize(read);
-            if (read == -1) {
-                responseBuilder.setEof(true);
-            }
+            responseBuilder.setEof(read == FileReader.EOF);
             final ByteBuffer buf = dataBuffer.getBuffer();
             buf.flip();
             if (!buf.hasRemaining()) {
                 // skip empty data
-                return responseBuilder.setData(ByteString.EMPTY).build();
+                responseBuilder.setData(ByteString.EMPTY);
             } else {
                 // TODO check hole
                 responseBuilder.setData(ZeroByteStringHelper.wrap(buf));
@@ -128,9 +127,9 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
     /**
     * Adds a file reader and return it's generated readerId.
*/ - public long addReader(FileReader reader) { + public long addReader(final FileReader reader) { final long readerId = this.nextId.getAndIncrement(); - if (fileReaderMap.putIfAbsent(readerId, reader) == null) { + if (this.fileReaderMap.putIfAbsent(readerId, reader) == null) { return readerId; } else { return -1L; @@ -140,7 +139,7 @@ public long addReader(FileReader reader) { /** * Remove the reader by readerId. */ - public boolean removeReader(long readerId) { + public boolean removeReader(final long readerId) { return this.fileReaderMap.remove(readerId) != null; } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/FileReader.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/FileReader.java index d63592e66..f67ee963c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/FileReader.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/FileReader.java @@ -30,6 +30,8 @@ */ public interface FileReader { + int EOF = -1; + /** * Get the file path. * @@ -39,8 +41,17 @@ public interface FileReader { /** * Read file into buf starts from offset at most maxCount. - * Returns -1 if reaches end, else return read count. + * + * @param buf read bytes into this buf + * @param fileName file name + * @param offset the offset of file + * @param maxCount max read bytes + * @return -1 if reaches end, else return read count. + * @throws IOException if some I/O error occurs + * @throws RetryAgainException if it's not allowed to read partly + * or it's allowed but throughput is throttled to 0, try again. */ - int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException, - RetryAgainException; + int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount) + throws IOException, + RetryAgainException; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/LocalDirReader.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/LocalDirReader.java index e0e84aa74..affb43590 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/LocalDirReader.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/io/LocalDirReader.java @@ -29,12 +29,14 @@ import com.google.protobuf.Message; /** - * read a file data form local dir by fileName. + * Read a file data form local dir by fileName. 
+ * * @author boyan (boyan@alibaba-inc.com) * * 2018-Apr-06 9:25:12 PM */ public class LocalDirReader implements FileReader { + private static final Logger LOG = LoggerFactory.getLogger(LocalDirReader.class); private final String path; @@ -46,32 +48,33 @@ public LocalDirReader(String path) { @Override public String getPath() { - return this.path; + return path; } @Override - public int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException, - RetryAgainException { - return this.readFileWithMeta(buf, fileName, null, offset, maxCount); + public int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount) + throws IOException, + RetryAgainException { + return readFileWithMeta(buf, fileName, null, offset, maxCount); } @SuppressWarnings("unused") - protected int readFileWithMeta(ByteBufferCollector buf, String fileName, Message fileMeta, long offset, - long maxCount) throws IOException, RetryAgainException { + protected int readFileWithMeta(final ByteBufferCollector buf, final String fileName, final Message fileMeta, + long offset, final long maxCount) throws IOException, RetryAgainException { buf.expandIfNecessary(); final String filePath = this.path + File.separator + fileName; final File file = new File(filePath); - try (FileInputStream input = new FileInputStream(file); FileChannel fc = input.getChannel()) { + try (final FileInputStream input = new FileInputStream(file); final FileChannel fc = input.getChannel()) { int totalRead = 0; while (true) { final int nread = fc.read(buf.getBuffer(), offset); if (nread <= 0) { - return -1; + return EOF; } totalRead += nread; if (totalRead < maxCount) { if (buf.hasRemaining()) { - return -1; + return EOF; } else { buf.expandAtMost((int) (maxCount - totalRead)); offset += nread; @@ -79,11 +82,11 @@ protected int readFileWithMeta(ByteBufferCollector buf, String fileName, Message } else { final long fsize = file.length(); if (fsize < 0) { - LOG.warn("Invlaid file length {}", filePath); - return -1; + LOG.warn("Invalid file length {}", filePath); + return EOF; } - if (fsize == offset + maxCount) { - return -1; + if (fsize == offset + nread) { + return EOF; } else { return totalRead; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/SnapshotFileReader.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/SnapshotFileReader.java index 80b1df2ba..fcfcfd183 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/SnapshotFileReader.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/local/SnapshotFileReader.java @@ -54,21 +54,20 @@ public SnapshotFileReader(String path, SnapshotThrottle snapshotThrottle) { } public boolean open() { - final File file = new File(this.getPath()); + final File file = new File(getPath()); return file.exists(); } @Override - public int readFile(ByteBufferCollector metaBufferCollector, String fileName, long offset, long maxCount) - throws IOException, - RetryAgainException { + public int readFile(final ByteBufferCollector metaBufferCollector, final String fileName, final long offset, + final long maxCount) throws IOException, RetryAgainException { // read the whole meta file. if (fileName.equals(Snapshot.JRAFT_SNAPSHOT_META_FILE)) { final ByteBuffer metaBuf = this.metaTable.saveToByteBufferAsRemote(); - //because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit. 
+ // because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit. metaBuf.position(metaBuf.limit()); metaBufferCollector.setBuffer(metaBuf); - return -1; + return EOF; } final LocalFileMeta fileMeta = this.metaTable.getFileMeta(fileName); if (fileMeta == null) { @@ -78,7 +77,7 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo // go through throttle long newMaxCount = maxCount; if (this.snapshotThrottle != null) { - newMaxCount = snapshotThrottle.throttledByThroughput(maxCount); + newMaxCount = this.snapshotThrottle.throttledByThroughput(maxCount); if (newMaxCount < maxCount) { // if it's not allowed to read partly or it's allowed but // throughput is throttled to 0, try again. @@ -88,7 +87,6 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo } } - return this.readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount); + return readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount); } - } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/BoltSession.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/BoltSession.java index 6940386ca..f3e957147 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/BoltSession.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/BoltSession.java @@ -37,6 +37,7 @@ import com.alipay.sofa.jraft.option.CopyOptions; import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.rpc.RaftClientService; +import com.alipay.sofa.jraft.rpc.RpcRequests; import com.alipay.sofa.jraft.rpc.RpcRequests.GetFileRequest; import com.alipay.sofa.jraft.rpc.RpcRequests.GetFileResponse; import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter; @@ -57,32 +58,31 @@ @ThreadSafe public class BoltSession implements Session { - private static final Logger LOG = LoggerFactory.getLogger(BoltSession.class); + private static final Logger LOG = LoggerFactory.getLogger(BoltSession.class); - private final Lock lock; - private final Status st; + private final Lock lock = new ReentrantLock(); + private final Status st = Status.OK(); + private final CountDownLatch finishLatch = new CountDownLatch(1); + private final GetFileResponseClosure done = new GetFileResponseClosure(); private final RaftClientService rpcService; - private int retryTimes; + private final GetFileRequest.Builder requestBuilder; + private final Endpoint endpoint; + private final TimerManager timerManager; + private final SnapshotThrottle snapshotThrottle; + private final RaftOptions raftOptions; + private int retryTimes = 0; private boolean finished; private ByteBufferCollector destBuf; - private CopyOptions copyOptions; - private final GetFileRequest.Builder requestBuilder; - private final CountDownLatch finishLatch; + private CopyOptions copyOptions = new CopyOptions(); private OutputStream outputStream; - private final Endpoint endpoint; - private final GetFileResponseClosure done = new GetFileResponseClosure(); private ScheduledFuture timer; private String destPath; - private final RaftOptions raftOptions; private Future rpcCall; - private final TimerManager timerManager; - private final SnapshotThrottle snapshotThrottle; /** * Get file response closure to answer client. 
- * @author boyan (boyan@alibaba-inc.com) * - * 2018-Apr-13 4:50:21 PM + * @author boyan (boyan@alibaba-inc.com) */ private class GetFileResponseClosure extends RpcResponseClosureAdapter { @@ -131,12 +131,7 @@ public BoltSession(RaftClientService rpcService, TimerManager timerManager, Snap this.timerManager = timerManager; this.rpcService = rpcService; this.requestBuilder = rb; - this.retryTimes = 0; - this.copyOptions = new CopyOptions(); - this.lock = new ReentrantLock(); this.endpoint = ep; - this.st = Status.OK(); - this.finishLatch = new CountDownLatch(1); } public void setDestBuf(ByteBufferCollector bufRef) { @@ -168,7 +163,7 @@ public void cancel() { this.st.setError(RaftError.ECANCELED, RaftError.ECANCELED.name()); } - this.onFinished(); + onFinished(); } finally { this.lock.unlock(); } @@ -186,6 +181,11 @@ public Status status() { private void onFinished() { if (!this.finished) { + if (!this.st.isOk()) { + LOG.error("Fail to copy data, readerId={} fileName={} offset={} status={}", + this.requestBuilder.getReaderId(), this.requestBuilder.getFilename(), + this.requestBuilder.getOffset(), this.st); + } if (this.outputStream != null) { Utils.closeQuietly(this.outputStream); this.outputStream = null; @@ -206,7 +206,7 @@ private void onTimer() { Utils.runInThread(this::sendNextRpc); } - void onRpcReturned(Status status, GetFileResponse response) { + void onRpcReturned(final Status status, final GetFileResponse response) { this.lock.lock(); try { if (this.finished) { @@ -218,7 +218,7 @@ void onRpcReturned(Status status, GetFileResponse response) { if (status.getCode() == RaftError.ECANCELED.getNumber()) { if (this.st.isOk()) { this.st.setError(status.getCode(), status.getErrorMsg()); - this.onFinished(); + onFinished(); return; } } @@ -228,7 +228,7 @@ void onRpcReturned(Status status, GetFileResponse response) { && ++this.retryTimes >= this.copyOptions.getMaxRetry()) { if (this.st.isOk()) { this.st.setError(status.getCode(), status.getErrorMsg()); - this.onFinished(); + onFinished(); return; } } @@ -239,7 +239,7 @@ void onRpcReturned(Status status, GetFileResponse response) { this.retryTimes = 0; Requires.requireNonNull(response, "response"); // Reset count to |real_read_size| to make next rpc get the right offset - if (response.hasReadSize() && response.getReadSize() != 0) { + if (!response.getEof()) { this.requestBuilder.setCount(response.getReadSize()); } if (this.outputStream != null) { @@ -248,7 +248,7 @@ void onRpcReturned(Status status, GetFileResponse response) { } catch (final IOException e) { LOG.error("Fail to write into file {}", this.destPath); this.st.setError(RaftError.EIO, RaftError.EIO.name()); - this.onFinished(); + onFinished(); return; } } else { @@ -282,7 +282,7 @@ void sendNextRpc() { // throttle long newMaxCount = maxCount; if (this.snapshotThrottle != null) { - newMaxCount = snapshotThrottle.throttledByThroughput(maxCount); + newMaxCount = this.snapshotThrottle.throttledByThroughput(maxCount); if (newMaxCount == 0) { // Reset count to make next rpc retry the previous one this.requestBuilder.setCount(0); @@ -292,9 +292,9 @@ void sendNextRpc() { } } this.requestBuilder.setCount(newMaxCount); - LOG.debug("Send get file request {} to peer {}", this.requestBuilder.build(), this.endpoint); - this.rpcCall = this.rpcService.getFile(endpoint, this.requestBuilder.build(), - this.copyOptions.getTimeoutMs(), done); + final RpcRequests.GetFileRequest request = this.requestBuilder.build(); + LOG.debug("Send get file request {} to peer {}", request, this.endpoint); + 
this.rpcCall = this.rpcService.getFile(this.endpoint, request, this.copyOptions.getTimeoutMs(), this.done); } finally { this.lock.unlock(); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java index 737539bb0..876074a47 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/RemoteFileCopier.java @@ -100,9 +100,9 @@ public boolean init(String uri, SnapshotThrottle snapshotThrottle, SnapshotCopie /** * Copy `source` from remote to local dest. * - * @param source source from remote - * @param destPath local path - * @param opts options of copy + * @param source source from remote + * @param destPath local path + * @param opts options of copy * @return true if copy success */ public boolean copyToFile(String source, String destPath, CopyOptions opts) throws IOException, @@ -152,9 +152,9 @@ private BoltSession newBoltSession(String source) { /** * Copy `source` from remote to buffer. - * @param source source from remote - * @param destBuf buffer of dest - * @param opt options of copy + * @param source source from remote + * @param destBuf buffer of dest + * @param opt options of copy * @return true if copy success */ public boolean copy2IoBuffer(String source, ByteBufferCollector destBuf, CopyOptions opt) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ByteBufferCollector.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ByteBufferCollector.java index d53407d21..7264c5044 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ByteBufferCollector.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/ByteBufferCollector.java @@ -24,37 +24,38 @@ * @author dennis */ public final class ByteBufferCollector { + private ByteBuffer buffer; public int capacity() { - return buffer != null ? buffer.capacity() : 0; + return this.buffer != null ? 
this.buffer.capacity() : 0; } public void expandIfNecessary() { - if (!this.hasRemaining()) { + if (!hasRemaining()) { getBuffer(Utils.RAFT_DATA_BUF_SIZE); } } - public void expandAtMost(int atMostBytes) { + public void expandAtMost(final int atMostBytes) { if (this.buffer == null) { this.buffer = Utils.allocate(atMostBytes); } else { - buffer = Utils.expandByteBufferAtMost(buffer, atMostBytes); + this.buffer = Utils.expandByteBufferAtMost(this.buffer, atMostBytes); } } public boolean hasRemaining() { - return buffer != null && buffer.hasRemaining(); + return this.buffer != null && this.buffer.hasRemaining(); } - private ByteBufferCollector(int size) { + private ByteBufferCollector(final int size) { if (size > 0) { this.buffer = Utils.allocate(size); } } - public static ByteBufferCollector allocate(int size) { + public static ByteBufferCollector allocate(final int size) { return new ByteBufferCollector(size); } @@ -62,24 +63,24 @@ public static ByteBufferCollector allocate() { return new ByteBufferCollector(Utils.RAFT_DATA_BUF_SIZE); } - private ByteBuffer getBuffer(int expectSize) { - if (buffer == null) { - buffer = Utils.allocate(expectSize); - } else if (buffer.remaining() < expectSize) { - buffer = Utils.expandByteBufferAtLeast(buffer, expectSize); + private ByteBuffer getBuffer(final int expectSize) { + if (this.buffer == null) { + this.buffer = Utils.allocate(expectSize); + } else if (this.buffer.remaining() < expectSize) { + this.buffer = Utils.expandByteBufferAtLeast(this.buffer, expectSize); } - return buffer; + return this.buffer; } - public void put(ByteBuffer buf) { + public void put(final ByteBuffer buf) { getBuffer(buf.remaining()).put(buf); } - public void put(byte[] bs) { + public void put(final byte[] bs) { getBuffer(bs.length).put(bs); } - public void setBuffer(ByteBuffer buffer) { + public void setBuffer(final ByteBuffer buffer) { this.buffer = buffer; } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java index 0c7985822..40b290e00 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java @@ -172,7 +172,7 @@ public static long getProcessId(final long fallback) { } /** - * Default init and expand buffer size, it can be set by -Draft.byte_buf.size=n, default 1024. + * Default init and expand buffer size, it can be set by -Djraft.byte_buf.size=n, default 1024. 
*/ public static final int RAFT_DATA_BUF_SIZE = Integer.parseInt(System.getProperty("jraft.byte_buf.size", "1024")); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 00b85f907..5a75956f5 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -19,7 +19,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -55,6 +55,8 @@ import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.option.BootstrapOptions; import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.storage.SnapshotThrottle; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.ThroughputSnapshotThrottle; import com.alipay.sofa.jraft.test.TestUtils; @@ -124,7 +126,7 @@ public void testSingleNode() throws Exception { nodeOptions.setLogUri(this.dataPath + File.separator + "log"); nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot"); - nodeOptions.setInitialConf(new Configuration(Arrays.asList(peer))); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer))); final Node node = new NodeImpl("unittest", peer); assertTrue(node.init(nodeOptions)); @@ -155,14 +157,14 @@ public void testNoLeader() throws Exception { final Node follower = followers.get(0); sendTestTaskAndWait(follower, 0, RaftError.EPERM); - //adds a peer3 + // adds a peer3 final PeerId peer3 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 3); CountDownLatch latch = new CountDownLatch(1); follower.addPeer(peer3, new ExpectClosure(RaftError.EPERM, latch)); waitLatch(latch); - //remove the peer0 - final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT + 0); + // remove the peer0 + final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT); latch = new CountDownLatch(1); follower.removePeer(peer0, new ExpectClosure(RaftError.EPERM, latch)); waitLatch(latch); @@ -188,6 +190,7 @@ private void sendTestTaskAndWait(Node node, int start, RaftError err) throws Int waitLatch(latch); } + @SuppressWarnings("SameParameterValue") private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { @@ -207,14 +210,14 @@ public void testTripleNodes() throws Exception { assertTrue(cluster.start(peer.getEndpoint())); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); assertEquals(3, leader.listPeers().size()); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); { @@ -224,7 +227,7 @@ public void testTripleNodes() throws Exception { } { - //task with TaskClosure + // task with TaskClosure final ByteBuffer data = ByteBuffer.wrap("task closure".getBytes()); final Vector cbs = new Vector<>(); final CountDownLatch latch = new CountDownLatch(1); @@ -263,25 +266,25 @@ public void testReadIndex() throws Exception { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); } - //elect leader + // 
elect leader cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); assertEquals(3, leader.listPeers().size()); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); assertReadIndex(leader, 11); - //read from follower + // read from follower for (final Node follower : cluster.getFollowers()) { assertNotNull(follower); assertReadIndex(follower, 11); } - //read with null request context + // read with null request context final CountDownLatch latch = new CountDownLatch(1); leader.readIndex(null, new ReadIndexClosure() { @@ -306,10 +309,10 @@ public void testReadIndexChaos() throws Exception { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); assertEquals(3, leader.listPeers().size()); @@ -364,6 +367,7 @@ public void run(Status status, long index, byte[] reqCtx) { cluster.stopAll(); } + @SuppressWarnings({ "unused", "SameParameterValue" }) private void assertReadIndex(final Node node, int index) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final byte[] requestContext = TestUtils.getRandomBytes(); @@ -389,14 +393,14 @@ public void testNodeMetrics() throws Exception { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); assertEquals(3, leader.listPeers().size()); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); { @@ -414,7 +418,7 @@ public void testNodeMetrics() throws Exception { reporter.close(); System.out.println(); } - //TODO check http status + // TODO check http status assertEquals(2, cluster.getFollowers().size()); cluster.stopAll(); // System.out.println(node.getNodeMetrics().getMetrics()); @@ -429,17 +433,17 @@ public void testLeaderFail() throws Exception { assertTrue(cluster.start(peer.getEndpoint())); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get leader Node leader = cluster.getLeader(); assertNotNull(leader); LOG.info("Current leader is {}", leader.getLeaderId()); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); - //stop leader + // stop leader LOG.warn("Stop leader {}", leader.getNodeId().getPeerId()); final PeerId oldLeader = leader.getNodeId().getPeerId(); assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint())); @@ -449,11 +453,11 @@ public void testLeaderFail() throws Exception { assertFalse(followers.isEmpty()); this.sendTestTaskAndWait("follower apply ", followers.get(0), -1); - //elect new leader + // elect new leader cluster.waitLeader(); leader = cluster.getLeader(); LOG.info("Eelect new leader is {}", leader.getLeaderId()); - //apply tasks to new leader + // apply tasks to new leader CountDownLatch latch = new CountDownLatch(10); for (int i = 10; i < 20; i++) { final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); @@ -462,10 +466,10 @@ public void testLeaderFail() throws Exception { } waitLatch(latch); - //restart old leader + // restart old leader LOG.info("restart old leader {}", oldLeader); assertTrue(cluster.start(oldLeader.getEndpoint())); - //apply something + // apply something latch = new CountDownLatch(10); for (int i = 20; i < 30; i++) { final ByteBuffer data = 
ByteBuffer.wrap(("hello" + i).getBytes()); @@ -474,11 +478,11 @@ public void testLeaderFail() throws Exception { } waitLatch(latch); - //stop and clean old leader + // stop and clean old leader cluster.stop(oldLeader.getEndpoint()); cluster.clean(oldLeader.getEndpoint()); - //restart old leader + // restart old leader LOG.info("restart old leader {}", oldLeader); assertTrue(cluster.start(oldLeader.getEndpoint())); assertTrue(cluster.ensureSame(-1)); @@ -498,7 +502,7 @@ public void testJoinNodes() throws Exception { final ArrayList peers = new ArrayList<>(); peers.add(peer0); - //start single cluster + // start single cluster final TestCluster cluster = new TestCluster("unittest", dataPath, peers); assertTrue(cluster.start(peer0.getEndpoint())); @@ -509,9 +513,9 @@ public void testJoinNodes() throws Exception { Assert.assertEquals(leader.getNodeId().getPeerId(), peer0); this.sendTestTaskAndWait(leader); - //start peer1 + // start peer1 assertTrue(cluster.start(peer1.getEndpoint(), true, 300)); - //add peer1 + // add peer1 CountDownLatch latch = new CountDownLatch(1); peers.add(peer1); leader.addPeer(peer1, new ExpectClosure(latch)); @@ -523,26 +527,26 @@ public void testJoinNodes() throws Exception { assertEquals(10, fsm.getLogs().size()); } - //add peer2 but not start + // add peer2 but not start peers.add(peer2); latch = new CountDownLatch(1); leader.addPeer(peer2, new ExpectClosure(RaftError.ECATCHUP, latch)); waitLatch(latch); - //start peer2 after 2 seconds + // start peer2 after 2 seconds Thread.sleep(2000); assertTrue(cluster.start(peer2.getEndpoint(), true, 300)); Thread.sleep(10000); - //re-add peer2 + // re-add peer2 latch = new CountDownLatch(2); leader.addPeer(peer2, new ExpectClosure(latch)); // concurrent configuration change leader.addPeer(peer3, new ExpectClosure(RaftError.EBUSY, latch)); waitLatch(latch); - //re-add peer2 directly + // re-add peer2 directly try { leader.addPeer(peer2, new ExpectClosure(latch)); @@ -573,13 +577,13 @@ public void testRemoveFollower() throws Exception { assertTrue(cluster.start(peer.getEndpoint())); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); @@ -590,12 +594,12 @@ public void testRemoveFollower() throws Exception { final PeerId followerPeer = followers.get(0).getNodeId().getPeerId(); final Endpoint followerAddr = followerPeer.getEndpoint(); - //stop and clean follower + // stop and clean follower LOG.info("Stop and clean follower {}", followerPeer); assertTrue(cluster.stop(followerAddr)); cluster.clean(followerAddr); - //remove follower + // remove follower LOG.info("Remove follower {}", followerPeer); CountDownLatch latch = new CountDownLatch(1); leader.removePeer(followerPeer, new ExpectClosure(latch)); @@ -608,10 +612,10 @@ public void testRemoveFollower() throws Exception { peers = TestUtils.generatePeers(3); assertTrue(peers.remove(followerPeer)); - //start follower + // start follower LOG.info("Start and add follower {}", followerPeer); assertTrue(cluster.start(followerAddr)); - //re-add follower + // re-add follower latch = new CountDownLatch(1); leader.addPeer(followerPeer, new ExpectClosure(latch)); waitLatch(latch); @@ -636,13 +640,13 @@ public void testRemoveLeader() throws Exception { assertTrue(cluster.start(peer.getEndpoint())); } - //elect leader + // elect leader cluster.waitLeader(); - //get leader + // get 
leader Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); @@ -653,26 +657,26 @@ public void testRemoveLeader() throws Exception { final PeerId oldLeader = leader.getNodeId().getPeerId().copy(); final Endpoint oldLeaderAddr = oldLeader.getEndpoint(); - //remove old leader + // remove old leader LOG.info("Remove old leader {}", oldLeader); CountDownLatch latch = new CountDownLatch(1); leader.removePeer(oldLeader, new ExpectClosure(latch)); waitLatch(latch); - //elect new leader + // elect new leader cluster.waitLeader(); leader = cluster.getLeader(); LOG.info("New leader is {}", leader); assertNotNull(leader); - //apply tasks to new leader + // apply tasks to new leader this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); - //stop and clean old leader + // stop and clean old leader LOG.info("Stop and clean old leader {}", oldLeader); assertTrue(cluster.stop(oldLeaderAddr)); cluster.clean(oldLeaderAddr); - //Add and start old leader + // Add and start old leader LOG.info("Start and add old leader {}", oldLeader); assertTrue(cluster.start(oldLeaderAddr)); @@ -703,11 +707,11 @@ public void testPreVote() throws Exception { } cluster.waitLeader(); - //get leader + // get leader Node leader = cluster.getLeader(); final long savedTerm = ((NodeImpl) leader).getCurrentTerm(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); @@ -718,7 +722,7 @@ public void testPreVote() throws Exception { final PeerId followerPeer = followers.get(0).getNodeId().getPeerId(); final Endpoint followerAddr = followerPeer.getEndpoint(); - //remove follower + // remove follower LOG.info("Remove follower {}", followerPeer); CountDownLatch latch = new CountDownLatch(1); leader.removePeer(followerPeer, new ExpectClosure(latch)); @@ -728,7 +732,7 @@ public void testPreVote() throws Exception { Thread.sleep(2000); - //add follower + // add follower LOG.info("Add follower {}", followerAddr); peers = TestUtils.generatePeers(3); assertTrue(peers.remove(followerPeer)); @@ -737,7 +741,7 @@ public void testPreVote() throws Exception { waitLatch(latch); leader = cluster.getLeader(); assertNotNull(leader); - //leader term should not be changed. + // leader term should not be changed. 
assertEquals(savedTerm, ((NodeImpl) leader).getCurrentTerm()); cluster.stopAll(); } @@ -753,7 +757,7 @@ public void testSetPeer1() throws Exception { final List peers = new ArrayList<>(); peers.add(bootPeer); - //reset peers from empty + // reset peers from empty assertTrue(nodes.get(0).resetPeers(new Configuration(peers)).isOk()); cluster.waitLeader(); assertNotNull(cluster.getLeader()); @@ -772,10 +776,10 @@ public void testSetPeer2() throws Exception { } cluster.waitLeader(); - //get leader + // get leader Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); @@ -792,9 +796,9 @@ public void testSetPeer2() throws Exception { assertTrue(cluster.stop(followerAddr1)); cluster.clean(followerAddr1); - //apply tasks to leader again + // apply tasks to leader again this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); - //set peer when no quorum die + // set peer when no quorum die final Endpoint leaderAddr = leader.getLeaderId().getEndpoint().copy(); LOG.info("Set peers to {}", leaderAddr); final List newPeers = TestUtils.generatePeers(3); @@ -804,14 +808,14 @@ public void testSetPeer2() throws Exception { assertTrue(cluster.stop(followerAddr2)); cluster.clean(followerAddr2); - //leader will stepdown, become follower + // leader will step-down, become follower Thread.sleep(2000); newPeers.clear(); newPeers.add(new PeerId(leaderAddr, 0)); - //new peers equal to current conf + // new peers equal to current conf assertTrue(leader.resetPeers(new Configuration(peers)).isOk()); - //set peer when quorum die + // set peer when quorum die LOG.warn("Set peers to {}", leaderAddr); assertTrue(leader.resetPeers(new Configuration(newPeers)).isOk()); @@ -857,21 +861,21 @@ public void testRestoreSnasphot() throws Exception { } cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); triggerLeaderSnapshot(cluster, leader); - //stop leader + // stop leader final Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy(); assertTrue(cluster.stop(leaderAddr)); Thread.sleep(2000); - //restart leader + // restart leader assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes()); assertTrue(cluster.start(leaderAddr)); cluster.ensureSame(); @@ -885,7 +889,7 @@ private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws Inte } private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times) throws InterruptedException { - //trigger leader snapshot + // trigger leader snapshot assertEquals(times - 1, cluster.getLeaderFsm().getSaveSnapshotTimes()); final CountDownLatch latch = new CountDownLatch(1); leader.snapshot(new ExpectClosure(latch)); @@ -904,37 +908,37 @@ public void testInstallSnapshotWithThrottle() throws Exception { } cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); - //stop follower1 + // stop follower1 final List followers = cluster.getFollowers(); assertEquals(2, followers.size()); final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint(); assertTrue(cluster.stop(followerAddr)); - //apply something more + // apply something more this.sendTestTaskAndWait(leader, 10, 
RaftError.SUCCESS); Thread.sleep(1000); - //trigger leader snapshot + // trigger leader snapshot triggerLeaderSnapshot(cluster, leader); - //apply something more + // apply something more this.sendTestTaskAndWait(leader, 20, RaftError.SUCCESS); - //trigger leader snapshot + // trigger leader snapshot triggerLeaderSnapshot(cluster, leader, 2); - //wait leader to compact logs + // wait leader to compact logs Thread.sleep(1000); - //restart follower. + // restart follower. cluster.clean(followerAddr); assertTrue(cluster.start(followerAddr, true, 300, false, new ThroughputSnapshotThrottle(1024, 1))); @@ -949,6 +953,129 @@ public void testInstallSnapshotWithThrottle() throws Exception { cluster.stopAll(); } + @Test + public void testInstallLargeSnapshotWithThrottle() throws Exception { + final List peers = TestUtils.generatePeers(4); + final TestCluster cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3)); + for (int i = 0; i < peers.size() - 1; i++) { + final PeerId peer = peers.get(i); + final boolean started = cluster.start(peer.getEndpoint(), false, 200, false); + assertTrue(started); + } + cluster.waitLeader(); + // get leader + final Node leader = cluster.getLeader(); + assertNotNull(leader); + // apply tasks to leader + sendTestTaskAndWait(leader, 0, RaftError.SUCCESS); + + cluster.ensureSame(); + + // apply something more + for (int i = 1; i < 100; i++) { + sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); + } + + Thread.sleep(1000); + + // trigger leader snapshot + triggerLeaderSnapshot(cluster, leader); + + // apply something more + for (int i = 100; i < 200; i++) { + sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); + } + // trigger leader snapshot + triggerLeaderSnapshot(cluster, leader, 2); + + // wait leader to compact logs + Thread.sleep(1000); + + // add follower + final PeerId newPeer = peers.get(3); + final SnapshotThrottle snapshotThrottle = new ThroughputSnapshotThrottle(128, 1); + final boolean started = cluster.start(newPeer.getEndpoint(), true, 300, false, snapshotThrottle); + assertTrue(started); + + final CountDownLatch latch = new CountDownLatch(1); + leader.addPeer(newPeer, status -> { + assertTrue(status.toString(), status.isOk()); + latch.countDown(); + }); + waitLatch(latch); + + cluster.ensureSame(); + + assertEquals(4, cluster.getFsms().size()); + for (final MockStateMachine fsm : cluster.getFsms()) { + assertEquals(2000, fsm.getLogs().size()); + } + + cluster.stopAll(); + } + + @Test + public void testInstallLargeSnapshot() throws Exception { + final List peers = TestUtils.generatePeers(4); + final TestCluster cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3)); + for (int i = 0; i < peers.size() - 1; i++) { + final PeerId peer = peers.get(i); + final boolean started = cluster.start(peer.getEndpoint(), false, 200, false); + assertTrue(started); + } + cluster.waitLeader(); + // get leader + final Node leader = cluster.getLeader(); + assertNotNull(leader); + // apply tasks to leader + sendTestTaskAndWait(leader, 0, RaftError.SUCCESS); + + cluster.ensureSame(); + + // apply something more + for (int i = 1; i < 100; i++) { + sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); + } + + Thread.sleep(1000); + + // trigger leader snapshot + triggerLeaderSnapshot(cluster, leader); + + // apply something more + for (int i = 100; i < 200; i++) { + sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); + } + // trigger leader snapshot + triggerLeaderSnapshot(cluster, leader, 2); + + // wait leader to compact logs + 
Thread.sleep(1000); + + // add follower + final PeerId newPeer = peers.get(3); + final RaftOptions raftOptions = new RaftOptions(); + raftOptions.setMaxByteCountPerRpc(128); + final boolean started = cluster.start(newPeer.getEndpoint(), true, 300, false, null, raftOptions); + assertTrue(started); + + final CountDownLatch latch = new CountDownLatch(1); + leader.addPeer(newPeer, status -> { + assertTrue(status.toString(), status.isOk()); + latch.countDown(); + }); + waitLatch(latch); + + cluster.ensureSame(); + + assertEquals(4, cluster.getFsms().size()); + for (final MockStateMachine fsm : cluster.getFsms()) { + assertEquals(2000, fsm.getLogs().size()); + } + + cluster.stopAll(); + } + @Test public void testInstallSnapshot() throws Exception { final List peers = TestUtils.generatePeers(3); @@ -960,31 +1087,31 @@ public void testInstallSnapshot() throws Exception { } cluster.waitLeader(); - //get leader + // get leader final Node leader = cluster.getLeader(); assertNotNull(leader); - //apply tasks to leader + // apply tasks to leader this.sendTestTaskAndWait(leader); cluster.ensureSame(); - //stop follower1 + // stop follower1 final List followers = cluster.getFollowers(); assertEquals(2, followers.size()); final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint(); assertTrue(cluster.stop(followerAddr)); - //apply something more + // apply something more this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); - //trigger leader snapshot + // trigger leader snapshot triggerLeaderSnapshot(cluster, leader); - //apply something more + // apply something more this.sendTestTaskAndWait(leader, 20, RaftError.SUCCESS); triggerLeaderSnapshot(cluster, leader, 2); - //wait leader to compact logs + // wait leader to compact logs Thread.sleep(50); //restart follower. 
@@ -1011,11 +1138,11 @@ public void testNoSnapshot() throws Exception { nodeOptions.setFsm(fsm); nodeOptions.setLogUri(this.dataPath + File.separator + "log"); nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta"); - nodeOptions.setInitialConf(new Configuration(Arrays.asList(new PeerId(addr, 0)))); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); final Node node = new NodeImpl("unittest", new PeerId(addr, 0)); assertTrue(node.init(nodeOptions)); - //wait node elect self as leader + // wait node elect self as leader Thread.sleep(2000); @@ -1045,18 +1172,18 @@ public void testAutoSnapshot() throws Exception { nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOptions.setSnapshotIntervalSecs(10); - nodeOptions.setInitialConf(new Configuration(Arrays.asList(new PeerId(addr, 0)))); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); final Node node = new NodeImpl("unittest", new PeerId(addr, 0)); assertTrue(node.init(nodeOptions)); - //wait node elect self as leader + // wait node elect self as leader Thread.sleep(2000); this.sendTestTaskAndWait(node); assertEquals(-1, fsm.getSnapshotIndex()); assertEquals(0, fsm.getSaveSnapshotTimes()); - //wait for auto snapshot + // wait for auto snapshot Thread.sleep(10000); assertEquals(1, fsm.getSaveSnapshotTimes()); assertTrue(fsm.getSnapshotIndex() > 0); @@ -1078,7 +1205,7 @@ public void testLeaderShouldNotChange() throws Exception { } cluster.waitLeader(); - //get leader + // get leader final Node leader0 = cluster.getLeader(); assertNotNull(leader0); final long savedTerm = ((NodeImpl) leader0).getCurrentTerm(); @@ -1124,7 +1251,7 @@ public void testRecoverFollower() throws Exception { } // wait leader to compact logs Thread.sleep(5000); - //restart follower + // restart follower assertTrue(cluster.start(followerAddr)); assertTrue(cluster.ensureSame(30)); assertEquals(3, cluster.getFsms().size()); @@ -1242,10 +1369,10 @@ public void testLeaderTransferResumeOnFailure() throws Exception { leader = cluster.getLeader(); assertSame(leader, savedLeader); - //restart target peer + // restart target peer assertTrue(cluster.start(targetPeer.getEndpoint())); Thread.sleep(100); - //retry apply task + // retry apply task latch = new CountDownLatch(1); task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(latch)); leader.apply(task); @@ -1290,14 +1417,14 @@ public void testShutdownAndJoinWorkAfterInitFails() throws Exception { nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOptions.setSnapshotIntervalSecs(10); - nodeOptions.setInitialConf(new Configuration(Arrays.asList(new PeerId(addr, 0)))); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); final Node node = new NodeImpl("unittest", new PeerId(addr, 0)); assertTrue(node.init(nodeOptions)); Thread.sleep(1000); this.sendTestTaskAndWait(node); - //save snapshot + // save snapshot final CountDownLatch latch = new CountDownLatch(1); node.snapshot(new ExpectClosure(latch)); waitLatch(latch); @@ -1312,7 +1439,7 @@ public void testShutdownAndJoinWorkAfterInitFails() throws Exception { nodeOptions.setSnapshotUri(this.dataPath + File.separator + "snapshot"); nodeOptions.setRaftMetaUri(this.dataPath + File.separator + "meta"); nodeOptions.setSnapshotIntervalSecs(10); 
- nodeOptions.setInitialConf(new Configuration(Arrays.asList(new PeerId(addr, 0)))); + nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); final Node node = new NodeImpl("unittest", new PeerId(addr, 0)); assertFalse(node.init(nodeOptions)); @@ -1410,7 +1537,7 @@ public void testTransferShouldWorkAfterInstallSnapshot() throws Exception { leader.snapshot(new ExpectClosure(latch)); waitLatch(latch); - //start the last peer which should be recover with snapshot. + // start the last peer which should be recover with snapshot. final PeerId lastPeer = peers.get(2); assertTrue(cluster.start(lastPeer.getEndpoint())); Thread.sleep(5000); @@ -1428,7 +1555,7 @@ public void testTransferShouldWorkAfterInstallSnapshot() throws Exception { @Test public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception { - //start five nodes + // start five nodes final List peers = TestUtils.generatePeers(5); final TestCluster cluster = new TestCluster("unitest", dataPath, peers, 1000); @@ -1440,10 +1567,10 @@ public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception { cluster.waitLeader(); final Node oldLeader = cluster.getLeader(); assertNotNull(oldLeader); - //apply something + // apply something this.sendTestTaskAndWait(oldLeader); - //set one follower into error state + // set one follower into error state final List followers = cluster.getFollowers(); assertEquals(4, followers.size()); final Node errorNode = followers.get(0); @@ -1460,14 +1587,14 @@ public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception { final Node leader = cluster.getLeader(); assertNotNull(leader); LOG.info("Elect a new leader {}", leader); - //apply something again + // apply something again this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); - //stop error follower + // stop error follower Thread.sleep(20); LOG.info("Stop error follower {}", errorNode); assertTrue(cluster.stop(errorFollowerAddr)); - //restart error and old leader + // restart error and old leader LOG.info("Restart error follower {} and old leader {}", errorFollowerAddr, oldLeaderAddr); assertTrue(cluster.start(errorFollowerAddr)); @@ -1483,7 +1610,7 @@ public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception { @Test public void testFollowerStartStopFollowing() throws Exception { - //start five nodes + // start five nodes final List peers = TestUtils.generatePeers(5); final TestCluster cluster = new TestCluster("unitest", dataPath, peers, 1000); @@ -1494,10 +1621,10 @@ public void testFollowerStartStopFollowing() throws Exception { cluster.waitLeader(); final Node firstLeader = cluster.getLeader(); assertNotNull(firstLeader); - //apply something + // apply something this.sendTestTaskAndWait(firstLeader); - //assert follow times + // assert follow times final List firstFollowers = cluster.getFollowers(); assertEquals(4, firstFollowers.size()); for (final Node node : firstFollowers) { @@ -1505,7 +1632,7 @@ public void testFollowerStartStopFollowing() throws Exception { assertEquals(0, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes()); } - //stop leader and elect new one + // stop leader and elect new one final Endpoint fstLeaderAddr = firstLeader.getNodeId().getPeerId().getEndpoint(); assertTrue(cluster.stop(fstLeaderAddr)); cluster.waitLeader(); @@ -1513,7 +1640,7 @@ public void testFollowerStartStopFollowing() throws Exception { assertNotNull(secondLeader); this.sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS); - //ensure 
start/stop following times + // ensure start/stop following times final List secondFollowers = cluster.getFollowers(); assertEquals(3, secondFollowers.size()); for (final Node node : secondFollowers) { @@ -1549,8 +1676,8 @@ public void testFollowerStartStopFollowing() throws Exception { } @Test - public void readCommitedUserLog() throws Exception { - //setup cluster + public void readCommittedUserLog() throws Exception { + // setup cluster final List peers = TestUtils.generatePeers(3); final TestCluster cluster = new TestCluster("unitest", dataPath, peers, 1000); @@ -1570,7 +1697,7 @@ public void readCommitedUserLog() throws Exception { assertEquals(2, userLog.getIndex()); assertEquals("hello0", new String(userLog.getData().array())); - //index == 5 is a DATA log(a user log) + // index == 5 is a DATA log(a user log) userLog = leader.readCommittedUserLog(5); assertNotNull(userLog); assertEquals(5, userLog.getIndex()); @@ -1726,7 +1853,7 @@ public void testBootStrapWithoutSnapshot() throws Exception { @Test public void testChangePeers() throws Exception { final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT); - final TestCluster cluster = new TestCluster("testChangePeers", dataPath, Arrays.asList(peer0)); + final TestCluster cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0)); assertTrue(cluster.start(peer0.getEndpoint())); cluster.waitLeader(); @@ -1745,7 +1872,7 @@ public void testChangePeers() throws Exception { Assert.assertEquals(peer, leader.getNodeId().getPeerId()); peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + i + 1); final SynchronizedClosure done = new SynchronizedClosure(); - leader.changePeers(new Configuration(Arrays.asList(peer)), done); + leader.changePeers(new Configuration(Collections.singletonList(peer)), done); assertTrue(done.await().isOk()); } assertTrue(cluster.ensureSame()); @@ -1756,7 +1883,7 @@ public void testChangePeers() throws Exception { @Test public void testChangePeersAddMultiNodes() throws Exception { final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT); - final TestCluster cluster = new TestCluster("testChangePeers", dataPath, Arrays.asList(peer0)); + final TestCluster cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0)); assertTrue(cluster.start(peer0.getEndpoint())); cluster.waitLeader(); @@ -1770,22 +1897,22 @@ public void testChangePeersAddMultiNodes() throws Exception { } PeerId peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + 1); - //fail, because the peers are not started. + // fail, because the peers are not started. 
final SynchronizedClosure done = new SynchronizedClosure(); - leader.changePeers(new Configuration(Arrays.asList(peer)), done); + leader.changePeers(new Configuration(Collections.singletonList(peer)), done); Assert.assertEquals(RaftError.ECATCHUP, done.await().getRaftError()); - //start peer1 + // start peer1 assertTrue(cluster.start(peer.getEndpoint())); - //still fail, because peer2 is not started + // still fail, because peer2 is not started done.reset(); leader.changePeers(conf, done); Assert.assertEquals(RaftError.ECATCHUP, done.await().getRaftError()); - //start peer2 + // start peer2 peer = new PeerId(TestUtils.getMyIp(), peer0.getEndpoint().getPort() + 2); assertTrue(cluster.start(peer.getEndpoint())); done.reset(); - //works + // works leader.changePeers(conf, done); assertTrue(done.await().isOk()); @@ -1806,7 +1933,7 @@ public void testChangePeersStepsDownInJointConsensus() throws Exception { final PeerId peer2 = JRaftUtils.getPeerId("127.0.0.1:5008"); final PeerId peer3 = JRaftUtils.getPeerId("127.0.0.1:5009"); - //start single cluster + // start single cluster peers.add(peer0); final TestCluster cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", dataPath, peers); assertTrue(cluster.start(peer0.getEndpoint())); @@ -1816,7 +1943,7 @@ public void testChangePeersStepsDownInJointConsensus() throws Exception { assertNotNull(leader); this.sendTestTaskAndWait(leader); - //start peer1-3 + // start peer1-3 assertTrue(cluster.start(peer1.getEndpoint())); assertTrue(cluster.start(peer2.getEndpoint())); assertTrue(cluster.start(peer3.getEndpoint())); @@ -1827,12 +1954,12 @@ public void testChangePeersStepsDownInJointConsensus() throws Exception { conf.addPeer(peer2); conf.addPeer(peer3); - //change peers + // change peers final SynchronizedClosure done = new SynchronizedClosure(); leader.changePeers(conf, done); assertTrue(done.await().isOk()); - //stop peer3 + // stop peer3 assertTrue(cluster.stop(peer3.getEndpoint())); conf.removePeer(peer0); @@ -1883,53 +2010,49 @@ private Future startChangePeersThread(ChangeArg arg) { expectedErrors.add(RaftError.EPERM); expectedErrors.add(RaftError.ECATCHUP); - return Utils.runInThread(new Runnable() { - - @Override - public void run() { - try { - while (!arg.stop) { - arg.c.waitLeader(); - final Node leader = arg.c.getLeader(); - if (leader == null) { - continue; - } - //select peers in random - final Configuration conf = new Configuration(); - if (arg.dontRemoveFirstPeer) { - conf.addPeer(arg.peers.get(0)); - } - for (int i = 0; i < arg.peers.size(); i++) { - final boolean select = ThreadLocalRandom.current().nextInt(64) < 32; - if (select && !conf.contains(arg.peers.get(i))) { - conf.addPeer(arg.peers.get(i)); - } - } - if (conf.isEmpty()) { - LOG.warn("No peer has been selected"); - continue; + return Utils.runInThread(() -> { + try { + while (!arg.stop) { + arg.c.waitLeader(); + final Node leader = arg.c.getLeader(); + if (leader == null) { + continue; + } + // select peers in random + final Configuration conf = new Configuration(); + if (arg.dontRemoveFirstPeer) { + conf.addPeer(arg.peers.get(0)); + } + for (int i = 0; i < arg.peers.size(); i++) { + final boolean select = ThreadLocalRandom.current().nextInt(64) < 32; + if (select && !conf.contains(arg.peers.get(i))) { + conf.addPeer(arg.peers.get(i)); } - final SynchronizedClosure done = new SynchronizedClosure(); - leader.changePeers(conf, done); - done.await(); - assertTrue(done.getStatus().toString(), - done.getStatus().isOk() || 
expectedErrors.contains(done.getStatus().getRaftError())); } - } catch (final InterruptedException e) { - LOG.error("ChangePeersThread is interrupted", e); + if (conf.isEmpty()) { + LOG.warn("No peer has been selected"); + continue; + } + final SynchronizedClosure done = new SynchronizedClosure(); + leader.changePeers(conf, done); + done.await(); + assertTrue(done.getStatus().toString(), + done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError())); } + } catch (final InterruptedException e) { + LOG.error("ChangePeersThread is interrupted", e); } }); } @Test public void testChangePeersChaosWithSnapshot() throws Exception { - //start cluster + // start cluster final List peers = new ArrayList<>(); peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT)); final TestCluster cluster = new TestCluster("unittest", dataPath, peers, 1000); assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 1)); - //start other peers + // start other peers for (int i = 1; i < 10; i++) { final PeerId peer = new PeerId("127.0.0.1", TestUtils.INIT_PORT + i); peers.add(peer); @@ -1972,12 +2095,12 @@ public void testChangePeersChaosWithSnapshot() throws Exception { @Test public void testChangePeersChaosWithoutSnapshot() throws Exception { - //start cluster + // start cluster final List peers = new ArrayList<>(); peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT)); final TestCluster cluster = new TestCluster("unittest", dataPath, peers, 1000); assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000)); - //start other peers + // start other peers for (int i = 1; i < 10; i++) { final PeerId peer = new PeerId("127.0.0.1", TestUtils.INIT_PORT + i); peers.add(peer); @@ -2021,12 +2144,12 @@ public void testChangePeersChaosWithoutSnapshot() throws Exception { @Test public void testChangePeersChaosApplyTasks() throws Exception { - //start cluster + // start cluster final List peers = new ArrayList<>(); peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT)); final TestCluster cluster = new TestCluster("unittest", dataPath, peers, 1000); assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000)); - //start other peers + // start other peers for (int i = 1; i < 10; i++) { final PeerId peer = new PeerId("127.0.0.1", TestUtils.INIT_PORT + i); peers.add(peer); @@ -2042,31 +2165,28 @@ public void testChangePeersChaosApplyTasks() throws Exception { args.add(arg); futures.add(startChangePeersThread(arg)); - Utils.runInThread(new Runnable() { - @Override - public void run() { - try { - for (int i = 0; i < 5000;) { - cluster.waitLeader(); - final Node leader = cluster.getLeader(); - if (leader == null) { - continue; - } - final SynchronizedClosure done = new SynchronizedClosure(); - final Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done); - leader.apply(task); - final Status status = done.await(); - if (status.isOk()) { - LOG.info("Progress:" + (++i)); - } else { - assertEquals(RaftError.EPERM, status.getRaftError()); - } + Utils.runInThread(() -> { + try { + for (int i = 0; i < 5000;) { + cluster.waitLeader(); + final Node leader = cluster.getLeader(); + if (leader == null) { + continue; + } + final SynchronizedClosure done = new SynchronizedClosure(); + final Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done); + leader.apply(task); + final Status status = done.await(); + if (status.isOk()) { + LOG.info("Progress:" + (++i)); + } else { + assertEquals(RaftError.EPERM, status.getRaftError()); } - } catch (final Exception e) { - 
e.printStackTrace(); - } finally { - latch.countDown(); } + } catch (final Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); } }); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java index 785e5800f..19a98a10f 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java @@ -36,6 +36,7 @@ import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.storage.SnapshotThrottle; import com.alipay.sofa.jraft.util.Endpoint; @@ -84,11 +85,16 @@ public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotInterv public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotIntervalSecs, boolean enableMetrics) throws IOException { - return this.start(listenAddr, emptyPeers, snapshotIntervalSecs, enableMetrics, null); + return this.start(listenAddr, emptyPeers, snapshotIntervalSecs, enableMetrics, null, null); } public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotIntervalSecs, boolean enableMetrics, SnapshotThrottle snapshotThrottle) throws IOException { + return this.start(listenAddr, emptyPeers, snapshotIntervalSecs, enableMetrics, snapshotThrottle, null); + } + + public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotIntervalSecs, boolean enableMetrics, + SnapshotThrottle snapshotThrottle, RaftOptions raftOptions) throws IOException { if (this.serverMap.get(listenAddr.toString()) != null) { return true; @@ -99,6 +105,9 @@ public boolean start(Endpoint listenAddr, boolean emptyPeers, int snapshotInterv nodeOptions.setEnableMetrics(enableMetrics); nodeOptions.setSnapshotThrottle(snapshotThrottle); nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs); + if (raftOptions != null) { + nodeOptions.setRaftOptions(raftOptions); + } final String serverDataPath = this.dataPath + File.separator + listenAddr.toString().replace(':', '_'); FileUtils.forceMkdir(new File(serverDataPath)); nodeOptions.setLogUri(serverDataPath + File.separator + "logs"); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/FileServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/FileServiceTest.java index 52bbea4e5..cc1d157e2 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/FileServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/FileServiceTest.java @@ -35,6 +35,7 @@ import com.alipay.sofa.jraft.test.TestUtils; import com.google.protobuf.Message; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -107,8 +108,51 @@ public void testGetFileData() throws IOException { Message msg = FileService.getInstance().handleGetFile(request, new RpcRequestClosure(bizContext, asyncContext)); assertTrue(msg instanceof RpcRequests.GetFileResponse); RpcRequests.GetFileResponse response = (RpcRequests.GetFileResponse) msg; - assertEquals(response.getEof(), true); + assertTrue(response.getEof()); assertEquals("jraft is great!", new String(response.getData().toByteArray())); assertEquals(-1, response.getReadSize()); } + + private String writeLargeData() throws IOException { + File file = 
new File(this.path + File.separator + "data"); + String data = "jraft is great!"; + for (int i = 0; i < 1000; i++) { + FileUtils.writeStringToFile(file, data, true); + } + return data; + } + + @Test + public void testGetLargeFileData() throws IOException { + final String data = writeLargeData(); + final long readerId = FileService.getInstance().addReader(this.fileReader); + int fileOffset = 0; + while (true) { + final RpcRequests.GetFileRequest request = RpcRequests.GetFileRequest.newBuilder() // + .setCount(4096).setFilename("data") // + .setOffset(fileOffset) // + .setReaderId(readerId) // + .build(); + final BizContext bizContext = Mockito.mock(BizContext.class); + final AsyncContext asyncContext = Mockito.mock(AsyncContext.class); + final Message msg = FileService.getInstance() // + .handleGetFile(request, new RpcRequestClosure(bizContext, asyncContext)); + assertTrue(msg instanceof RpcRequests.GetFileResponse); + final RpcRequests.GetFileResponse response = (RpcRequests.GetFileResponse) msg; + final byte[] sourceArray = data.getBytes(); + final byte[] respData = response.getData().toByteArray(); + final int length = sourceArray.length; + int offset = 0; + while (offset + length <= respData.length) { + final byte[] respArray = new byte[length]; + System.arraycopy(respData, offset, respArray, 0, length); + assertArrayEquals(sourceArray, respArray); + offset += length; + } + fileOffset += offset; + if (response.getEof()) { + break; + } + } + } }
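
A minimal usage sketch (not part of the patch) for the new six-argument TestCluster.start overload introduced above: passing a custom RaftOptions lets a test shrink the per-RPC byte budget so that copying a snapshot takes many GetFileRequest round-trips, the same chunked EOF loop that testGetLargeFileData exercises on the FileService side. This assumes RaftOptions exposes setMaxByteCountPerRpc, which is not shown in this patch; adjust the option name to your jraft-core version. The cluster name below is illustrative only.

    // Sketch only, under the assumption that RaftOptions#setMaxByteCountPerRpc exists.
    final RaftOptions raftOptions = new RaftOptions();
    raftOptions.setMaxByteCountPerRpc(256); // tiny chunks -> many GetFile round-trips per snapshot copy

    final List<PeerId> peers = TestUtils.generatePeers(3);
    final TestCluster cluster = new TestCluster("install-snapshot-sketch", dataPath, peers, 1000);
    for (final PeerId peer : peers) {
        // snapshotIntervalSecs=10 so snapshots are taken during the test;
        // metrics disabled, no snapshot throttle, custom raft options plumbed through NodeOptions.
        assertTrue(cluster.start(peer.getEndpoint(), false, 10, false, null, raftOptions));
    }
    cluster.waitLeader();

With the default maxByteCountPerRpc the whole snapshot file tends to fit in a single response, so the EOF and offset bookkeeping fixed by this patch would never be hit; forcing small chunks is what makes the install-snapshot path observable in a unit test.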