Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some code cleanup and optimizations when writing data from Jersey #463

Merged
merged 2 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,7 +87,7 @@ private void publish(byte[] buffer, int offset, int length) throws IOException {
throw new IOException("Output stream already closed.");
}

sub.onNext(ByteBuffer.wrap(buffer, offset, length));
sub.onNext(createBuffer(buffer, offset, length));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -114,4 +114,20 @@ private void complete(Throwable t) {
}
});
}

/**
* Creates a {@link ByteBuffer} by making a copy of the underlying
* byte array. Jersey will reuse this array, so it needs to be
* copied here.
*
* @param buffer The buffer.
* @param offset Offset in buffer.
* @param length Length of buffer.
* @return Newly created {@link ByteBuffer}.
*/
private ByteBuffer createBuffer(byte[] buffer, int offset, int length) {
ByteBuffer byteBuffer = ByteBuffer.allocate(length - offset);
byteBuffer.put(buffer, offset, length);
return (ByteBuffer) byteBuffer.clear(); // resets counters
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,8 @@

package io.helidon.webserver.jersey;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -142,28 +138,9 @@ public void write(int b) throws IOException {
// in case of SSE every response chunk needs to be flushed
boolean doFlush = MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(context.getMediaType());

res.send(
ReactiveStreamsAdapter.publisherToFlow(
ReactiveStreamsAdapter.publisherFromFlow(publisher)
/*
The problem is that Jersey reuses the byte[] array it uses for writing
to the OutputStream.write() method. As such we must duplicate the array
because we need to return from the 'write()' method sooner than when
the underlying TCP server flushes the bytes.

Following fails when writing large amount of data to the response:
.map(byteBuffer -> new ResponseChunk(false, byteBuffer))));
*/
.map(byteBuffer -> {
try {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
WritableByteChannel ch = Channels.newChannel(stream);
ch.write(byteBuffer);
return DataChunk.create(doFlush, ByteBuffer.wrap(stream.toByteArray()));
} catch (IOException e) {
throw new IllegalStateException("this never happens", e);
}
})));
res.send(ReactiveStreamsAdapter.publisherToFlow(
ReactiveStreamsAdapter.publisherFromFlow(publisher)
.map(byteBuffer -> DataChunk.create(doFlush, byteBuffer))));

return publisher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.logging.Level;
Expand All @@ -36,7 +34,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
Expand All @@ -46,7 +43,6 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

Expand Down Expand Up @@ -79,11 +75,11 @@ class BareResponseImpl implements BareResponse {
private volatile Flow.Subscription subscription;

/**
* @param ctx the channel handler context
* @param request the request
* @param ctx the channel handler context
* @param request the request
* @param requestContentConsumed whether the request content is consumed
* @param thread the outbound event loop thread which will be used to write the response
* @param requestId the correlation ID that is added to the log statements
* @param thread the outbound event loop thread which will be used to write the response
* @param requestId the correlation ID that is added to the log statements
*/
BareResponseImpl(ChannelHandlerContext ctx,
HttpRequest request,
Expand All @@ -103,10 +99,9 @@ class BareResponseImpl implements BareResponse {
this.ctx = ctx;
this.requestId = requestId;
ctx.channel()
.closeFuture()
// to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand!
.addListener(channelFuture -> responseFuture
.completeExceptionally(CLOSED));
.closeFuture()
// to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand!
.addListener(channelFuture -> responseFuture.completeExceptionally(CLOSED));
this.keepAlive = HttpUtil.isKeepAlive(request);
this.requestHeaders = request.headers();
}
Expand Down Expand Up @@ -137,19 +132,15 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}

runOnOutboundEventLoopThread(() -> {
ctx.writeAndFlush(response)
.addListener(future -> {
LOGGER.finest(() -> log("Writing headers: " + status));
ctx.writeAndFlush(response)
.addListener(future -> {
if (future.isSuccess()) {
headersFuture.complete(this);
}
})
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

LOGGER.finest(() -> log("Writing headers: " + status));
});
headersFuture.complete(this);
.addListener(completeOnFailureListener("An exception occurred when writing headers."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}

/**
Expand Down Expand Up @@ -180,36 +171,31 @@ private void completeInternal(Throwable throwable) {
}

if (keepAlive) {
runOnOutboundEventLoopThread(() -> {
LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true"));
LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true"));

if (!requestContentConsumed.getAsBoolean()) {
// the request content wasn't read, close the connection once the content is fully written.
LOGGER.finer(() -> log("Request content not fully read; trying to keep the connection; keep-alive: true"));
if (!requestContentConsumed.getAsBoolean()) {
// the request content wasn't read, close the connection once the content is fully written.
LOGGER.finer(() -> log("Request content not fully read; trying to keep the connection; keep-alive: true"));

// if content is not consumed, we need to trigger next chunk read in order to not get stuck forever; the
// connection will be closed in the ForwardingHandler in case there is more than just small amount of data
ctx.channel().read();
}
// if content is not consumed, we need to trigger next chunk read in order to not get stuck forever; the
// connection will be closed in the ForwardingHandler in case there is more than just small amount of data
ctx.channel().read();
}

writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE);
});
writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// If keep-alive is off, close the connection once the content is fully written.
runOnOutboundEventLoopThread(() -> {
LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive));

writeLastContent(throwable, ChannelFutureListener.CLOSE);
});
LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive));

writeLastContent(throwable, ChannelFutureListener.CLOSE);
}
}

private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) {
ctx.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(preventMaskingExceptionOnFailureListener(throwable))
.addListener(completeOnSuccessListener(throwable))
.addListener(closeAction);
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(completeOnSuccessListener(throwable))
.addListener(closeAction);
}

private GenericFutureListener<Future<? super Void>> completeOnFailureListener(String message) {
Expand All @@ -220,14 +206,6 @@ private GenericFutureListener<Future<? super Void>> completeOnFailureListener(St
};
}

private GenericFutureListener<Future<? super Void>> preventMaskingExceptionOnFailureListener(Throwable throwable) {
return future -> {
if (!future.isSuccess() && throwable != null) {
LOGGER.log(Level.FINE, throwable, () -> log("Response completion failed when handling an error."));
}
};
}

private GenericFutureListener<Future<? super Void>> completeOnSuccessListener(Throwable throwable) {
return future -> {
if (future.isSuccess()) {
Expand All @@ -254,24 +232,22 @@ public void onNext(DataChunk data) {

DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data()));

runOnOutboundEventLoopThread(() -> {
LOGGER.finest(() -> log("Sending data chunk on event loop thread."));

ChannelFuture channelFuture;
if (data.flush()) {
channelFuture = ctx.writeAndFlush(httpContent);
} else {
channelFuture = ctx.write(httpContent);
}

channelFuture
.addListener(future -> {
data.release();
LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess()));
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
});
LOGGER.finest(() -> log("Sending data chunk on event loop thread."));

ChannelFuture channelFuture;
if (data.flush()) {
channelFuture = ctx.writeAndFlush(httpContent);
} else {
channelFuture = ctx.write(httpContent);
}

channelFuture
.addListener(future -> {
data.release();
LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess()));
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}
}
Expand All @@ -280,46 +256,6 @@ private String log(String s) {
return "(reqID: " + requestId + ") " + s;
}

/**
* Runs the given runnable on an outbound event loop {@link #thread}.
*
* @param runnable the runnable to run
*/
private void runOnOutboundEventLoopThread(Runnable runnable) {
if (Thread.currentThread() != thread) {
// not executing in the originating thread
ChannelHandlerContext context = ctx.pipeline().context(ChannelOutboundHandler.class);
if (context == null) {
throw new ConnectionClosedException("The connection was closed.");
}
EventExecutor executor = context.executor();

CountDownLatch latch = new CountDownLatch(1);
executor.execute(() -> {
if (Thread.currentThread() != thread) {
throw new IllegalStateException(String.format("Assertion error! Current thread '%s' != expected one '%s'",
Thread.currentThread(),
thread));
}
// it is safe to count down before the runnable itself as it is guarantied the
// runnable will be executed before anything else
latch.countDown();
runnable.run();
});

try {
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out while waiting for a message to be written on the event loop.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for the task to be executed on an event loop thread", e);
}
} else {
runnable.run();
}
}

@Override
public void onError(Throwable thr) {
completeInternal(thr);
Expand Down