From 3f997c089a5d1019f221a2008c569f9371ec759f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 27 Jan 2021 11:32:33 +0200 Subject: [PATCH] fix #1478 Schedule the first channel.read() once the HttpHeaders are sent There are use cases when the target server may want to reject the incoming request immediately and to close the connection. Instead of scheduling the first channel.read() when the request body is sent, the first channel.read() will be scheduled once the HttpHeaders are sent. --- .../reactor/netty/http/HttpOperations.java | 5 ++- .../http/client/HttpClientOperations.java | 8 ++++ .../http/server/HttpServerOperations.java | 5 +++ .../netty/http/client/HttpClientTest.java | 37 +++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java index 9f0e22cdba..5afe059e4e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpOperations.java @@ -197,7 +197,8 @@ public Mono then() { throw e; } - return channel().writeAndFlush(msg); + return channel().writeAndFlush(msg) + .addListener(f -> onHeadersSent()); } else { return channel().newSucceededFuture(); @@ -209,6 +210,8 @@ public Mono then() { protected abstract void afterMarkSentHeaders(); + protected abstract void onHeadersSent(); + protected abstract HttpMessage newFullBodyMessage(ByteBuf body); @Override diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java index 6011904d33..3917cfae3a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -524,6 +524,14 @@ protected void beforeMarkSentHeaders() { } } + @Override + protected void onHeadersSent() { + channel().read(); + if (channel().parent() != null) { + channel().parent().read(); + } + } + @Override @SuppressWarnings("FutureReturnValueIgnored") protected void onOutboundComplete() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index fadc15510c..63698e3dfa 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -576,6 +576,11 @@ protected void beforeMarkSentHeaders() { //noop } + @Override + protected void onHeadersSent() { + //noop + } + @Override protected void onOutboundComplete() { if (isWebsocket()) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 5805ce9839..d795fa3ab9 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -27,8 +27,10 @@ import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.security.cert.CertificateException; import java.time.Duration; import java.util.ArrayList; @@ -2552,4 +2554,39 @@ private void doTestEvictInBackground(int expectation, boolean evict) throws Exce assertThat(m.get()).isNotNull(); assertThat(m.get().idleSize()).isEqualTo(expectation); } + + @Test + void testIssue1478() throws Exception { + disposableServer = + HttpServer.create() + .handle((req, res) -> res.addHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) + .status(HttpResponseStatus.BAD_REQUEST) + .send(req.receive() + .retain() + .next())) + .bindNow(); + + Path largeFile = Paths.get(getClass().getResource("/largeFile.txt").toURI()); + + Path largeFileParent = largeFile.getParent(); + assertThat(largeFileParent).isNotNull(); + + Path tempFile = Files.createTempFile(largeFileParent, "temp", ".txt"); + tempFile.toFile().deleteOnExit(); + + byte[] fileBytes = Files.readAllBytes(largeFile); + for (int i = 0; i < 1000; i++) { + Files.write(tempFile, fileBytes, StandardOpenOption.APPEND); + } + + HttpClient.create() + .port(disposableServer.port()) + .post() + .send((req, out) -> out.sendFile(tempFile)) + .responseSingle((res, bytes) -> Mono.just(res.status().code())) + .as(StepVerifier::create) + .expectNext(400) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } }