From e9d43314c2ae4c1a3c73c6d2d96826bb071d6de4 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 26 Aug 2024 18:56:35 +0200 Subject: [PATCH 1/8] Reworked HttpReceiverOverHTTP state machine, in particular: * Introduced a boolean parameter to parseAndFill() and parse(), that specifies whether to notify the application demand callback. This is necessary because reads may happen from any threads, and must not notify the application demand callback. Only when there is no data, and fill interest is set, then the application demand callback must be notified. * Removed action field to avoid lambda allocation. * Now the application is called directly from the parse() method. * Reading -1 from the network drives the parser by calling again parse(), rather than the parser directly. This allows to have a central place to notify the response success event. Signed-off-by: Simone Bordet --- .../jetty/client/transport/HttpReceiver.java | 4 + .../internal/HttpReceiverOverHTTP.java | 192 +++++++++--------- .../transport/HttpClientDemandTest.java | 21 +- 3 files changed, 109 insertions(+), 108 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 93859d695d7c..911c8099cd60 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -342,6 +342,7 @@ protected void responseContentAvailable(HttpExchange exchange) * This method directly invokes the demand callback, assuming the caller * is already serialized with other events. */ + // TODO: remove this after FCGI fix. protected void responseContentAvailable() { contentSource.onDataAvailable(); @@ -720,6 +721,9 @@ public Content.Chunk read() current = HttpReceiver.this.read(false); + if (LOG.isDebugEnabled()) + LOG.debug("Read {} from {}", current, this); + try (AutoLock ignored = lock.lock()) { if (currentChunk != null) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index de6ff4973a3d..93f41b743d7c 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -43,17 +43,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class); + private final Runnable receiveNext = this::receiveNext; private final LongAdder inMessages = new LongAdder(); private final HttpParser parser; private final ByteBufferPool byteBufferPool; private RetainableByteBuffer networkBuffer; - private boolean shutdown; - private boolean complete; + private State state = State.STATUS; private boolean unsolicited; - private String method; private int status; + private String method; private Content.Chunk chunk; - private Runnable action; + private boolean shutdown; + private boolean disposed; public HttpReceiverOverHTTP(HttpChannelOverHTTP channel) { @@ -73,7 +74,7 @@ void receive() { if (!hasContent()) { - boolean setFillInterest = parseAndFill(); + boolean setFillInterest = parseAndFill(true); if (!hasContent() && setFillInterest) fillInterested(); } @@ -97,10 +98,8 @@ protected void reset() super.reset(); parser.reset(); if (chunk != null) - { chunk.release(); - chunk = null; - } + chunk = null; } @Override @@ -109,10 +108,9 @@ protected void dispose() super.dispose(); parser.close(); if (chunk != null) - { chunk.release(); - chunk = null; - } + chunk = null; + disposed = true; } @Override @@ -124,7 +122,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded) Content.Chunk chunk = consumeChunk(); if (chunk != null) return chunk; - boolean needFillInterest = parseAndFill(); + boolean needFillInterest = parseAndFill(false); if (LOG.isDebugEnabled()) LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this); chunk = consumeChunk(); @@ -236,7 +234,7 @@ protected ByteBuffer onUpgradeFrom() * If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte. * @return true if no bytes were filled. */ - private boolean parseAndFill() + private boolean parseAndFill(boolean notifyContentAvailable) { HttpConnectionOverHTTP connection = getHttpConnection(); EndPoint endPoint = connection.getEndPoint(); @@ -246,23 +244,22 @@ private boolean parseAndFill() acquireNetworkBuffer(); while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this); // Always parse even empty buffers to advance the parser. - if (parse()) + boolean stopParsing = parse(notifyContentAvailable); + if (LOG.isDebugEnabled()) + LOG.debug("Parsed stop={} in {}", stopParsing, this); + if (stopParsing) { // Return immediately, as this thread may be in a race // with e.g. another thread demanding more content. return false; } - if (LOG.isDebugEnabled()) - LOG.debug("Parser willing to advance in {}", this); // Connection may be closed in a parser callback. - if (connection.isClosed()) + if (connection.isClosed() || isShutdown()) { if (LOG.isDebugEnabled()) - LOG.debug("Closed {} in {}", connection, this); + LOG.debug("Closed/Shutdown {} in {}", connection, this); releaseNetworkBuffer(); return false; } @@ -286,9 +283,9 @@ else if (read == 0) } else { - releaseNetworkBuffer(); shutdown(); - return false; + // Loop around to parse again to advance the parser, + // for example for HTTP/1.0 connection-delimited content. } } } @@ -307,62 +304,80 @@ else if (read == 0) * * @return true to indicate that parsing should be interrupted (and will be resumed by another thread). */ - private boolean parse() + private boolean parse(boolean notifyContentAvailable) { + // HttpParser is not reentrant, so we cannot invoke the + // application from the parser event callbacks. + // However, the mechanism in general (and this method) + // is reentrant: it notifies the application which may + // read response content, which reenters here. + + ByteBuffer byteBuffer = networkBuffer.getByteBuffer(); while (true) { - boolean handle = parser.parseNext(networkBuffer.getByteBuffer()); + boolean handle = parser.parseNext(byteBuffer); if (LOG.isDebugEnabled()) - LOG.debug("Parse result={} on {}", handle, this); - Runnable action = getAndSetAction(null); - if (action != null) - { - if (LOG.isDebugEnabled()) - LOG.debug("Executing action after parser returned: {} on {}", action, this); - action.run(); - if (LOG.isDebugEnabled()) - LOG.debug("Action executed after Parse result={} on {}", handle, this); - } - if (handle) - { - // When the receiver is aborted, the parser is closed in dispose() which changes - // its state to State.CLOSE; so checking parser.isClose() is just a way to check - // if the receiver was aborted or not. - return !parser.isClose(); - } + LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(byteBuffer), parser, this); + if (!handle) + return false; - boolean complete = this.complete; - this.complete = false; - if (LOG.isDebugEnabled()) - LOG.debug("Parse complete={}, {} {} in {}", complete, networkBuffer, parser, this); + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + throw new IllegalStateException("No exchange"); - if (complete) + switch (state) { - int status = this.status; - this.status = 0; - // Connection upgrade due to 101, bail out. - if (status == HttpStatus.SWITCHING_PROTOCOLS_101) - return true; - // Connection upgrade due to CONNECT + 200, bail out. - String method = this.method; - this.method = null; - if (getHttpChannel().isTunnel(method, status)) - return true; - - if (!networkBuffer.hasRemaining()) - return false; - - if (!HttpStatus.isInformational(status)) + case HEADERS -> responseHeaders(exchange); + case CONTENT -> { - if (LOG.isDebugEnabled()) - LOG.debug("Discarding unexpected content after response {}: {} in {}", status, networkBuffer, this); - networkBuffer.clear(); + if (notifyContentAvailable) + responseContentAvailable(exchange); } - return false; + case COMPLETE -> + { + boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101; + boolean isTunnel = getHttpChannel().isTunnel(method, status); + + Runnable task = isUpgrade || isTunnel ? null : this.receiveNext; + responseSuccess(exchange, task); + + // Connection upgrade, bail out. + if (isUpgrade || isTunnel) + return true; + + if (byteBuffer.hasRemaining()) + { + if (HttpStatus.isInterim(status)) + { + // There may be multiple interim responses in + // the same network buffer, continue parsing. + continue; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this); + BufferUtil.clear(byteBuffer); + return false; + } + } + + // Continue to read from the network. + return false; + } + default -> throw new IllegalStateException("Invalid state " + state); } - if (!networkBuffer.hasRemaining()) + // The application may have aborted the request. + if (disposed) + { + BufferUtil.clear(byteBuffer); return false; + } + + // The application has been invoked, + // and it is now driving the parsing. + return true; } } @@ -386,7 +401,6 @@ private void shutdown() // header, the connection will be closed at exchange termination // thanks to the flag we have set above. parser.atEOF(); - parser.parseNext(BufferUtil.EMPTY_BUFFER); } protected boolean isShutdown() @@ -406,6 +420,7 @@ public void startResponse(HttpVersion version, int status, String reason) this.status = status; parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status)); exchange.getResponse().version(version).status(status).reason(reason); + state = State.STATUS; responseBegin(exchange); } @@ -432,10 +447,7 @@ public boolean headerComplete() // Store the EndPoint is case of upgrades, tunnels, etc. exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint()); getHttpConnection().onResponseHeaders(exchange); - if (LOG.isDebugEnabled()) - LOG.debug("Setting action to responseHeaders(exchange, boolean) on {}", this); - if (getAndSetAction(() -> responseHeaders(exchange)) != null) - throw new IllegalStateException(); + state = State.HEADERS; return true; } @@ -451,17 +463,13 @@ public boolean content(ByteBuffer buffer) if (chunk != null) throw new IllegalStateException("Content generated with unconsumed content left"); + if (getHttpConnection().isFillInterested()) + throw new IllegalStateException("Fill interested while parsing for content"); // Retain the chunk because it is stored for later use. networkBuffer.retain(); chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); - - if (LOG.isDebugEnabled()) - LOG.debug("Setting action to responseContentAvailable on {}", this); - if (getAndSetAction(this::responseContentAvailable) != null) - throw new IllegalStateException(); - if (getHttpConnection().isFillInterested()) - throw new IllegalStateException(); + state = State.CONTENT; return true; } @@ -491,28 +499,20 @@ public boolean messageComplete() if (exchange == null || unsolicited) { // We received an unsolicited response from the server. + networkBuffer.clear(); getHttpConnection().close(); return false; } int status = exchange.getResponse().getStatus(); if (!HttpStatus.isInterim(status)) - { inMessages.increment(); - complete = true; - } if (chunk != null) throw new IllegalStateException(); chunk = Content.Chunk.EOF; - - boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101; - boolean isTunnel = getHttpChannel().isTunnel(method, status); - Runnable task = isUpgrade || isTunnel ? null : this::receiveNext; - if (LOG.isDebugEnabled()) - LOG.debug("Message complete, calling response success with task {} in {}", task, this); - responseSuccess(exchange, task); - return false; + state = State.COMPLETE; + return true; } private void receiveNext() @@ -524,7 +524,7 @@ private void receiveNext() if (LOG.isDebugEnabled()) LOG.debug("Receiving next request in {}", this); - boolean setFillInterest = parseAndFill(); + boolean setFillInterest = parseAndFill(true); if (!hasContent() && setFillInterest) fillInterested(); } @@ -556,13 +556,6 @@ public void badMessage(HttpException failure) } } - private Runnable getAndSetAction(Runnable action) - { - Runnable r = this.action; - this.action = action; - return r; - } - long getMessagesIn() { return inMessages.longValue(); @@ -573,4 +566,9 @@ public String toString() { return String.format("%s[%s]", super.toString(), parser); } + + private enum State + { + STATUS, HEADERS, CONTENT, COMPLETE + } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java index 2403239119fb..4bbf288a1abd 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.NanoTime; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -263,7 +262,7 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons .timeout(5, TimeUnit.SECONDS) .send(result -> { - Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); + assertFalse(result.isFailed(), String.valueOf(result.getFailure())); Response response = result.getResponse(); assertEquals(HttpStatus.OK_200, response.getStatus()); resultLatch.countDown(); @@ -346,7 +345,7 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons .onResponseContentAsync(listener2) .send(result -> { - Assertions.assertFalse(result.isFailed(), String.valueOf(result.getFailure())); + assertFalse(result.isFailed(), String.valueOf(result.getFailure())); Response response = result.getResponse(); assertEquals(HttpStatus.OK_200, response.getStatus()); resultLatch.countDown(); @@ -415,8 +414,8 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); @@ -480,8 +479,8 @@ public void onContentSource(Response response, Content.Source contentSource) }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); @@ -540,8 +539,8 @@ public void onContentSource(Response response, Content.Source contentSource) }) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); @@ -572,8 +571,8 @@ public void testReadDemandInSpawnedThread(Transport transport) throws Exception .onResponseContentSource((response, contentSource) -> contentSource.demand(() -> new Thread(new Accumulator(contentSource, chunks)).start())) .send(result -> { - Assertions.assertTrue(result.isSucceeded()); - Assertions.assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); resultLatch.countDown(); }); From f633ccc35ec211df3228390342915a312da4dd62 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 26 Aug 2024 19:01:20 +0200 Subject: [PATCH 2/8] Fixed test flakyness: consume the request content before sending the response. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/client/NetworkTrafficListenerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java index cb9e242ccc56..70be097bc97e 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/NetworkTrafficListenerTest.java @@ -332,7 +332,10 @@ public void testTrafficWithRequestContentWithResponseRedirectOnPersistentConnect @Override public boolean handle(Request request, Response response, Callback callback) { - Response.sendRedirect(request, response, callback, location); + Content.Source.consumeAll(request, Callback.from( + () -> Response.sendRedirect(request, response, callback, location), + callback::failed + )); return true; } }); From d84cc66548275b6d58789e0bf0096c22a7fca896 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 26 Aug 2024 19:04:42 +0200 Subject: [PATCH 3/8] Follow up after #10880: only abort the request if there is request content. This avoids the rare case where the response arrives before the request thread has modified the request state, even if the request has been fully sent over the network, causing the request to be failed even if it should not. Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/AuthenticationProtocolHandler.java | 3 ++- .../java/org/eclipse/jetty/client/RedirectProtocolHandler.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 894461c8fda0..1d018b89a211 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -127,7 +127,8 @@ public void onSuccess(Response response) { // The request may still be sending content, stop it. Request request = response.getRequest(); - request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); + if (request.getBody() != null) + request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); } @Override diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index 161a29948a64..56692b7a2003 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -61,7 +61,8 @@ public void onSuccess(Response response) { // The request may still be sending content, stop it. Request request = response.getRequest(); - request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); + if (request.getBody() != null) + request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request)); } @Override From a7cf43f60d1c546f54d9e7ea352e26c2bee47476 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 28 Aug 2024 10:47:10 +0200 Subject: [PATCH 4/8] Fixed FastCGI similarly to HTTP/1.1. Signed-off-by: Simone Bordet --- .../jetty/client/transport/HttpReceiver.java | 14 +--- .../internal/HttpReceiverOverHTTP.java | 1 + .../internal/HttpChannelOverFCGI.java | 16 ++++- .../internal/HttpConnectionOverFCGI.java | 66 ++++++++++++------- .../internal/HttpReceiverOverFCGI.java | 24 ++++--- 5 files changed, 74 insertions(+), 47 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 911c8099cd60..e0001c19e463 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -332,22 +332,10 @@ protected void responseContentAvailable(HttpExchange exchange) if (exchange.isResponseCompleteOrTerminated()) return; - responseContentAvailable(); + contentSource.onDataAvailable(); }); } - /** - * Method to be invoked when response content is available to be read. - *

