Skip to content

Commit

Permalink
Replace chunked write API (micronaut-projects#10138)
Browse files Browse the repository at this point in the history
* Replace chunked write API
This PR replaces the writeChunked and writeFile APIs with a new writeStream API that takes an InputStream. This removes the need for the ChunkedWriteHandler.

Chunked writes were used for two purposes: Sending file regions and sending InputStreams. This has always complicated the HTTP pipeline somewhat as the pipeline had to deal with not just HttpContent objects but also ChunkedInput and FileRegion objects.

This PR replaces the machinery for InputStream writing with a more straight-forward solution that reads the data on the IO thread and then sends it down the channel.

Additionally, the file-specific APIs based on RandomAccessFile are removed. The body writer now just creates an InputStream for the file region in question and sends that. This removes support for zero-copy transfers, however that is a niche feature anyway because it doesn't work with TLS or HTTP/2. If someone wants a performant HTTP server, HTTP/2 takes priority over zero-copy so it makes little sense.

This PR may have small conflicts with micronaut-projects#10131 as that PR changed the PipeliningServerHandler body handling a little bit. Otherwise this PR should have no visible impact on users.

* remove unused class

* remove unused class
  • Loading branch information
yawkat authored Dec 8, 2023
1 parent 3bffffa commit cfc3092
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import io.micronaut.core.annotation.NonNull;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.reactivestreams.Publisher;

import java.io.RandomAccessFile;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* This interface is used to write the different kinds of netty responses.
Expand Down Expand Up @@ -68,20 +68,11 @@ default void writeFull(@NonNull FullHttpResponse response) {
void writeStreamed(@NonNull HttpResponse response, @NonNull Publisher<HttpContent> content);

/**
* Write a response with a {@link HttpChunkedInput} body.
* Write a response with a body that is a blocking stream.
*
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param chunkedInput The response body
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param stream The stream to read from
* @param executorService The executor for IO operations
*/
void writeChunked(@NonNull HttpResponse response, @NonNull HttpChunkedInput chunkedInput);

/**
* Write a response with a body that is a section of a {@link RandomAccessFile}.
*
* @param response The response. <b>Must not</b> be a {@link FullHttpResponse}
* @param randomAccessFile File to read from
* @param position Start position
* @param contentLength Length of the section to send
*/
void writeFile(@NonNull HttpResponse response, @NonNull RandomAccessFile randomAccessFile, long position, long contentLength);
void writeStream(@NonNull HttpResponse response, @NonNull InputStream stream, @NonNull ExecutorService executorService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3FrameToHttpObjectCodec;
Expand Down Expand Up @@ -587,23 +586,20 @@ private void insertHttp2DownstreamHandlers() {

registerMicronautChannelHandlers();

insertMicronautHandlers(false);
insertMicronautHandlers();
}

/**
* Insert the handlers that manage the micronaut message handling, e.g. conversion between micronaut requests
* and netty requests, and routing.
*/
private void insertMicronautHandlers(boolean zeroCopySupported) {
private void insertMicronautHandlers() {
channel.attr(STREAM_PIPELINE_ATTRIBUTE.get()).set(this);
if (sslHandler != null) {
channel.attr(CERTIFICATE_SUPPLIER_ATTRIBUTE.get()).set(sslHandler.findPeerCert());
}

SmartHttpContentCompressor contentCompressor = new SmartHttpContentCompressor(embeddedServices.getHttpCompressionStrategy());
if (zeroCopySupported) {
channel.attr(PipeliningServerHandler.ZERO_COPY_PREDICATE.get()).set(contentCompressor);
}
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_COMPRESSOR, contentCompressor);
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_DECOMPRESSOR, new HttpContentDecompressor());

Expand All @@ -619,7 +615,6 @@ private void insertMicronautHandlers(boolean zeroCopySupported) {
)
);
}
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler()); // todo: move to PipeliningServerHandler

