Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixbug/install snapshot bug #80

Merged
merged 16 commits into from
Apr 1, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public class RpcOptions {
*/
private int rpcDefaultTimeout = 5000;

/**
* Install snapshot RPC request default timeout in milliseconds
* Default: 5 * 60 * 1000(5min)
*/
private int rpcInstallSnapshotTimeout = 5 * 60 * 1000;

/**
* Rpc process thread pool size
* Default: 80
Expand Down Expand Up @@ -75,10 +81,19 @@ public void setRpcDefaultTimeout(int rpcDefaultTimeout) {
this.rpcDefaultTimeout = rpcDefaultTimeout;
}

public int getRpcInstallSnapshotTimeout() {
return rpcInstallSnapshotTimeout;
}

public void setRpcInstallSnapshotTimeout(int rpcInstallSnapshotTimeout) {
this.rpcInstallSnapshotTimeout = rpcInstallSnapshotTimeout;
}

@Override
public String toString() {
return "RpcOptions{" + "rpcConnectTimeoutMs=" + rpcConnectTimeoutMs + ", rpcDefaultTimeout="
+ rpcDefaultTimeout + ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry="
+ metricRegistry + '}';
+ rpcDefaultTimeout + ", rpcInstallSnapshotTimeout=" + rpcInstallSnapshotTimeout
+ ", rpcProcessorThreadPoolSize=" + rpcProcessorThreadPoolSize + ", metricRegistry=" + metricRegistry
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Future<Message> getFile(Endpoint endpoint, GetFileRequest request, int ti
@Override
public Future<Message> installSnapshot(Endpoint endpoint, InstallSnapshotRequest request,
RpcResponseClosure<InstallSnapshotResponse> done) {
return invokeWithDone(endpoint, request, done, rpcOptions.getRpcDefaultTimeout());
return invokeWithDone(endpoint, request, done, rpcOptions.getRpcInstallSnapshotTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class FileService {
private static final FileService INSTANCE = new FileService();

private final ConcurrentMap<Long, FileReader> fileReaderMap = new ConcurrentHashMap<>();
private final AtomicLong nextId;
private final AtomicLong nextId = new AtomicLong();

/**
* Retrieve the singleton instance of FileService.
Expand All @@ -72,18 +72,16 @@ void clear() {
}

private FileService() {
this.nextId = new AtomicLong();
final long initValue = Math.abs(Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000,
Integer.MAX_VALUE)) << 45
| System.nanoTime() << 17 >> 17);
this.nextId.set(initValue);
LOG.info("Initial file reader id in FileService is {}", this.nextId);
final long processId = Utils.getProcessId(ThreadLocalRandom.current().nextLong(10000, Integer.MAX_VALUE));
final long initialValue = Math.abs(processId << 45 | System.nanoTime() << 17 >> 17);
this.nextId.set(initialValue);
LOG.info("Initial file reader id in FileService is {}", initialValue);
}

/**
* Handle GetFileRequest ,run the response or set the response with done.
* Handle GetFileRequest, run the response or set the response with done.
*/
public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
public Message handleGetFile(final GetFileRequest request, final RpcRequestClosure done) {
if (request.getCount() <= 0 || request.getOffset() < 0) {
return RpcResponseFactory.newResponse(RaftError.EREQUEST, "Invalid request: %s", request);
}
Expand All @@ -92,23 +90,24 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
return RpcResponseFactory.newResponse(RaftError.ENOENT, "Fail to find reader=%d", request.getReaderId());
}

LOG.debug("GetFile from {} path={} filename={} offset={} count={}", done.getBizContext().getRemoteAddress(),
reader.getPath(), request.getFilename(), request.getOffset(), request.getCount());
if (LOG.isDebugEnabled()) {
LOG.debug("GetFile from {} path={} filename={} offset={} count={}",
done.getBizContext().getRemoteAddress(), reader.getPath(), request.getFilename(), request.getOffset(),
request.getCount());
}

final ByteBufferCollector dataBuffer = ByteBufferCollector.allocate();
final GetFileResponse.Builder responseBuilder = GetFileResponse.newBuilder();
try {
final int read = reader
.readFile(dataBuffer, request.getFilename(), request.getOffset(), request.getCount());
responseBuilder.setReadSize(read);
if (read == -1) {
responseBuilder.setEof(true);
}
responseBuilder.setEof(read == FileReader.EOF);
final ByteBuffer buf = dataBuffer.getBuffer();
buf.flip();
if (!buf.hasRemaining()) {
// skip empty data
return responseBuilder.setData(ByteString.EMPTY).build();
responseBuilder.setData(ByteString.EMPTY);
} else {
// TODO check hole
responseBuilder.setData(ZeroByteStringHelper.wrap(buf));
Expand All @@ -128,9 +127,9 @@ public Message handleGetFile(GetFileRequest request, RpcRequestClosure done) {
/**
* Adds a file reader and return it's generated readerId.
*/
public long addReader(FileReader reader) {
public long addReader(final FileReader reader) {
final long readerId = this.nextId.getAndIncrement();
if (fileReaderMap.putIfAbsent(readerId, reader) == null) {
if (this.fileReaderMap.putIfAbsent(readerId, reader) == null) {
return readerId;
} else {
return -1L;
Expand All @@ -140,7 +139,7 @@ public long addReader(FileReader reader) {
/**
* Remove the reader by readerId.
*/
public boolean removeReader(long readerId) {
public boolean removeReader(final long readerId) {
return this.fileReaderMap.remove(readerId) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
public interface FileReader {

int EOF = -1;

/**
* Get the file path.
*
Expand All @@ -39,8 +41,17 @@ public interface FileReader {

/**
* Read file into buf starts from offset at most maxCount.
* Returns -1 if reaches end, else return read count.
*
* @param buf read bytes into this buf
* @param fileName file name
* @param offset the offset of file
* @param maxCount max read bytes
* @return -1 if reaches end, else return read count.
* @throws IOException if some I/O error occurs
* @throws RetryAgainException if it's not allowed to read partly
* or it's allowed but throughput is throttled to 0, try again.
*/
int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException,
RetryAgainException;
int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount)
throws IOException,
RetryAgainException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import com.google.protobuf.Message;

/**
* read a file data form local dir by fileName.
* Read a file data form local dir by fileName.
*
* @author boyan (boyan@alibaba-inc.com)
*
* 2018-Apr-06 9:25:12 PM
*/
public class LocalDirReader implements FileReader {

private static final Logger LOG = LoggerFactory.getLogger(LocalDirReader.class);

private final String path;
Expand All @@ -46,44 +48,45 @@ public LocalDirReader(String path) {

@Override
public String getPath() {
return this.path;
return path;
}

@Override
public int readFile(ByteBufferCollector buf, String fileName, long offset, long maxCount) throws IOException,
RetryAgainException {
return this.readFileWithMeta(buf, fileName, null, offset, maxCount);
public int readFile(final ByteBufferCollector buf, final String fileName, final long offset, final long maxCount)
throws IOException,
RetryAgainException {
return readFileWithMeta(buf, fileName, null, offset, maxCount);
}

@SuppressWarnings("unused")
protected int readFileWithMeta(ByteBufferCollector buf, String fileName, Message fileMeta, long offset,
long maxCount) throws IOException, RetryAgainException {
protected int readFileWithMeta(final ByteBufferCollector buf, final String fileName, final Message fileMeta,
long offset, final long maxCount) throws IOException, RetryAgainException {
buf.expandIfNecessary();
final String filePath = this.path + File.separator + fileName;
final File file = new File(filePath);
try (FileInputStream input = new FileInputStream(file); FileChannel fc = input.getChannel()) {
try (final FileInputStream input = new FileInputStream(file); final FileChannel fc = input.getChannel()) {
int totalRead = 0;
while (true) {
final int nread = fc.read(buf.getBuffer(), offset);
if (nread <= 0) {
return -1;
return EOF;
}
totalRead += nread;
if (totalRead < maxCount) {
if (buf.hasRemaining()) {
return -1;
return EOF;
} else {
buf.expandAtMost((int) (maxCount - totalRead));
offset += nread;
}
} else {
final long fsize = file.length();
if (fsize < 0) {
LOG.warn("Invlaid file length {}", filePath);
return -1;
LOG.warn("Invalid file length {}", filePath);
return EOF;
}
if (fsize == offset + maxCount) {
return -1;
if (fsize == offset + nread) {
return EOF;
} else {
return totalRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,20 @@ public SnapshotFileReader(String path, SnapshotThrottle snapshotThrottle) {
}

public boolean open() {
final File file = new File(this.getPath());
final File file = new File(getPath());
return file.exists();
}

@Override
public int readFile(ByteBufferCollector metaBufferCollector, String fileName, long offset, long maxCount)
throws IOException,
RetryAgainException {
public int readFile(final ByteBufferCollector metaBufferCollector, final String fileName, final long offset,
final long maxCount) throws IOException, RetryAgainException {
// read the whole meta file.
if (fileName.equals(Snapshot.JRAFT_SNAPSHOT_META_FILE)) {
final ByteBuffer metaBuf = this.metaTable.saveToByteBufferAsRemote();
//because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit.
// because bufRef will flip the buffer before using, so we must set the meta buffer position to it's limit.
metaBuf.position(metaBuf.limit());
metaBufferCollector.setBuffer(metaBuf);
return -1;
return EOF;
}
final LocalFileMeta fileMeta = this.metaTable.getFileMeta(fileName);
if (fileMeta == null) {
Expand All @@ -78,7 +77,7 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo
// go through throttle
long newMaxCount = maxCount;
if (this.snapshotThrottle != null) {
newMaxCount = snapshotThrottle.throttledByThroughput(maxCount);
newMaxCount = this.snapshotThrottle.throttledByThroughput(maxCount);
if (newMaxCount < maxCount) {
// if it's not allowed to read partly or it's allowed but
// throughput is throttled to 0, try again.
Expand All @@ -88,7 +87,7 @@ public int readFile(ByteBufferCollector metaBufferCollector, String fileName, lo
}
}

return this.readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount);
return super.readFileWithMeta(metaBufferCollector, fileName, fileMeta, offset, newMaxCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 super 改动是不必要的,并且会引入风险,未来如果这个类覆写了上面这个方法,反而可能遗忘修改这里。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

恩,有道理

}

}
Loading