Skip to content

Commit 647613b

Browse files
chenxu14saintstack
authored andcommitted
HBASE-22905 Avoid temp ByteBuffer allocation in (#538)
BlockingRpcConnection#writeRequest
1 parent 0e87e86 commit 647613b

File tree

2 files changed

+41
-34
lines changed

2 files changed

+41
-34
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.net.Socket;
3636
import java.net.SocketTimeoutException;
3737
import java.net.UnknownHostException;
38-
import java.nio.ByteBuffer;
3938
import java.security.PrivilegedExceptionAction;
4039
import java.util.ArrayDeque;
4140
import java.util.Locale;
@@ -70,6 +69,8 @@
7069
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
7170
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
7271
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
72+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
73+
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
7374
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
7475
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
7576
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -599,37 +600,44 @@ private void tracedWriteRequest(Call call) throws IOException {
599600
* @see #readResponse()
600601
*/
601602
private void writeRequest(Call call) throws IOException {
602-
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
603-
this.compressor, call.cells);
604-
CellBlockMeta cellBlockMeta;
605-
if (cellBlock != null) {
606-
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
607-
} else {
608-
cellBlockMeta = null;
609-
}
610-
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
603+
ByteBuf cellBlock = null;
604+
try {
605+
cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
606+
call.cells, PooledByteBufAllocator.DEFAULT);
607+
CellBlockMeta cellBlockMeta;
608+
if (cellBlock != null) {
609+
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
610+
} else {
611+
cellBlockMeta = null;
612+
}
613+
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
611614

612-
setupIOstreams();
615+
setupIOstreams();
613616

614-
// Now we're going to write the call. We take the lock, then check that the connection
615-
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
616-
// know where we stand, we have to close the connection.
617-
if (Thread.interrupted()) {
618-
throw new InterruptedIOException();
619-
}
617+
// Now we're going to write the call. We take the lock, then check that the connection
618+
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
619+
// know where we stand, we have to close the connection.
620+
if (Thread.interrupted()) {
621+
throw new InterruptedIOException();
622+
}
620623

621-
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
622-
// from here, we do not throw any exception to upper layer as the call has been tracked in the
623-
// pending calls map.
624-
try {
625-
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
626-
} catch (Throwable t) {
627-
if(LOG.isTraceEnabled()) {
628-
LOG.trace("Error while writing call, call_id:" + call.id, t);
624+
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
625+
// from here, we do not throw any exception to upper layer as the call has been tracked in
626+
// the pending calls map.
627+
try {
628+
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
629+
} catch (Throwable t) {
630+
if(LOG.isTraceEnabled()) {
631+
LOG.trace("Error while writing call, call_id:" + call.id, t);
632+
}
633+
IOException e = IPCUtil.toIOE(t);
634+
closeConn(e);
635+
return;
636+
}
637+
} finally {
638+
if (cellBlock != null) {
639+
cellBlock.release();
629640
}
630-
IOException e = IPCUtil.toIOE(t);
631-
closeConn(e);
632-
return;
633641
}
634642
notifyAll();
635643
}

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.net.ConnectException;
2424
import java.net.InetSocketAddress;
2525
import java.net.SocketTimeoutException;
26-
import java.nio.ByteBuffer;
2726
import java.nio.channels.ClosedChannelException;
2827
import java.util.concurrent.TimeoutException;
2928
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -41,7 +40,7 @@
4140
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
4241
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
4342
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
44-
43+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
4544
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
4645
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
4746
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -62,19 +61,19 @@ class IPCUtil {
6261
* @throws IOException if write action fails
6362
*/
6463
public static int write(final OutputStream dos, final Message header, final Message param,
65-
final ByteBuffer cellBlock) throws IOException {
64+
final ByteBuf cellBlock) throws IOException {
6665
// Must calculate total size and write that first so other side can read it all in in one
6766
// swoop. This is dictated by how the server is currently written. Server needs to change
6867
// if we are to be able to write without the length prefixing.
6968
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
7069
if (cellBlock != null) {
71-
totalSize += cellBlock.remaining();
70+
totalSize += cellBlock.readableBytes();
7271
}
7372
return write(dos, header, param, cellBlock, totalSize);
7473
}
7574

7675
private static int write(final OutputStream dos, final Message header, final Message param,
77-
final ByteBuffer cellBlock, final int totalSize) throws IOException {
76+
final ByteBuf cellBlock, final int totalSize) throws IOException {
7877
// I confirmed toBytes does same as DataOutputStream#writeInt.
7978
dos.write(Bytes.toBytes(totalSize));
8079
// This allocates a buffer that is the size of the message internally.
@@ -83,7 +82,7 @@ private static int write(final OutputStream dos, final Message header, final Mes
8382
param.writeDelimitedTo(dos);
8483
}
8584
if (cellBlock != null) {
86-
dos.write(cellBlock.array(), 0, cellBlock.remaining());
85+
cellBlock.readBytes(dos, cellBlock.readableBytes());
8786
}
8887
dos.flush();
8988
return totalSize;

0 commit comments

Comments
 (0)