- * This method directly invokes the demand callback, assuming the caller - * is already serialized with other events. - */ - // TODO: remove this after FCGI fix. - protected void responseContentAvailable() - { - contentSource.onDataAvailable(); - } - /** * Method to be invoked when the response is successful. *

diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index 93f41b743d7c..f74dbc6c149f 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -268,6 +268,7 @@ private boolean parseAndFill(boolean notifyContentAvailable) reacquireNetworkBuffer(); // The networkBuffer may have been reacquired. + assert !networkBuffer.hasRemaining(); int read = endPoint.fill(networkBuffer.getByteBuffer()); if (LOG.isDebugEnabled()) LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this); diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java index 0d96492a6549..8d636a63de49 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpChannelOverFCGI.java @@ -119,11 +119,25 @@ protected void content(Content.Chunk chunk) receiver.content(chunk); } + protected void responseContentAvailable() + { + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + receiver.responseContentAvailable(exchange); + } + protected void end() { HttpExchange exchange = getHttpExchange(); if (exchange != null) - receiver.end(exchange); + receiver.end(); + } + + protected void responseSuccess() + { + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + receiver.responseSuccess(exchange); } protected void responseFailure(Throwable failure, Promise promise) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java index 0309c2872b68..4bdc222b125e 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java @@ -67,7 +67,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne private final HttpChannelOverFCGI channel; private RetainableByteBuffer networkBuffer; private Object attachment; - private Runnable action; + private State state = State.STATUS; private long idleTimeout; private boolean shutdown; @@ -168,7 +168,7 @@ private void releaseNetworkBuffer() this.networkBuffer = null; } - boolean parseAndFill() + boolean parseAndFill(boolean notifyContentAvailable) { if (LOG.isDebugEnabled()) LOG.debug("parseAndFill {}", networkBuffer); @@ -179,7 +179,7 @@ boolean parseAndFill() { while (true) { - if (parse(networkBuffer.getByteBuffer())) + if (parse(networkBuffer.getByteBuffer(), notifyContentAvailable)) return false; if (networkBuffer.isRetained()) @@ -214,13 +214,35 @@ else if (read < 0) } } - private boolean parse(ByteBuffer buffer) + private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable) { - boolean parse = parser.parse(buffer); - Runnable action = getAndSetAction(null); - if (action != null) - action.run(); - return parse; + boolean handle = parser.parse(buffer); + + switch (state) + { + case STATUS -> + { + // Nothing to do. + } + case HEADERS -> channel.responseHeaders(); + case CONTENT -> + { + if (notifyContentAvailable) + channel.responseContentAvailable(); + } + case COMPLETE -> + { + // For the complete event, handle==false, and cannot + // differentiate between a complete event and a parse() + // with zero or not enough bytes, so the state is reset + // here to avoid calling responseSuccess() again. + state = State.STATUS; + channel.responseSuccess(); + } + default -> throw new IllegalStateException("Invalid state " + state); + } + + return handle; } private void shutdown() @@ -318,13 +340,6 @@ private void failAndClose(Throwable failure) }, x -> close(failure))); } - private Runnable getAndSetAction(Runnable action) - { - Runnable r = this.action; - this.action = action; - return r; - } - protected HttpChannelOverFCGI newHttpChannel() { return new HttpChannelOverFCGI(this); @@ -414,6 +429,7 @@ public void onBegin(int request, int code, String reason) { if (LOG.isDebugEnabled()) LOG.debug("onBegin r={},c={},reason={}", request, code, reason); + state = State.STATUS; channel.responseBegin(code, reason); } @@ -430,8 +446,7 @@ public boolean onHeaders(int request) { if (LOG.isDebugEnabled()) LOG.debug("onHeaders r={} {}", request, networkBuffer); - if (getAndSetAction(channel::responseHeaders) != null) - throw new IllegalStateException(); + state = State.HEADERS; return true; } @@ -444,13 +459,10 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { case STD_OUT -> { - // No need to call networkBuffer.retain() here, since we know - // that the action will be run before releasing the networkBuffer. - // The receiver of the chunk decides whether to consume/retain it. Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer); - if (getAndSetAction(() -> channel.content(chunk)) == null) - return true; - throw new IllegalStateException(); + channel.content(chunk); + state = State.CONTENT; + return true; } case STD_ERR -> LOG.info(BufferUtil.toUTF8String(buffer)); default -> throw new IllegalArgumentException(); @@ -464,6 +476,7 @@ public void onEnd(int request) if (LOG.isDebugEnabled()) LOG.debug("onEnd r={}", request); channel.end(); + state = State.COMPLETE; } @Override @@ -474,4 +487,9 @@ public void onFailure(int request, Throwable failure) failAndClose(failure); } } + + private enum State + { + STATUS, HEADERS, CONTENT, COMPLETE + } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java index 8c810e215acd..805cff268acc 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java @@ -34,7 +34,7 @@ void receive() if (!hasContent()) { HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean setFillInterest = httpConnection.parseAndFill(); + boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) httpConnection.fillInterested(); } @@ -81,7 +81,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded) if (chunk != null) return chunk; HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean needFillInterest = httpConnection.parseAndFill(); + boolean needFillInterest = httpConnection.parseAndFill(false); chunk = consumeChunk(); if (chunk != null) return chunk; @@ -109,23 +109,23 @@ public void failAndClose(Throwable failure) void content(Content.Chunk chunk) { - HttpExchange exchange = getHttpExchange(); - if (exchange == null) - return; if (this.chunk != null) throw new IllegalStateException(); // Retain the chunk because it is stored for later reads. chunk.retain(); this.chunk = chunk; - responseContentAvailable(); } - void end(HttpExchange exchange) + void end() { if (chunk != null) throw new IllegalStateException(); chunk = Content.Chunk.EOF; - responseSuccess(exchange, this::receiveNext); + } + + void responseSuccess(HttpExchange exchange) + { + super.responseSuccess(exchange, this::receiveNext); } private void receiveNext() @@ -136,7 +136,7 @@ private void receiveNext() throw new IllegalStateException(); HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); - boolean setFillInterest = httpConnection.parseAndFill(); + boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) httpConnection.fillInterested(); } @@ -165,6 +165,12 @@ protected void responseHeaders(HttpExchange exchange) super.responseHeaders(exchange); } + @Override + protected void responseContentAvailable(HttpExchange exchange) + { + super.responseContentAvailable(exchange); + } + @Override protected void responseFailure(Throwable failure, Promise promise) { From 241147820238af7b4b46b043f4f07d34a18579a5 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 28 Aug 2024 11:07:04 +0200 Subject: [PATCH 5/8] Removed leftover of the multiplex implementation. Signed-off-by: Simone Bordet --- .../fcgi/parser/ResponseContentParser.java | 64 ++++++++----------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index d86c22b78d9e..85eec1b85bad 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -15,8 +15,6 @@ import java.io.EOFException; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.http.HttpCompliance; @@ -43,13 +41,12 @@ public class ResponseContentParser extends StreamContentParser { private static final Logger LOG = LoggerFactory.getLogger(ResponseContentParser.class); - private final Map parsers = new ConcurrentHashMap<>(); - private final ClientParser.Listener listener; + private final ResponseParser parser; public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener listener) { super(headerParser, FCGI.StreamType.STD_OUT, listener); - this.listener = listener; + this.parser = new ResponseParser(listener); } @Override @@ -63,13 +60,6 @@ public boolean noContent() @Override protected boolean onContent(ByteBuffer buffer) { - int request = getRequest(); - ResponseParser parser = parsers.get(request); - if (parser == null) - { - parser = new ResponseParser(listener, request); - parsers.put(request, parser); - } return parser.parse(buffer); } @@ -77,37 +67,44 @@ protected boolean onContent(ByteBuffer buffer) protected void end(int request) { super.end(request); - parsers.remove(request); + parser.reset(); } - private static class ResponseParser implements HttpParser.ResponseHandler + private class ResponseParser implements HttpParser.ResponseHandler { private final HttpFields.Mutable fields = HttpFields.build(); private final ClientParser.Listener listener; - private final int request; private final FCGIHttpParser httpParser; private State state = State.HEADERS; private boolean seenResponseCode; private boolean stalled; - private ResponseParser(ClientParser.Listener listener, int request) + private ResponseParser(ClientParser.Listener listener) { this.listener = listener; - this.request = request; this.httpParser = new FCGIHttpParser(this); } + private void reset() + { + fields.clear(); + httpParser.reset(); + state = State.HEADERS; + seenResponseCode = false; + stalled = false; + } + public boolean parse(ByteBuffer buffer) { int remaining = buffer.remaining(); while (remaining > 0) { if (LOG.isDebugEnabled()) - LOG.debug("Response {} {}, state {} {}", request, FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer)); + LOG.debug("Response {} {}, state {} {}", getRequest(), FCGI.StreamType.STD_OUT, state, BufferUtil.toDetailString(buffer)); switch (state) { - case HEADERS: + case HEADERS -> { if (httpParser.parseNext(buffer)) { @@ -116,40 +113,33 @@ public boolean parse(ByteBuffer buffer) return true; } remaining = buffer.remaining(); - break; } - case CONTENT_MODE: + case CONTENT_MODE -> { // If we have no indication of the content, then // the HTTP parser will assume there is no content // and will not parse it even if it is provided, // so we have to parse it raw ourselves here. boolean rawContent = fields.size() == 0 || - (fields.get(HttpHeader.CONTENT_LENGTH) == null && - fields.get(HttpHeader.TRANSFER_ENCODING) == null); + (fields.get(HttpHeader.CONTENT_LENGTH) == null && + fields.get(HttpHeader.TRANSFER_ENCODING) == null); state = rawContent ? State.RAW_CONTENT : State.HTTP_CONTENT; - break; } - case RAW_CONTENT: + case RAW_CONTENT -> { ByteBuffer content = buffer.asReadOnlyBuffer(); buffer.position(buffer.limit()); if (notifyContent(content)) return true; remaining = 0; - break; } - case HTTP_CONTENT: + case HTTP_CONTENT -> { if (httpParser.parseNext(buffer)) return true; remaining = buffer.remaining(); - break; - } - default: - { - throw new IllegalStateException(); } + default -> throw new IllegalStateException(); } } return false; @@ -205,7 +195,7 @@ private void notifyBegin(int code, String reason) { try { - listener.onBegin(request, code, reason); + listener.onBegin(getRequest(), code, reason); } catch (Throwable x) { @@ -218,7 +208,7 @@ private void notifyHeader(HttpField httpField) { try { - listener.onHeader(request, httpField); + listener.onHeader(getRequest(), httpField); } catch (Throwable x) { @@ -242,7 +232,7 @@ private boolean notifyHeaders() { try { - return listener.onHeaders(request); + return listener.onHeaders(getRequest()); } catch (Throwable x) { @@ -278,7 +268,7 @@ private boolean notifyContent(ByteBuffer buffer) { try { - return listener.onContent(request, FCGI.StreamType.STD_OUT, buffer); + return listener.onContent(getRequest(), FCGI.StreamType.STD_OUT, buffer); } catch (Throwable x) { @@ -318,7 +308,7 @@ protected void fail(Throwable failure) { try { - listener.onFailure(request, failure); + listener.onFailure(getRequest(), failure); } catch (Throwable x) { From 01fe947bdfecd11a3e4e12607a161e8345fb1764 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 28 Aug 2024 19:52:09 +0200 Subject: [PATCH 6/8] add SerializedInvoker assertions about current thread invoking Signed-off-by: Ludovic Orban --- .../jetty/client/transport/HttpReceiver.java | 8 ++ .../jetty/util/thread/SerializedInvoker.java | 127 +++++++++++++++--- .../util/thread/SerializedInvokerTest.java | 54 +++++--- 3 files changed, 154 insertions(+), 35 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index e0001c19e463..33fa1aa8c210 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -731,6 +731,7 @@ public void onDataAvailable() { if (LOG.isDebugEnabled()) LOG.debug("onDataAvailable on {}", this); + invoker.assertCurrentThreadInvoking(); // The onDataAvailable() method is only ever called // by the invoker so avoid using the invoker again. invokeDemandCallback(false); @@ -755,6 +756,8 @@ private void processDemand() if (LOG.isDebugEnabled()) LOG.debug("Processing demand on {}", this); + invoker.assertCurrentThreadInvoking(); + Content.Chunk current; try (AutoLock ignored = lock.lock()) { @@ -794,9 +797,14 @@ private void invokeDemandCallback(boolean invoke) try { if (invoke) + { invoker.run(demandCallback); + } else + { + invoker.assertCurrentThreadInvoking(); demandCallback.run(); + } } catch (Throwable x) { diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java index 72bcd693a732..e7ea03eb41ae 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java @@ -13,8 +13,12 @@ package org.eclipse.jetty.util.thread; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.thread.Invocable.InvocationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +39,51 @@ public class SerializedInvoker private static final Logger LOG = LoggerFactory.getLogger(SerializedInvoker.class); private final AtomicReference _tail = new AtomicReference<>(); + private final String _name; + private volatile Thread _invokerThread; + + /** + * Create a new instance whose name is {@code anonymous}. + */ + public SerializedInvoker() + { + this("anonymous"); + } + + /** + * Create a new instance whose name is derived from the given class. + * @param nameFrom the class to use as a name. + */ + public SerializedInvoker(Class nameFrom) + { + this(nameFrom.getSimpleName()); + } + + /** + * Create a new instance with the given name. + * @param name the name. + */ + public SerializedInvoker(String name) + { + _name = name; + } + + /** + * @return whether the current thread is currently executing a task using this invoker + */ + boolean isCurrentThreadInvoking() + { + return _invokerThread == Thread.currentThread(); + } + + /** + * @throws IllegalStateException when the current thread is not currently executing a task using this invoker + */ + public void assertCurrentThreadInvoking() throws IllegalStateException + { + if (!isCurrentThreadInvoking()) + throw new IllegalStateException(); + } /** * Arrange for a task to be invoked, mutually excluded from other tasks. @@ -59,7 +108,7 @@ public Runnable offer(Runnable task) { // Wrap the given task with another one that's going to delegate run() to the wrapped task while the // wrapper's toString() returns a description of the place in code where SerializedInvoker.run() was called. - task = new NamedRunnable(task, deriveTaskName(task)); + task = new NamedRunnable(task); } } Link link = new Link(task); @@ -72,18 +121,6 @@ public Runnable offer(Runnable task) return null; } - protected String deriveTaskName(Runnable task) - { - StackTraceElement[] stackTrace = new Exception().getStackTrace(); - for (StackTraceElement stackTraceElement : stackTrace) - { - String className = stackTraceElement.getClassName(); - if (!className.equals(SerializedInvoker.class.getName()) && !className.equals(getClass().getName())) - return "Queued at " + stackTraceElement; - } - return task.toString(); - } - /** * Arrange for tasks to be invoked, mutually excluded from other tasks. * @param tasks The tasks to invoke @@ -116,8 +153,8 @@ public void run(Runnable task) if (todo != null) todo.run(); else - if (LOG.isDebugEnabled()) - LOG.debug("Queued link in {}", this); + if (LOG.isDebugEnabled()) + LOG.debug("Queued link in {}", this); } /** @@ -131,14 +168,14 @@ public void run(Runnable... tasks) if (todo != null) todo.run(); else - if (LOG.isDebugEnabled()) - LOG.debug("Queued links in {}", this); + if (LOG.isDebugEnabled()) + LOG.debug("Queued links in {}", this); } @Override public String toString() { - return String.format("%s@%x{tail=%s}", getClass().getSimpleName(), hashCode(), _tail); + return String.format("%s@%x{name=%s,tail=%s,invoker=%s}", getClass().getSimpleName(), hashCode(), _name, _tail, _invokerThread); } protected void onError(Runnable task, Throwable t) @@ -146,7 +183,7 @@ protected void onError(Runnable task, Throwable t) LOG.warn("Serialized invocation error", t); } - private class Link implements Runnable, Invocable + private class Link implements Runnable, Invocable, Dumpable { private final Runnable _task; private final AtomicReference _next = new AtomicReference<>(); @@ -156,6 +193,24 @@ public Link(Runnable task) _task = task; } + @Override + public void dump(Appendable out, String indent) throws IOException + { + if (_task instanceof NamedRunnable nr) + { + StringWriter sw = new StringWriter(); + nr.stack.printStackTrace(new PrintWriter(sw)); + Dumpable.dumpObjects(out, indent, nr.toString(), sw.toString()); + } + else + { + Dumpable.dumpObjects(out, indent, _task); + } + Link link = _next.get(); + if (link != null) + link.dump(out, indent); + } + @Override public InvocationType getInvocationType() { @@ -186,6 +241,7 @@ public void run() { if (LOG.isDebugEnabled()) LOG.debug("Running link {} of {}", link, SerializedInvoker.this); + _invokerThread = Thread.currentThread(); try { link._task.run(); @@ -196,6 +252,12 @@ public void run() LOG.debug("Failed while running link {} of {}", link, SerializedInvoker.this, t); onError(link._task, t); } + finally + { + // _invokerThread must be nulled before calling link.next() as + // once the latter has executed, another thread can enter Link.run(). + _invokerThread = null; + } link = link.next(); if (link == null && LOG.isDebugEnabled()) LOG.debug("Next link is null, execution is over in {}", SerializedInvoker.this); @@ -209,10 +271,35 @@ public String toString() } } - private record NamedRunnable(Runnable delegate, String name) implements Runnable + private class NamedRunnable implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(NamedRunnable.class); + private final Runnable delegate; + private final String name; + private final Throwable stack; + + public NamedRunnable(Runnable delegate) + { + this.delegate = delegate; + this.stack = new Throwable(); + this.name = deriveTaskName(delegate, stack); + } + + protected String deriveTaskName(Runnable task, Throwable stack) + { + StackTraceElement[] stackTrace = stack.getStackTrace(); + for (StackTraceElement stackTraceElement : stackTrace) + { + String className = stackTraceElement.getClassName(); + if (!className.equals(SerializedInvoker.class.getName()) && + !className.equals(SerializedInvoker.this.getClass().getName()) && + !className.equals(getClass().getName())) + return "Queued by " + Thread.currentThread().getName() + " at " + stackTraceElement; + } + return task.toString(); + } + @Override public void run() { diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java index 2fc9011d7794..16a5dff80d23 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SerializedInvokerTest.java @@ -14,6 +14,8 @@ package org.eclipse.jetty.util.thread; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -25,17 +27,20 @@ public class SerializedInvokerTest { - SerializedInvoker _serialedInvoker; + private SerializedInvoker _serializedInvoker; + private ExecutorService _executor; @BeforeEach public void beforeEach() { - _serialedInvoker = new SerializedInvoker(); + _serializedInvoker = new SerializedInvoker(SerializedInvokerTest.class); + _executor = Executors.newSingleThreadExecutor(); } @AfterEach public void afterEach() { + _executor.shutdownNow(); } @Test @@ -45,24 +50,27 @@ public void testSimple() throws Exception Task task2 = new Task(); Task task3 = new Task(); - Runnable todo = _serialedInvoker.offer(task1); - assertNull(_serialedInvoker.offer(task2)); - assertNull(_serialedInvoker.offer(task3)); + Runnable todo = _serializedInvoker.offer(task1); + assertNull(_serializedInvoker.offer(task2)); + assertNull(_serializedInvoker.offer(task3)); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } @Test @@ -72,22 +80,25 @@ public void testMulti() Task task2 = new Task(); Task task3 = new Task(); - Runnable todo = _serialedInvoker.offer(null, task1, null, task2, null, task3, null); + Runnable todo = _serializedInvoker.offer(null, task1, null, task2, null, task3, null); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } @Test @@ -99,7 +110,7 @@ public void testRecursive() @Override public void run() { - assertNull(_serialedInvoker.offer(task3)); + assertNull(_serializedInvoker.offer(task3)); super.run(); } }; @@ -108,32 +119,35 @@ public void run() @Override public void run() { - assertNull(_serialedInvoker.offer(task2)); + assertNull(_serializedInvoker.offer(task2)); super.run(); } }; - Runnable todo = _serialedInvoker.offer(task1); + Runnable todo = _serializedInvoker.offer(task1); assertFalse(task1.hasRun()); assertFalse(task2.hasRun()); assertFalse(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); todo.run(); assertTrue(task1.hasRun()); assertTrue(task2.hasRun()); assertTrue(task3.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); Task task4 = new Task(); - todo = _serialedInvoker.offer(task4); + todo = _serializedInvoker.offer(task4); todo.run(); assertTrue(task4.hasRun()); + assertFalse(_serializedInvoker.isCurrentThreadInvoking()); } - public static class Task implements Runnable + public class Task implements Runnable { - CountDownLatch _run = new CountDownLatch(1); + final CountDownLatch _run = new CountDownLatch(1); boolean hasRun() { @@ -143,7 +157,17 @@ boolean hasRun() @Override public void run() { - _run.countDown(); + try + { + assertTrue(_serializedInvoker.isCurrentThreadInvoking()); + assertFalse(_executor.submit(() -> _serializedInvoker.isCurrentThreadInvoking()).get()); + + _run.countDown(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } } } From 694d0884989874f67110cb4d1bc27a39e47ecee1 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 28 Aug 2024 19:55:01 +0200 Subject: [PATCH 7/8] name all SerializedInvoker instances Signed-off-by: Ludovic Orban --- .../org/eclipse/jetty/client/transport/HttpReceiver.java | 2 +- .../src/main/java/org/eclipse/jetty/http/MultiPart.java | 2 +- .../java/org/eclipse/jetty/io/content/AsyncContent.java | 2 +- .../jetty/io/content/ByteBufferContentSource.java | 2 +- .../eclipse/jetty/io/content/ChunksContentSource.java | 2 +- .../jetty/io/content/ContentSourceTransformer.java | 2 +- .../jetty/io/content/InputStreamContentSource.java | 2 +- .../org/eclipse/jetty/io/content/PathContentSource.java | 2 +- .../jetty/io/internal/ByteChannelContentSource.java | 2 +- .../eclipse/jetty/server/internal/HttpChannelState.java | 9 +++++++-- .../eclipse/jetty/util/thread/SerializedExecutor.java | 2 +- 11 files changed, 17 insertions(+), 12 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 33fa1aa8c210..1da157b97f22 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -67,7 +67,7 @@ public abstract class HttpReceiver { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class); private final HttpChannel channel; private ResponseState responseState = ResponseState.IDLE; private NotifiableContentSource contentSource; diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java index 452cd394bbd5..fd9d89673f34 100644 --- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java +++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java @@ -564,7 +564,7 @@ public String toString() public abstract static class AbstractContentSource implements Content.Source, Closeable { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(AbstractContentSource.class); private final Queue parts = new ArrayDeque<>(); private final String boundary; private final ByteBuffer firstBoundary; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java index 6bd5eeebc32a..02bc1d0cf6b2 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java @@ -50,7 +50,7 @@ public String toString() }; private final AutoLock.WithCondition lock = new AutoLock.WithCondition(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(AsyncContent.class); private final Queue chunks = new ArrayDeque<>(); private Content.Chunk persistentFailure; private boolean readClosed; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java index 23a4e6fb97d3..6273cab9e76a 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ByteBufferContentSource.java @@ -31,7 +31,7 @@ public class ByteBufferContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(ByteBufferContentSource.class); private final long length; private final Collection byteBuffers; private Iterator iterator; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java index a00680251cf4..b065efeb618b 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ChunksContentSource.java @@ -33,7 +33,7 @@ public class ChunksContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(ChunksContentSource.class); private final long length; private final Collection chunks; private Iterator iterator; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java index 760e4f1f42d6..c4a6a9fe9c54 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/ContentSourceTransformer.java @@ -39,7 +39,7 @@ public abstract class ContentSourceTransformer implements Content.Source protected ContentSourceTransformer(Content.Source rawSource) { - this(rawSource, new SerializedInvoker()); + this(rawSource, new SerializedInvoker(ContentSourceTransformer.class)); } protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java index bc14ba9d9e95..dcce4d5c7ce2 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/InputStreamContentSource.java @@ -38,7 +38,7 @@ public class InputStreamContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(InputStreamContentSource.class); private final InputStream inputStream; private ByteBufferPool.Sized bufferPool; private Runnable demandCallback; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java index f731280f737e..0481761f5b98 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/PathContentSource.java @@ -41,7 +41,7 @@ public class PathContentSource implements Content.Source // TODO in 12.1.x reimplement this class based on ByteChannelContentSource private final AutoLock lock = new AutoLock(); - private final SerializedInvoker invoker = new SerializedInvoker(); + private final SerializedInvoker invoker = new SerializedInvoker(PathContentSource.class); private final Path path; private final long length; private final ByteBufferPool byteBufferPool; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java index 65336ac6520d..c8713b2b39bf 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteChannelContentSource.java @@ -39,7 +39,7 @@ public class ByteChannelContentSource implements Content.Source { private final AutoLock lock = new AutoLock(); - private final SerializedInvoker _invoker = new SerializedInvoker(); + private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class); private final ByteBufferPool.Sized _byteBufferPool; private ByteChannel _byteChannel; private final long _offset; diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index af8ca07824ed..9ebb86ddac6e 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -128,8 +128,8 @@ public HttpChannelState(ConnectionMetaData connectionMetaData) { _connectionMetaData = connectionMetaData; // The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc. - _readInvoker = new HttpChannelSerializedInvoker(); - _writeInvoker = new HttpChannelSerializedInvoker(); + _readInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_readInvoker"); + _writeInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "_writeInvoker"); } @Override @@ -1812,6 +1812,11 @@ private void completing() private class HttpChannelSerializedInvoker extends SerializedInvoker { + public HttpChannelSerializedInvoker(String name) + { + super(name); + } + @Override protected void onError(Runnable task, Throwable failure) { diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java index 5c09087c5226..9377d4cad70b 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedExecutor.java @@ -27,7 +27,7 @@ */ public class SerializedExecutor implements Executor { - private final SerializedInvoker _invoker = new SerializedInvoker() + private final SerializedInvoker _invoker = new SerializedInvoker(SerializedExecutor.class) { @Override protected void onError(Runnable task, Throwable t) From 3e2791925c6d609e6cb6d6b96755527b1fe1fed6 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 28 Aug 2024 23:30:21 +0200 Subject: [PATCH 8/8] handle review comments Signed-off-by: Ludovic Orban --- .../eclipse/jetty/util/thread/SerializedInvoker.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java index e7ea03eb41ae..8da7c9861f1e 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SerializedInvoker.java @@ -152,8 +152,7 @@ public void run(Runnable task) Runnable todo = offer(task); if (todo != null) todo.run(); - else - if (LOG.isDebugEnabled()) + else if (LOG.isDebugEnabled()) LOG.debug("Queued link in {}", this); } @@ -167,8 +166,7 @@ public void run(Runnable... tasks) Runnable todo = offer(tasks); if (todo != null) todo.run(); - else - if (LOG.isDebugEnabled()) + else if (LOG.isDebugEnabled()) LOG.debug("Queued links in {}", this); } @@ -279,14 +277,14 @@ private class NamedRunnable implements Runnable private final String name; private final Throwable stack; - public NamedRunnable(Runnable delegate) + private NamedRunnable(Runnable delegate) { this.delegate = delegate; this.stack = new Throwable(); this.name = deriveTaskName(delegate, stack); } - protected String deriveTaskName(Runnable task, Throwable stack) + private String deriveTaskName(Runnable task, Throwable stack) { StackTraceElement[] stackTrace = stack.getStackTrace(); for (StackTraceElement stackTraceElement : stackTrace)