diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java index 1493a528788..f3b81043a57 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWriteContext.java @@ -20,12 +20,12 @@ import io.micronaut.core.annotation.NonNull; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponse; import org.reactivestreams.Publisher; -import java.io.RandomAccessFile; +import java.io.InputStream; +import java.util.concurrent.ExecutorService; /** * This interface is used to write the different kinds of netty responses. @@ -68,20 +68,11 @@ default void writeFull(@NonNull FullHttpResponse response) { void writeStreamed(@NonNull HttpResponse response, @NonNull Publisher content); /** - * Write a response with a {@link HttpChunkedInput} body. + * Write a response with a body that is a blocking stream. * - * @param response The response. Must not be a {@link FullHttpResponse} - * @param chunkedInput The response body + * @param response The response. Must not be a {@link FullHttpResponse} + * @param stream The stream to read from + * @param executorService The executor for IO operations */ - void writeChunked(@NonNull HttpResponse response, @NonNull HttpChunkedInput chunkedInput); - - /** - * Write a response with a body that is a section of a {@link RandomAccessFile}. - * - * @param response The response. Must not be a {@link FullHttpResponse} - * @param randomAccessFile File to read from - * @param position Start position - * @param contentLength Length of the section to send - */ - void writeFile(@NonNull HttpResponse response, @NonNull RandomAccessFile randomAccessFile, long position, long contentLength); + void writeStream(@NonNull HttpResponse response, @NonNull InputStream stream, @NonNull ExecutorService executorService); } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java index eb472a7325e..fe6fd2e9a20 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java @@ -61,7 +61,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandshakeCompletionEvent; -import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import io.netty.incubator.codec.http3.Http3; import io.netty.incubator.codec.http3.Http3FrameToHttpObjectCodec; @@ -587,23 +586,20 @@ private void insertHttp2DownstreamHandlers() { registerMicronautChannelHandlers(); - insertMicronautHandlers(false); + insertMicronautHandlers(); } /** * Insert the handlers that manage the micronaut message handling, e.g. conversion between micronaut requests * and netty requests, and routing. */ - private void insertMicronautHandlers(boolean zeroCopySupported) { + private void insertMicronautHandlers() { channel.attr(STREAM_PIPELINE_ATTRIBUTE.get()).set(this); if (sslHandler != null) { channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert()); } SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy()); - if (zeroCopySupported) { - channel.attr(PipeliningServerHandler.ZERO_COPY_PREDICATE.get()).set(contentCompressor); - } pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor); pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_DECOMPRESSOR, new HttpContentDecompressor()); @@ -619,7 +615,6 @@ private void insertMicronautHandlers(boolean zeroCopySupported) { ) ); } - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler()); // todo: move to PipeliningServerHandler RequestHandler requestHandler = routingInBoundHandler; if (webSocketUpgradeHandler.isPresent()) { @@ -645,7 +640,7 @@ private void insertHttp1DownstreamHandlers() { registerMicronautChannelHandlers(); pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_KEEP_ALIVE, new HttpServerKeepAliveHandler()); - insertMicronautHandlers(sslHandler == null); + insertMicronautHandlers(); } /** diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java index 37d12372161..3ed2d76d223 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java @@ -27,13 +27,14 @@ import io.micronaut.http.netty.body.NettyBodyWriter; import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; -import io.netty.handler.stream.ChunkedStream; +import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; /** * Body writer for {@link InputStream}s. @@ -45,9 +46,11 @@ @Experimental @Singleton public final class InputStreamBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { + private final ExecutorService executorService; - InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) { super(configuration); + this.executorService = executorService; } @Override @@ -59,7 +62,7 @@ public void writeTo(HttpRequest request, MutableHttpResponse out nettyResponse.getNettyHeaders() ); // can be null if the stream was closed - nettyContext.writeChunked(finalResponse, new HttpChunkedInput(new ChunkedStream(object))); + nettyContext.writeStream(finalResponse, object, executorService); } else { throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + outgoingResponse); } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java index d5d0b27f780..4ec84d7fdf0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java @@ -28,16 +28,17 @@ import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.types.files.StreamedFile; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.stream.ChunkedStream; +import jakarta.inject.Named; import jakarta.inject.Singleton; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ExecutorService; /** * Body writer for {@link StreamedFile}s. @@ -49,8 +50,11 @@ @Experimental @Internal public final class StreamFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { - StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + private final ExecutorService ioExecutor; + + StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); + this.ioExecutor = ioExecutor; } @Override @@ -72,8 +76,7 @@ public void writeTo(HttpRequest request, MutableHttpResponse ou nettyHeaders ); InputStream inputStream = object.getInputStream(); - HttpChunkedInput chunkedInput = new HttpChunkedInput(new ChunkedStream(inputStream)); - nettyContext.writeChunked(finalResponse, chunkedInput); + nettyContext.writeStream(finalResponse, inputStream, ioExecutor); } } else { diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java index 1c90b09a826..fcaa64bff59 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java @@ -33,15 +33,22 @@ import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.types.files.SystemFile; +import io.micronaut.scheduling.TaskExecutors; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; +import jakarta.inject.Named; import jakarta.inject.Singleton; +import org.jetbrains.annotations.NotNull; +import java.io.EOFException; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; -import java.io.RandomAccessFile; +import java.util.concurrent.ExecutorService; import static io.micronaut.http.HttpHeaders.CONTENT_RANGE; @@ -57,8 +64,11 @@ public final class SystemFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter { private static final String UNIT_BYTES = "bytes"; - public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + private final ExecutorService ioExecutor; + + public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); + this.ioExecutor = ioExecutor; } @Override @@ -113,34 +123,22 @@ public void writeTo(HttpRequest request, MutableHttpResponse resp // Write the request data final DefaultHttpResponse finalResponse = new DefaultHttpResponse(nettyResponse.getNettyHttpVersion(), nettyResponse.getNettyHttpStatus(), nettyResponse.getNettyHeaders()); - writeFile(systemFile, nettyContext, position, contentLength, finalResponse); + + File file = systemFile.getFile(); + InputStream is; + try { + is = new FileInputStream(file); + } catch (FileNotFoundException e) { + throw new MessageBodyException("Could not find file", e); + } + + nettyContext.writeStream(finalResponse, new RangeInputStream(is, position, contentLength), ioExecutor); } } else { throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + response); } } - private static void writeFile(SystemFile systemFile, NettyWriteContext context, long position, long contentLength, DefaultHttpResponse finalResponse) { - // Write the content. - File file = systemFile.getFile(); - RandomAccessFile randomAccessFile = open(file); - - context.writeFile( - finalResponse, - randomAccessFile, - position, - contentLength - ); - } - - private static RandomAccessFile open(File file) { - try { - return new RandomAccessFile(file, "r"); - } catch (FileNotFoundException e) { - throw new MessageBodyException("Could not find file", e); - } - } - @Nullable private static IntRange parseRangeHeader(String value, long contentLength) { int equalsIdx = value.indexOf('='); @@ -175,4 +173,72 @@ private static class IntRange { } } + private static final class RangeInputStream extends InputStream { + private final InputStream delegate; + private final long toSkip; + private long remainingLength; + private boolean skipped = false; + private boolean skipSuccess = false; + + private RangeInputStream(InputStream delegate, long toSkip, long length) { + this.delegate = delegate; + this.toSkip = toSkip; + this.remainingLength = length; + + if (toSkip == 0) { + skipped = true; + skipSuccess = true; + } + } + + private boolean doSkip() throws IOException { + if (!skipped) { + skipped = true; + try { + delegate.skipNBytes(toSkip); + skipSuccess = true; + } catch (EOFException ignored) { + } + } + return skipSuccess; + } + + @Override + public int read() throws IOException { + if (!doSkip()) { + return -1; + } + if (remainingLength <= 0) { + return -1; + } + int read = delegate.read(); + if (read != -1) { + remainingLength--; + } + return read; + } + + @Override + public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (!doSkip()) { + return -1; + } + if (remainingLength <= 0) { + return -1; + } + if (len > remainingLength) { + len = (int) remainingLength; + } + int n = delegate.read(b, off, len); + if (n != -1) { + remainingLength -= n; + } + return n; + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index d2121ce377d..20e33d055d3 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -18,13 +18,10 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.util.SupplierUtil; -import io.micronaut.http.exceptions.MessageBodyException; import io.micronaut.http.netty.body.NettyWriteContext; import io.micronaut.http.netty.stream.DelegateStreamedHttpRequest; import io.micronaut.http.netty.stream.EmptyHttpRequest; import io.micronaut.http.netty.stream.StreamedHttpResponse; -import io.micronaut.http.server.netty.SmartHttpContentCompressor; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; @@ -32,14 +29,12 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.DefaultFileRegion; import io.netty.channel.EventLoop; -import io.netty.channel.FileRegion; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpChunkedInput; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; @@ -49,15 +44,8 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.stream.ChunkedFile; -import io.netty.handler.stream.ChunkedInput; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.ResourceLeakDetector; -import io.netty.util.ResourceLeakDetectorFactory; -import io.netty.util.ResourceLeakTracker; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -67,15 +55,15 @@ import reactor.core.publisher.Sinks; import reactor.util.concurrent.Queues; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; +import java.io.InputStream; +import java.io.InterruptedIOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Queue; -import java.util.function.Supplier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; /** * Netty handler that handles incoming {@link HttpRequest}s and forwards them to a @@ -86,9 +74,6 @@ */ @Internal public final class PipeliningServerHandler extends ChannelInboundHandlerAdapter { - public static final Supplier> ZERO_COPY_PREDICATE = - SupplierUtil.memoized(() -> AttributeKey.newInstance("zero-copy-predicate")); - private static final int LENGTH_8K = 8192; private static final Logger LOG = LoggerFactory.getLogger(PipeliningServerHandler.class); @@ -319,24 +304,6 @@ void readComplete() { } } - /** - * Wrapper class for a netty response with a special body type, like - * {@link HttpChunkedInput} or - * {@link FileRegion}. - * - * @param response The response - * @param body The body, or {@code null} if there is no body - * @param needLast Whether to finish the response with a - * {@link LastHttpContent} - */ - private record CustomResponse(HttpResponse response, @Nullable Object body, boolean needLast) { - CustomResponse { - if (response instanceof FullHttpResponse) { - throw new IllegalArgumentException("Response must not be a FullHttpResponse to send a special body"); - } - } - } - /** * Base {@link InboundHandler} that handles {@link HttpRequest}s and then determines how to * deal with the body. @@ -673,39 +640,10 @@ public void writeStreamed(HttpResponse response, Publisher content) content.subscribe(new StreamingOutboundHandler(this, response)); } - /** - * Write a response with a special body - * ({@link io.netty.handler.codec.http.HttpChunkedInput}, - * {@link io.micronaut.http.server.types.files.SystemFile}). - * - * @param response The response to write - */ - private void writeStreamed(CustomResponse response) { - preprocess(response.response()); - write(new ChunkedOutboundHandler(this, response)); - } - - @Override - public void writeChunked(HttpResponse response, HttpChunkedInput chunkedInput) { - writeStreamed(new CustomResponse(response, chunkedInput, false)); - } - @Override - public void writeFile(HttpResponse response, RandomAccessFile randomAccessFile, long position, long contentLength) { - SmartHttpContentCompressor predicate = ctx.channel().attr(ZERO_COPY_PREDICATE.get()).get(); - if (predicate != null && predicate.shouldSkip(response)) { - // SSL not enabled - can use zero-copy file transfer. - writeStreamed(new CustomResponse(response, new TrackedDefaultFileRegion(randomAccessFile.getChannel(), position, contentLength), true)); - } else { - // SSL enabled - cannot use zero-copy file transfer. - try { - // HttpChunkedInput will write the end marker (LastHttpContent) for us. - final HttpChunkedInput chunkedInput = new HttpChunkedInput(new TrackedChunkedFile(randomAccessFile, position, contentLength, LENGTH_8K)); - writeStreamed(new CustomResponse(response, chunkedInput, false)); - } catch (IOException e) { - throw new MessageBodyException("Could not read file", e); - } - } + public void writeStream(HttpResponse response, InputStream stream, ExecutorService executorService) { + preprocess(response); + write(new BlockingOutboundHandler(this, response, stream, executorService)); } } @@ -912,89 +850,154 @@ void discard() { } } - /** - * Handler that writes a files etc. - */ - private final class ChunkedOutboundHandler extends OutboundHandler { - private final CustomResponse message; - - ChunkedOutboundHandler(OutboundAccess outboundAccess, CustomResponse message) { + private final class BlockingOutboundHandler extends OutboundHandler { + private static final int QUEUE_SIZE = 2; + + private final HttpResponse response; + private final InputStream stream; + private final ExecutorService blockingExecutor; + + private final Queue queue = new ArrayDeque<>(QUEUE_SIZE); + private Future worker = null; + private boolean workerReady = false; + private boolean discard = false; + private boolean done = false; + private boolean producerWaiting = false; + private boolean consumerWaiting = false; + + BlockingOutboundHandler( + OutboundAccess outboundAccess, + HttpResponse response, + InputStream stream, + ExecutorService blockingExecutor) { super(outboundAccess); - this.message = message; + this.response = response; + this.stream = stream; + this.blockingExecutor = blockingExecutor; } @Override void writeSome() { - boolean responseIsLast = message.body() == null && !message.needLast(); - write(message.response(), responseIsLast, responseIsLast && outboundAccess.closeAfterWrite); - if (message.body() != null) { - boolean bodyIsLast = !message.needLast(); - write(message.body(), bodyIsLast, bodyIsLast && outboundAccess.closeAfterWrite); - } - if (message.needLast()) { - write(LastHttpContent.EMPTY_LAST_CONTENT, true, outboundAccess.closeAfterWrite); - } - outboundHandler = null; - requestHandler.responseWritten(outboundAccess.attachment); - PipeliningServerHandler.this.writeSome(); + if (worker == null) { + write(response, false, false); + worker = blockingExecutor.submit(this::work); + } + do { + ByteBuf msg; + synchronized (this) { + if (producerWaiting) { + producerWaiting = false; + notifyAll(); + } + msg = queue.poll(); + if (msg == null && !this.done) { + consumerWaiting = true; + break; + } + } + if (msg == null) { + // this.done == true inside the synchronized block + write(LastHttpContent.EMPTY_LAST_CONTENT, true, false); + + outboundHandler = null; + requestHandler.responseWritten(outboundAccess.attachment); + PipeliningServerHandler.this.writeSome(); + break; + } else { + write(new DefaultHttpContent(msg), true, false); + } + } while (ctx.channel().isWritable()); } @Override void discard() { - ReferenceCountUtil.release(message.response()); - if (message.body() instanceof ChunkedInput ci) { - try { - ci.close(); - } catch (Exception e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Failed to close ChunkedInput", e); - } + discard = true; + if (worker == null) { + worker = blockingExecutor.submit(this::work); + } else { + synchronized (this) { + if (workerReady) { + worker.cancel(true); + // in case the worker was already done, drain buffers + drain(); + } // else worker is still setting up and will see the discard flag in due time } - } else if (message.body() instanceof FileRegion fr) { - fr.release(); } - outboundHandler = null; } - } - private static class TrackedDefaultFileRegion extends DefaultFileRegion { - //to avoid initializing Netty at build time - private static final Supplier> LEAK_DETECTOR = SupplierUtil.memoized(() -> - ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedDefaultFileRegion.class)); - - private final ResourceLeakTracker tracker; - - public TrackedDefaultFileRegion(FileChannel fileChannel, long position, long count) { - super(fileChannel, position, count); - this.tracker = LEAK_DETECTOR.get().track(this); - } + private void work() { + ByteBuf buf = null; + try (InputStream stream = this.stream) { + synchronized (this) { + this.workerReady = true; + if (this.discard) { + // don't read + return; + } + } + while (true) { + buf = ctx.alloc().heapBuffer(LENGTH_8K); + int n = buf.writeBytes(stream, LENGTH_8K); + synchronized (this) { + if (n == -1) { + done = true; + wakeConsumer(); + break; + } + while (queue.size() >= QUEUE_SIZE && !discard) { + producerWaiting = true; + wait(); + } + if (discard) { + break; + } + queue.add(buf); + // buf is now owned by the queue + buf = null; + + wakeConsumer(); + } + } + } catch (InterruptedException | InterruptedIOException ignored) { + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("InputStream threw an error during read. This error cannot be forwarded to the client. Please make sure any errors are thrown by the controller instead.", e); + } + } finally { + // if we failed to add a buffer to the queue, release it + if (buf != null) { + buf.release(); + } + synchronized (this) { + done = true; - @Override - protected void deallocate() { - super.deallocate(); - if (tracker != null) { - tracker.close(this); + if (discard) { + drain(); + } + } } } - } - - private static class TrackedChunkedFile extends ChunkedFile { - //to avoid initializing Netty at build time - private static final Supplier> LEAK_DETECTOR = SupplierUtil.memoized(() -> - ResourceLeakDetectorFactory.instance().newResourceLeakDetector(TrackedChunkedFile.class)); - private final ResourceLeakTracker tracker; + private void wakeConsumer() { + assert Thread.holdsLock(this); - public TrackedChunkedFile(RandomAccessFile file, long offset, long length, int chunkSize) throws IOException { - super(file, offset, length, chunkSize); - this.tracker = LEAK_DETECTOR.get().track(this); + if (!discard && consumerWaiting) { + consumerWaiting = false; + ctx.executor().execute(PipeliningServerHandler.this::writeSome); + } } - @Override - public void close() throws Exception { - super.close(); - if (tracker != null) { - tracker.close(this); + private void drain() { + assert Thread.holdsLock(this); + + ByteBuf buf; + while (true) { + buf = queue.poll(); + if (buf != null) { + buf.release(); + } else { + break; + } } } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java index 5c84f273165..1a1252f9173 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/accesslog/HttpAccessLogHandler.java @@ -151,10 +151,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - private void log(ChannelHandlerContext ctx, Object msg, ChannelPromise promise, AccessLog accessLog) { + private void log(ChannelHandlerContext ctx, Object msg, ChannelPromise promise, AccessLog accessLog, AccessLogHolder accessLogHolder) { ctx.write(msg, promise.unvoid()).addListener(future -> { if (future.isSuccess()) { accessLog.log(logger); + accessLogHolder.logForReuse = accessLog; } }); } @@ -171,7 +172,7 @@ private void processWriteEvent(ChannelHandlerContext ctx, Object msg, ChannelPro } if (msg instanceof LastHttpContent content) { accessLogger.onLastResponseWrite(content.content().readableBytes()); - log(ctx, msg, promise, accessLogger); + log(ctx, msg, promise, accessLogger, accessLogHolder); return; } else if (msg instanceof ByteBufHolder holder) { accessLogger.onResponseWrite(holder.content().readableBytes()); @@ -224,9 +225,7 @@ void excludeRequest() { @Nullable AccessLog getLogForResponse(boolean finishResponse) { if (finishResponse) { - AccessLog accessLog = liveLogs.poll(); - logForReuse = accessLog; - return accessLog; + return liveLogs.poll(); } else { return liveLogs.peek(); } diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy new file mode 100644 index 00000000000..6df809d4800 --- /dev/null +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/stream/StreamPressureSpec.groovy @@ -0,0 +1,101 @@ +package io.micronaut.http.server.netty.stream + +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Requires +import io.micronaut.core.io.buffer.ByteBuffer +import io.micronaut.http.HttpRequest +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.http.client.HttpClient +import io.micronaut.http.client.StreamingHttpClient +import io.micronaut.runtime.server.EmbeddedServer +import reactor.core.publisher.Flux +import spock.lang.Specification + +import java.util.concurrent.ThreadLocalRandom + +class StreamPressureSpec extends Specification { + def 'producer pressure'() { + given: + def data = new byte[1024 * 1024 * 4] + ThreadLocalRandom.current().nextBytes(data) + + def ctx = ApplicationContext.run(['spec.name': 'StreamPressureSpec']) + ctx.getBean(MyController).stream = new ByteArrayInputStream(data) + + def server = ctx.getBean(EmbeddedServer) + server.start() + def client = ctx.createBean(HttpClient, server.URI).toBlocking() + + expect: + client.retrieve("/stream-pressure", byte[]) == data + + cleanup: + server.stop() + client.close() + ctx.close() + } + + def 'consumer pressure'() { + given: + def ctx = ApplicationContext.run(['spec.name': 'StreamPressureSpec']) + + byte[] data = new byte[1024 * 1024] + ThreadLocalRandom.current().nextBytes(data) + def serverStream = new PipedOutputStream() + ctx.getBean(MyController).stream = new PipedInputStream(serverStream) + + def clientOStream = new PipedOutputStream() + def clientIStream = new PipedInputStream(clientOStream) + + def server = ctx.getBean(EmbeddedServer) + server.start() + def client = ctx.createBean(StreamingHttpClient, server.URI) + + when: + Flux.from(client.dataStream(HttpRequest.GET("/stream-pressure"))).subscribe { + clientOStream.write(it.toByteArray()) + } + serverStream.write(data) + serverStream.flush() + then: + clientIStream.readNBytes(data.length) == data + + when: + serverStream.write(data) + serverStream.flush() + then: + clientIStream.readNBytes(data.length) == data + + cleanup: + serverStream.close() + clientIStream.close() + server.stop() + client.close() + ctx.close() + } + + private byte[] read(Iterator> itr, int n) { + byte[] out = new byte[n] + int off = 0 + while (n > 0) { + def buf = itr.next() + def chunkN = buf.readableBytes() + buf.read(out, off, chunkN) + off += chunkN + n -= chunkN + } + return out + } + + @Requires(property = "spec.name", value = "StreamPressureSpec") + @Controller + static class MyController { + InputStream stream + + @Get("/stream-pressure") + InputStream get() { + return this.stream + } + } +}