From d901e5581f89f09d61c78b9fd4f3540333ac436f Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 7 Nov 2023 21:50:59 +1100 Subject: [PATCH] Transient timeouts. Fixes #10234 and #10277 ongoing review --- .../java/org/eclipse/jetty/client/Response.java | 2 ++ .../jetty/client/transport/HttpReceiver.java | 13 ++++++++++++- .../jetty/client/transport/HttpSender.java | 6 +++++- .../client/transport/ResponseListeners.java | 17 +++++++++++++---- .../server/internal/HttpStreamOverFCGI.java | 4 +++- .../org/eclipse/jetty/io/ChunkAccumulator.java | 2 ++ .../content/ContentSourceCompletableFuture.java | 14 +++++++++++--- .../io/content/ContentSourceInputStream.java | 14 ++++++++------ .../io/content/ContentSourcePublisher.java | 2 ++ .../io/content/ContentSourceTransformer.java | 7 ++++++- .../io/internal/ContentSourceByteBuffer.java | 2 ++ .../io/internal/ContentSourceConsumer.java | 2 ++ .../jetty/io/internal/ContentSourceString.java | 2 ++ .../jetty/server/handler/gzip/GzipRequest.java | 6 +++++- .../jetty/server/internal/HttpConnection.java | 12 +++++++++++- .../eclipse/jetty/ee10/proxy/ProxyServlet.java | 2 ++ .../eclipse/jetty/ee10/servlet/HttpInput.java | 4 ---- .../eclipse/jetty/ee9/proxy/ProxyServlet.java | 2 ++ 18 files changed, 90 insertions(+), 23 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java index d30081949229..935e9cbc0763 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java @@ -190,6 +190,8 @@ default void onContentSource(Response response, Content.Source contentSource) if (Content.Chunk.isFailure(chunk)) { response.abort(chunk.getFailure()); + if (!chunk.isLast()) + contentSource.fail(chunk.getFailure()); return; } if (chunk.isLast() && !chunk.hasRemaining()) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index a8808c65abb9..cb9921d62e59 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.io.content.ContentSourceTransformer; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.Destroyable; import org.eclipse.jetty.util.thread.AutoLock; @@ -587,7 +588,11 @@ protected Content.Chunk transform(Content.Chunk inputChunk) if (_chunk == null) return null; if (Content.Chunk.isFailure(_chunk)) - return _chunk; + { + Content.Chunk failure = _chunk; + _chunk = Content.Chunk.next(failure); + return failure; + } // Retain the input chunk because its ByteBuffer will be referenced by the Inflater. if (retain) @@ -788,7 +793,13 @@ public boolean error(Throwable failure) try (AutoLock ignored = lock.lock()) { if (Content.Chunk.isFailure(currentChunk)) + { + Throwable cause = currentChunk.getFailure(); + if (!currentChunk.isLast()) + currentChunk = Content.Chunk.from(cause, true); + ExceptionUtil.addSuppressedIfNotAssociated(cause, failure); return false; + } if (currentChunk != null) currentChunk.release(); currentChunk = Content.Chunk.from(failure); diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java index 328ee0df9209..9e7d4dcd6344 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpSender.java @@ -504,7 +504,11 @@ protected Action process() throws Throwable } if (Content.Chunk.isFailure(chunk)) - throw chunk.getFailure(); + { + Content.Chunk failure = chunk; + chunk = Content.Chunk.next(failure); + throw failure.getFailure(); + } ByteBuffer buffer = chunk.getByteBuffer(); contentBuffer = buffer.asReadOnlyBuffer(); diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java index cd56720e4c11..257a4a4a815e 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.content.ByteBufferContentSource; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -679,10 +680,18 @@ public void fail(Throwable failure) if (LOG.isDebugEnabled()) LOG.debug("Content source #{} fail while current chunk is {}", index, currentChunk); if (Content.Chunk.isFailure(currentChunk)) - return; - if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK) - currentChunk.release(); - this.chunk = Content.Chunk.from(failure); + { + Throwable cause = currentChunk.getFailure(); + if (!currentChunk.isLast()) + chunk = Content.Chunk.from(cause, true); + ExceptionUtil.addSuppressedIfNotAssociated(cause, failure); + } + else + { + if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK) + currentChunk.release(); + this.chunk = Content.Chunk.from(failure); + } onDemandCallback(); registerFailure(this, failure); } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java index 680531847e58..dad5be63bae2 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java @@ -196,7 +196,9 @@ public void onComplete() { if (_chunk == null) _chunk = Content.Chunk.EOF; - else if (!_chunk.isLast() && !(Content.Chunk.isFailure(_chunk))) + else if (Content.Chunk.isFailure(_chunk, false)) + _chunk = Content.Chunk.from(_chunk.getFailure(), true); + else if (!_chunk.isLast()) throw new IllegalStateException(); } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java index cb036274c09a..4978c5f21ea3 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java @@ -191,6 +191,8 @@ public void run() if (Chunk.isFailure(chunk)) { completeExceptionally(chunk.getFailure()); + if (!chunk.isLast()) + _source.fail(chunk.getFailure()); break; } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceCompletableFuture.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceCompletableFuture.java index f44d92832618..384b6240f11e 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceCompletableFuture.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceCompletableFuture.java @@ -83,9 +83,17 @@ public void parse() } if (Content.Chunk.isFailure(chunk)) { - if (!chunk.isLast() && onTransientFailure(chunk.getFailure())) - continue; - completeExceptionally(chunk.getFailure()); + if (chunk.isLast()) + { + completeExceptionally(chunk.getFailure()); + } + else + { + if (onTransientFailure(chunk.getFailure())) + continue; + completeExceptionally(chunk.getFailure()); + _content.fail(chunk.getFailure()); + } return; } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java index d25137481cf6..46dfbf74f32a 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceInputStream.java @@ -56,9 +56,9 @@ public int read(byte[] b, int off, int len) throws IOException { if (Content.Chunk.isFailure(chunk)) { - Content.Chunk c = chunk; - chunk = Content.Chunk.next(c); - throw IO.rethrow(c.getFailure()); + Content.Chunk failure = chunk; + chunk = Content.Chunk.next(failure); + throw IO.rethrow(failure.getFailure()); } ByteBuffer byteBuffer = chunk.getByteBuffer(); @@ -125,9 +125,11 @@ public void close() throws IOException // Handle a failure as read would if (Content.Chunk.isFailure(chunk)) { - Content.Chunk c = chunk; - chunk = Content.Chunk.next(c); - throw IO.rethrow(c.getFailure()); + Content.Chunk failure = chunk; + chunk = Content.Chunk.next(failure); + if (!failure.isLast()) + content.fail(failure.getFailure()); + throw IO.rethrow(failure.getFailure()); } contentSkipped = chunk.hasRemaining(); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java index 15858a928ff1..b4c202bc8054 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourcePublisher.java @@ -129,6 +129,8 @@ private void process() { terminate(); subscriber.onError(chunk.getFailure()); + if (!chunk.isLast()) + content.fail(chunk.getFailure()); return; } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java index 7f2d6330f896..e34d5903c6c8 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java @@ -66,7 +66,12 @@ public Content.Chunk read() } if (Content.Chunk.isFailure(rawChunk)) - return rawChunk; + { + Content.Chunk failure = rawChunk; + rawChunk = Content.Chunk.next(rawChunk); + needsRawRead = rawChunk == null; + return failure; + } if (Content.Chunk.isFailure(transformedChunk)) return transformedChunk; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java index 9d286e425964..9db5923b287f 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java @@ -47,6 +47,8 @@ public void run() if (Content.Chunk.isFailure(chunk)) { promise.failed(chunk.getFailure()); + if (!chunk.isLast()) + source.fail(chunk.getFailure()); return; } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceConsumer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceConsumer.java index 5992ee06e216..7b802ae2fbb4 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceConsumer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceConsumer.java @@ -44,6 +44,8 @@ public void run() if (Content.Chunk.isFailure(chunk)) { callback.failed(chunk.getFailure()); + if (!chunk.isLast()) + source.fail(chunk.getFailure()); return; } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceString.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceString.java index 68096a005710..9ae9605dd254 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceString.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceString.java @@ -45,6 +45,8 @@ public void convert() if (Content.Chunk.isFailure(chunk)) { promise.failed(chunk.getFailure()); + if (!chunk.isLast()) + content.fail(chunk.getFailure()); return; } text.append(chunk.getByteBuffer()); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipRequest.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipRequest.java index 424b2d0e677c..71452f935b32 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipRequest.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipRequest.java @@ -159,7 +159,11 @@ protected Content.Chunk transform(Content.Chunk inputChunk) if (_chunk == null) return null; if (Content.Chunk.isFailure(_chunk)) - return _chunk; + { + Content.Chunk failure = _chunk; + _chunk = Content.Chunk.next(failure); + return failure; + } if (_chunk.isLast() && !_chunk.hasRemaining()) return Content.Chunk.EOF; diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 6e4dcd93cb6f..54ab4b3f40ee 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -1100,7 +1100,17 @@ public void earlyEOF() BadMessageException bad = new BadMessageException("Early EOF"); if (Content.Chunk.isFailure(stream._chunk)) - stream._chunk.getFailure().addSuppressed(bad); + { + if (stream._chunk.isLast()) + { + stream._chunk.getFailure().addSuppressed(bad); + } + else + { + bad.addSuppressed(stream._chunk.getFailure()); + stream._chunk = Content.Chunk.from(bad); + } + } else { if (stream._chunk != null) diff --git a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/ProxyServlet.java b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/ProxyServlet.java index b422a681dc9c..e68a229e87a9 100644 --- a/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/ProxyServlet.java +++ b/jetty-ee10/jetty-ee10-proxy/src/main/java/org/eclipse/jetty/ee10/proxy/ProxyServlet.java @@ -260,6 +260,8 @@ public Content.Chunk read() if (Content.Chunk.isFailure(chunk)) { onClientRequestFailure(request, proxyRequest, response, chunk.getFailure()); + if (!chunk.isLast()) + super.fail(chunk.getFailure(), true); } else { diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java index d93e2e44c9bb..6dfff5f33881 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpInput.java @@ -20,7 +20,6 @@ import jakarta.servlet.ReadListener; import jakarta.servlet.ServletInputStream; -import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Context; import org.eclipse.jetty.util.thread.AutoLock; @@ -348,9 +347,6 @@ public void run() Throwable failure = chunk.getFailure(); if (LOG.isDebugEnabled()) LOG.debug("running failure={} {}", failure, this); - // TODO is this necessary to add here? - if (chunk.isLast()) - _servletChannel.getServletContextResponse().getHeaders().add(HttpFields.CONNECTION_CLOSE); _readListener.onError(failure); } else if (chunk.isLast() && !chunk.hasRemaining()) diff --git a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/ProxyServlet.java b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/ProxyServlet.java index a58bf1ef72e8..ccd658ec3a67 100644 --- a/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/ProxyServlet.java +++ b/jetty-ee9/jetty-ee9-proxy/src/main/java/org/eclipse/jetty/ee9/proxy/ProxyServlet.java @@ -260,6 +260,8 @@ public Content.Chunk read() if (Content.Chunk.isFailure(chunk)) { onClientRequestFailure(request, proxyRequest, response, chunk.getFailure()); + if (!chunk.isLast()) + super.fail(chunk.getFailure()); } else {