diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/OutputStreamPublisher.java b/common/reactive/src/main/java/io/helidon/common/reactive/OutputStreamPublisher.java index b725fc7d455..8cbbbfd0e66 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/OutputStreamPublisher.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/OutputStreamPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,7 +87,7 @@ private void publish(byte[] buffer, int offset, int length) throws IOException { throw new IOException("Output stream already closed."); } - sub.onNext(ByteBuffer.wrap(buffer, offset, length)); + sub.onNext(createBuffer(buffer, offset, length)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -114,4 +114,20 @@ private void complete(Throwable t) { } }); } + + /** + * Creates a {@link ByteBuffer} by making a copy of the underlying + * byte array. Jersey will reuse this array, so it needs to be + * copied here. + * + * @param buffer The buffer. + * @param offset Offset in buffer. + * @param length Length of buffer. + * @return Newly created {@link ByteBuffer}. + */ + private ByteBuffer createBuffer(byte[] buffer, int offset, int length) { + ByteBuffer byteBuffer = ByteBuffer.allocate(length - offset); + byteBuffer.put(buffer, offset, length); + return (ByteBuffer) byteBuffer.clear(); // resets counters + } } diff --git a/webserver/jersey/src/main/java/io/helidon/webserver/jersey/ResponseWriter.java b/webserver/jersey/src/main/java/io/helidon/webserver/jersey/ResponseWriter.java index 8c1827dc64e..8274e0ea6cf 100644 --- a/webserver/jersey/src/main/java/io/helidon/webserver/jersey/ResponseWriter.java +++ b/webserver/jersey/src/main/java/io/helidon/webserver/jersey/ResponseWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,8 @@ package io.helidon.webserver.jersey; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -142,28 +138,9 @@ public void write(int b) throws IOException { // in case of SSE every response chunk needs to be flushed boolean doFlush = MediaType.SERVER_SENT_EVENTS_TYPE.isCompatible(context.getMediaType()); - res.send( - ReactiveStreamsAdapter.publisherToFlow( - ReactiveStreamsAdapter.publisherFromFlow(publisher) - /* - The problem is that Jersey reuses the byte[] array it uses for writing - to the OutputStream.write() method. As such we must duplicate the array - because we need to return from the 'write()' method sooner than when - the underlying TCP server flushes the bytes. - - Following fails when writing large amount of data to the response: - .map(byteBuffer -> new ResponseChunk(false, byteBuffer)))); - */ - .map(byteBuffer -> { - try { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - WritableByteChannel ch = Channels.newChannel(stream); - ch.write(byteBuffer); - return DataChunk.create(doFlush, ByteBuffer.wrap(stream.toByteArray())); - } catch (IOException e) { - throw new IllegalStateException("this never happens", e); - } - }))); + res.send(ReactiveStreamsAdapter.publisherToFlow( + ReactiveStreamsAdapter.publisherFromFlow(publisher) + .map(byteBuffer -> DataChunk.create(doFlush, byteBuffer)))); return publisher; } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index afab6caae90..f59e4d86669 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -21,8 +21,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; import java.util.logging.Level; @@ -36,7 +34,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandler; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; @@ -46,7 +43,6 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -79,11 +75,11 @@ class BareResponseImpl implements BareResponse { private volatile Flow.Subscription subscription; /** - * @param ctx the channel handler context - * @param request the request + * @param ctx the channel handler context + * @param request the request * @param requestContentConsumed whether the request content is consumed - * @param thread the outbound event loop thread which will be used to write the response - * @param requestId the correlation ID that is added to the log statements + * @param thread the outbound event loop thread which will be used to write the response + * @param requestId the correlation ID that is added to the log statements */ BareResponseImpl(ChannelHandlerContext ctx, HttpRequest request, @@ -103,10 +99,9 @@ class BareResponseImpl implements BareResponse { this.ctx = ctx; this.requestId = requestId; ctx.channel() - .closeFuture() - // to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand! - .addListener(channelFuture -> responseFuture - .completeExceptionally(CLOSED)); + .closeFuture() + // to make this work, when programmatically closing the channel, the responseFuture must be closed beforehand! + .addListener(channelFuture -> responseFuture.completeExceptionally(CLOSED)); this.keepAlive = HttpUtil.isKeepAlive(request); this.requestHeaders = request.headers(); } @@ -137,19 +132,15 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map { - ctx.writeAndFlush(response) - .addListener(future -> { + LOGGER.finest(() -> log("Writing headers: " + status)); + ctx.writeAndFlush(response) + .addListener(future -> { if (future.isSuccess()) { headersFuture.complete(this); } }) - .addListener(completeOnFailureListener("An exception occurred when writing headers.")) - .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - - LOGGER.finest(() -> log("Writing headers: " + status)); - }); - headersFuture.complete(this); + .addListener(completeOnFailureListener("An exception occurred when writing headers.")) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } /** @@ -180,36 +171,31 @@ private void completeInternal(Throwable throwable) { } if (keepAlive) { - runOnOutboundEventLoopThread(() -> { - LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true")); + LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true")); - if (!requestContentConsumed.getAsBoolean()) { - // the request content wasn't read, close the connection once the content is fully written. - LOGGER.finer(() -> log("Request content not fully read; trying to keep the connection; keep-alive: true")); + if (!requestContentConsumed.getAsBoolean()) { + // the request content wasn't read, close the connection once the content is fully written. + LOGGER.finer(() -> log("Request content not fully read; trying to keep the connection; keep-alive: true")); - // if content is not consumed, we need to trigger next chunk read in order to not get stuck forever; the - // connection will be closed in the ForwardingHandler in case there is more than just small amount of data - ctx.channel().read(); - } + // if content is not consumed, we need to trigger next chunk read in order to not get stuck forever; the + // connection will be closed in the ForwardingHandler in case there is more than just small amount of data + ctx.channel().read(); + } - writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE); - }); + writeLastContent(throwable, ChannelFutureListener.CLOSE_ON_FAILURE); } else { - // If keep-alive is off, close the connection once the content is fully written. - runOnOutboundEventLoopThread(() -> { - LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive)); - writeLastContent(throwable, ChannelFutureListener.CLOSE); - }); + LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive)); + + writeLastContent(throwable, ChannelFutureListener.CLOSE); } } private void writeLastContent(final Throwable throwable, final ChannelFutureListener closeAction) { ctx.writeAndFlush(LAST_HTTP_CONTENT) - .addListener(completeOnFailureListener("An exception occurred when writing last http content.")) - .addListener(preventMaskingExceptionOnFailureListener(throwable)) - .addListener(completeOnSuccessListener(throwable)) - .addListener(closeAction); + .addListener(completeOnFailureListener("An exception occurred when writing last http content.")) + .addListener(completeOnSuccessListener(throwable)) + .addListener(closeAction); } private GenericFutureListener> completeOnFailureListener(String message) { @@ -220,14 +206,6 @@ private GenericFutureListener> completeOnFailureListener(St }; } - private GenericFutureListener> preventMaskingExceptionOnFailureListener(Throwable throwable) { - return future -> { - if (!future.isSuccess() && throwable != null) { - LOGGER.log(Level.FINE, throwable, () -> log("Response completion failed when handling an error.")); - } - }; - } - private GenericFutureListener> completeOnSuccessListener(Throwable throwable) { return future -> { if (future.isSuccess()) { @@ -254,24 +232,22 @@ public void onNext(DataChunk data) { DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data())); - runOnOutboundEventLoopThread(() -> { - LOGGER.finest(() -> log("Sending data chunk on event loop thread.")); - - ChannelFuture channelFuture; - if (data.flush()) { - channelFuture = ctx.writeAndFlush(httpContent); - } else { - channelFuture = ctx.write(httpContent); - } - - channelFuture - .addListener(future -> { - data.release(); - LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess())); - }) - .addListener(completeOnFailureListener("Failure when sending a content!")) - .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - }); + LOGGER.finest(() -> log("Sending data chunk on event loop thread.")); + + ChannelFuture channelFuture; + if (data.flush()) { + channelFuture = ctx.writeAndFlush(httpContent); + } else { + channelFuture = ctx.write(httpContent); + } + + channelFuture + .addListener(future -> { + data.release(); + LOGGER.finest(() -> log("Data chunk sent with result: " + future.isSuccess())); + }) + .addListener(completeOnFailureListener("Failure when sending a content!")) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } @@ -280,46 +256,6 @@ private String log(String s) { return "(reqID: " + requestId + ") " + s; } - /** - * Runs the given runnable on an outbound event loop {@link #thread}. - * - * @param runnable the runnable to run - */ - private void runOnOutboundEventLoopThread(Runnable runnable) { - if (Thread.currentThread() != thread) { - // not executing in the originating thread - ChannelHandlerContext context = ctx.pipeline().context(ChannelOutboundHandler.class); - if (context == null) { - throw new ConnectionClosedException("The connection was closed."); - } - EventExecutor executor = context.executor(); - - CountDownLatch latch = new CountDownLatch(1); - executor.execute(() -> { - if (Thread.currentThread() != thread) { - throw new IllegalStateException(String.format("Assertion error! Current thread '%s' != expected one '%s'", - Thread.currentThread(), - thread)); - } - // it is safe to count down before the runnable itself as it is guarantied the - // runnable will be executed before anything else - latch.countDown(); - runnable.run(); - }); - - try { - if (!latch.await(30, TimeUnit.SECONDS)) { - throw new IllegalStateException("Timed out while waiting for a message to be written on the event loop."); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for the task to be executed on an event loop thread", e); - } - } else { - runnable.run(); - } - } - @Override public void onError(Throwable thr) { completeInternal(thr);