diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 076c09cd3c775b..32b2d76d8b8e1e 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -550,19 +550,22 @@ public ByteBuffer pread(TBrokerFD fd, long offset, long length) { currentStreamOffset, offset); } } - byte[] buf; + ByteBuffer buf; if (length > readBufferSize) { - buf = new byte[readBufferSize]; + buf = ByteBuffer.allocate(readBufferSize); } else { - buf = new byte[(int) length]; + buf = ByteBuffer.allocate((int) length); } try { - int readLength = fsDataInputStream.read(buf); + int readLength = readByteBufferFully(fsDataInputStream, buf); if (readLength < 0) { throw new BrokerException(TBrokerOperationStatusCode.END_OF_FILE, "end of file reached"); } - return ByteBuffer.wrap(buf, 0, readLength); + if (logger.isDebugEnable()) { + logger.debug("read buffer from input stream, buffer size:" + buf.capacity() + ", read length:" + readLength); + } + return buf; } catch (IOException e) { logger.error("errors while read data from stream", e); throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, @@ -655,6 +658,29 @@ public void ping(String clientId) { private static TBrokerFD parseUUIDToFD(UUID uuid) { return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } + + private int readByteBuffer(FSDataInputStream is, ByteBuffer dest) throws IOException { + int pos = dest.position(); + int result = is.read(dest); + if (result > 0) { + // Ensure this explicitly since versions before 2.7 read doesn't do it. + dest.position(pos + result); + } + return result; + } + + private int readByteBufferFully(FSDataInputStream is, ByteBuffer dest) throws IOException { + int result = 0; + while (dest.remaining() > 0) { + int n = readByteBuffer(is, dest); + if (n <= 0) { + break; + } + result += n; + } + dest.flip(); + return result; + } class FileSystemExpirationChecker implements Runnable { @Override