RequestHandler requestHandler = routingInBoundHandler;
if (webSocketUpgradeHandler.isPresent()) {
Expand All @@ -645,7 +640,7 @@ private void insertHttp1DownstreamHandlers() {
registerMicronautChannelHandlers();
pipeline.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_KEEP_ALIVE, new HttpServerKeepAliveHandler());

insertMicronautHandlers(sslHandler == null);
insertMicronautHandlers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import io.micronaut.http.netty.body.NettyBodyWriter;
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.stream.ChunkedStream;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

/**
* Body writer for {@link InputStream}s.
Expand All @@ -45,9 +46,11 @@
@Experimental
@Singleton
public final class InputStreamBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<InputStream> {
private final ExecutorService executorService;

InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) {
super(configuration);
this.executorService = executorService;
}

@Override
Expand All @@ -59,7 +62,7 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<InputStream> out
nettyResponse.getNettyHeaders()
);
// can be null if the stream was closed
nettyContext.writeChunked(finalResponse, new HttpChunkedInput(new ChunkedStream(object)));
nettyContext.writeStream(finalResponse, object, executorService);
} else {
throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + outgoingResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.http.server.types.files.StreamedFile;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.stream.ChunkedStream;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;

/**
* Body writer for {@link StreamedFile}s.
Expand All @@ -49,8 +50,11 @@
@Experimental
@Internal
public final class StreamFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<StreamedFile> {
StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
private final ExecutorService ioExecutor;

StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) {
super(configuration);
this.ioExecutor = ioExecutor;
}

@Override
Expand All @@ -72,8 +76,7 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<StreamedFile> ou
nettyHeaders
);
InputStream inputStream = object.getInputStream();
HttpChunkedInput chunkedInput = new HttpChunkedInput(new ChunkedStream(inputStream));
nettyContext.writeChunked(finalResponse, chunkedInput);
nettyContext.writeStream(finalResponse, inputStream, ioExecutor);
}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,22 @@
import io.micronaut.http.netty.body.NettyWriteContext;
import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
import io.micronaut.http.server.types.files.SystemFile;
import io.micronaut.scheduling.TaskExecutors;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.jetbrains.annotations.NotNull;

import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;

import static io.micronaut.http.HttpHeaders.CONTENT_RANGE;

Expand All @@ -57,8 +64,11 @@
public final class SystemFileBodyWriter extends AbstractFileBodyWriter implements NettyBodyWriter<SystemFile> {
private static final String UNIT_BYTES = "bytes";

public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) {
private final ExecutorService ioExecutor;

public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) {
super(configuration);
this.ioExecutor = ioExecutor;
}

@Override
Expand Down Expand Up @@ -113,34 +123,22 @@ public void writeTo(HttpRequest<?> request, MutableHttpResponse<SystemFile> resp

// Write the request data
final DefaultHttpResponse finalResponse = new DefaultHttpResponse(nettyResponse.getNettyHttpVersion(), nettyResponse.getNettyHttpStatus(), nettyResponse.getNettyHeaders());
writeFile(systemFile, nettyContext, position, contentLength, finalResponse);

File file = systemFile.getFile();
InputStream is;
try {
is = new FileInputStream(file);
} catch (FileNotFoundException e) {
throw new MessageBodyException("Could not find file", e);
}

nettyContext.writeStream(finalResponse, new RangeInputStream(is, position, contentLength), ioExecutor);
}
} else {
throw new IllegalArgumentException("Unsupported response type. Not a Netty response: " + response);
}
}

private static void writeFile(SystemFile systemFile, NettyWriteContext context, long position, long contentLength, DefaultHttpResponse finalResponse) {
// Write the content.
File file = systemFile.getFile();
RandomAccessFile randomAccessFile = open(file);

context.writeFile(
finalResponse,
randomAccessFile,
position,
contentLength
);
}

private static RandomAccessFile open(File file) {
try {
return new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
throw new MessageBodyException("Could not find file", e);
}
}

@Nullable
private static IntRange parseRangeHeader(String value, long contentLength) {
int equalsIdx = value.indexOf('=');
Expand Down Expand Up @@ -175,4 +173,72 @@ private static class IntRange {
}
}

private static final class RangeInputStream extends InputStream {
private final InputStream delegate;
private final long toSkip;
private long remainingLength;
private boolean skipped = false;
private boolean skipSuccess = false;

private RangeInputStream(InputStream delegate, long toSkip, long length) {
this.delegate = delegate;
this.toSkip = toSkip;
this.remainingLength = length;

if (toSkip == 0) {
skipped = true;
skipSuccess = true;
}
}

private boolean doSkip() throws IOException {
if (!skipped) {
skipped = true;
try {
delegate.skipNBytes(toSkip);
skipSuccess = true;
} catch (EOFException ignored) {
}
}
return skipSuccess;
}

@Override
public int read() throws IOException {
if (!doSkip()) {
return -1;
}
if (remainingLength <= 0) {
return -1;
}
int read = delegate.read();
if (read != -1) {
remainingLength--;
}
return read;
}

@Override
public int read(@NotNull byte[] b, int off, int len) throws IOException {
if (!doSkip()) {
return -1;
}
if (remainingLength <= 0) {
return -1;
}
if (len > remainingLength) {
len = (int) remainingLength;
}
int n = delegate.read(b, off, len);
if (n != -1) {
remainingLength -= n;
}
return n;
}

@Override
public void close() throws IOException {
delegate.close();
}
}
}
Loading

0 comments on commit cfc3092

Please sign in to comment.