diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index b152a1dbe33f..ee0aeeaad2fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -35,7 +35,6 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Locale; @@ -70,6 +69,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -599,37 +600,44 @@ private void tracedWriteRequest(Call call) throws IOException { * @see #readResponse() */ private void writeRequest(Call call) throws IOException { - ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, - this.compressor, call.cells); - CellBlockMeta cellBlockMeta; - if (cellBlock != null) { - cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build(); - } else { - cellBlockMeta = null; - } - RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); + ByteBuf cellBlock = null; + try { + cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, + call.cells, PooledByteBufAllocator.DEFAULT); + CellBlockMeta cellBlockMeta; + if (cellBlock != null) { + cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build(); + } else { + cellBlockMeta = null; + } + RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); - setupIOstreams(); + setupIOstreams(); - // Now we're going to write the call. We take the lock, then check that the connection - // is still valid, and, if so we do the write to the socket. If the write fails, we don't - // know where we stand, we have to close the connection. - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } + // Now we're going to write the call. We take the lock, then check that the connection + // is still valid, and, if so we do the write to the socket. If the write fails, we don't + // know where we stand, we have to close the connection. + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } - calls.put(call.id, call); // We put first as we don't want the connection to become idle. - // from here, we do not throw any exception to upper layer as the call has been tracked in the - // pending calls map. - try { - call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); - } catch (Throwable t) { - if(LOG.isTraceEnabled()) { - LOG.trace("Error while writing call, call_id:" + call.id, t); + calls.put(call.id, call); // We put first as we don't want the connection to become idle. + // from here, we do not throw any exception to upper layer as the call has been tracked in + // the pending calls map. + try { + call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); + } catch (Throwable t) { + if(LOG.isTraceEnabled()) { + LOG.trace("Error while writing call, call_id:" + call.id, t); + } + IOException e = IPCUtil.toIOE(t); + closeConn(e); + return; + } + } finally { + if (cellBlock != null) { + cellBlock.release(); } - IOException e = IPCUtil.toIOE(t); - closeConn(e); - return; } notifyAll(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index c6bbd0ba5eba..46562c514fbf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,7 +23,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -41,7 +40,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; - +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -62,19 +61,19 @@ class IPCUtil { * @throws IOException if write action fails */ public static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock) throws IOException { + final ByteBuf cellBlock) throws IOException { // Must calculate total size and write that first so other side can read it all in in one // swoop. This is dictated by how the server is currently written. Server needs to change // if we are to be able to write without the length prefixing. int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); if (cellBlock != null) { - totalSize += cellBlock.remaining(); + totalSize += cellBlock.readableBytes(); } return write(dos, header, param, cellBlock, totalSize); } private static int write(final OutputStream dos, final Message header, final Message param, - final ByteBuffer cellBlock, final int totalSize) throws IOException { + final ByteBuf cellBlock, final int totalSize) throws IOException { // I confirmed toBytes does same as DataOutputStream#writeInt. dos.write(Bytes.toBytes(totalSize)); // This allocates a buffer that is the size of the message internally. @@ -83,7 +82,7 @@ private static int write(final OutputStream dos, final Message header, final Mes param.writeDelimitedTo(dos); } if (cellBlock != null) { - dos.write(cellBlock.array(), 0, cellBlock.remaining()); + cellBlock.readBytes(dos, cellBlock.readableBytes()); } dos.flush(); return totalSize;