Skip to content

Commit

Permalink
Rewritten Netty Jersey implementation using direct ByteBuf consumption
Browse files Browse the repository at this point in the history
Signed-off-by: Venkat Ganesh 010gvr@gmail.com
  • Loading branch information
010gvr authored and Venkat committed Nov 6, 2019
1 parent 72c27bc commit 9e9a096
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.glassfish.jersey.netty.connector;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
Expand All @@ -31,6 +30,7 @@
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -50,7 +50,7 @@
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {

private final NettyConnector connector;
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();

private final AsyncConnectorCallback asyncConnectorCallback;
private final ClientRequest jerseyRequest;
Expand Down Expand Up @@ -89,15 +89,15 @@ public String getReasonPhrase() {
for (Map.Entry<String, String> entry : response.headers().entries()) {
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
}

isList.clear(); // clearing the content - possible leftover from previous request processing.
// request entity handling.
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
|| HttpUtil.isTransferEncodingChunked(response)) {

ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
isList.add(Unpooled.EMPTY_BUFFER);
}
});

Expand All @@ -123,21 +123,16 @@ public void run() {

}
if (msg instanceof HttpContent) {

HttpContent httpContent = (HttpContent) msg;

ByteBuf content = httpContent.content();

if (content.isReadable()) {
// copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all
// relates ByteBuffs.
byte[] bytes = new byte[content.readableBytes()];
content.getBytes(content.readerIndex(), bytes);
isList.add(new ByteArrayInputStream(bytes));
content.retain();
isList.add(content);
}

if (msg instanceof LastHttpContent) {
isList.add(NettyInputStream.END_OF_INPUT);
isList.add(Unpooled.EMPTY_BUFFER);
}
}
}
Expand All @@ -153,6 +148,6 @@ public void run() {
});
}
future.completeExceptionally(cause);
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@
public class JerseyChunkedInput extends OutputStream implements ChunkedInput<ByteBuf>, ChannelFutureListener {

private static final ByteBuffer VOID = ByteBuffer.allocate(0);
private static final int CAPACITY = 8;
// TODO this needs to be configurable, see JERSEY-3228
private static final int WRITE_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8);
private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000);
private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000);

private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
private final Channel ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,77 +20,47 @@
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingDeque;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
* Input stream which servers as Request entity input.
* <p>
* Converts Netty NIO buffers to an input streams and stores them in the queue,
* waiting for Jersey to process it.
*
* @author Pavel Bucek
* Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
*/
public class NettyInputStream extends InputStream {

private volatile boolean end = false;

/**
* End of input.
*/
public static final InputStream END_OF_INPUT = new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public String toString() {
return "END_OF_INPUT " + super.toString();
}
};

/**
* Unexpected end of input.
*/
public static final InputStream END_OF_INPUT_ERROR = new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public String toString() {
return "END_OF_INPUT_ERROR " + super.toString();
}
};

private final LinkedBlockingDeque<InputStream> isList;
private final LinkedBlockingDeque<ByteBuf> isList;

public NettyInputStream(LinkedBlockingDeque<InputStream> isList) {
public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
this.isList = isList;
}

private interface ISReader {
int readFrom(InputStream take) throws IOException;
}

private int readInternal(ISReader isReader) throws IOException {
if (end) {
return -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {

InputStream take;
ByteBuf take;
try {
take = isList.take();

if (checkEndOfInput(take)) {
boolean isReadable = take.isReadable();
int read = -1;
if (checkEndOfInputOrError(take)) {
take.release();
return -1;
}

int read = isReader.readFrom(take);

if (take.available() > 0) {
isList.addFirst(take);
if (isReadable) {
int readableBytes = take.readableBytes();
read = Math.min(readableBytes, len);
take.readBytes(b, off, read);
if (read < len) {
take.release();
} else {
isList.addFirst(take);
}
} else {
take.close();
read = 0;
take.release(); //We don't need `0`
}

return read;
Expand All @@ -100,33 +70,53 @@ private int readInternal(ISReader isReader) throws IOException {
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return readInternal(take -> take.read(b, off, len));
public int read() throws IOException {

ByteBuf take;
try {
take = isList.take();
boolean isReadable = take.isReadable();
if (checkEndOfInputOrError(take)) {
take.release();
return -1;
}

if (isReadable) {
return take.readInt();
} else {
take.release(); //We don't need `0`
}

return 0;
} catch (InterruptedException e) {
throw new IOException("Interrupted.", e);
}
}

@Override
public int read() throws IOException {
return readInternal(InputStream::read);
public void close() throws IOException {
if (isList != null) {
while (!isList.isEmpty()) {
try {
isList.take().release();
} catch (InterruptedException e) {
throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
}
}
}
super.close();
}

@Override
public int available() throws IOException {
InputStream peek = isList.peek();
if (peek != null) {
return peek.available();
ByteBuf peek = isList.peek();
if (peek != null && peek.isReadable()) {
return peek.readableBytes();
}

return 0;
}

private boolean checkEndOfInput(InputStream take) throws IOException {
if (take == END_OF_INPUT) {
end = true;
return true;
} else if (take == END_OF_INPUT_ERROR) {
end = true;
throw new IOException("Connection was closed prematurely.");
}
return false;
private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
return take == Unpooled.EMPTY_BUFFER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import java.util.concurrent.LinkedBlockingDeque;

import javax.ws.rs.core.SecurityContext;

import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -55,7 +55,7 @@
class JerseyHttp2ServerHandler extends ChannelDuplexHandler {

private final URI baseUri;
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;

Expand Down Expand Up @@ -92,9 +92,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
* Process incoming data.
*/
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
isList.add(new ByteBufInputStream(data.content(), true));
isList.add(data.content());
if (data.isEndStream()) {
isList.add(NettyInputStream.END_OF_INPUT);
isList.add(Unpooled.EMPTY_BUFFER);
}
}

Expand Down Expand Up @@ -163,7 +163,7 @@ public void removeProperty(String name) {
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
isList.add(Unpooled.EMPTY_BUFFER);
}
});

Expand Down
Loading

0 comments on commit 9e9a096

Please sign in to comment.