diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 94d204fe5594..f90feaa12155 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -290,11 +290,11 @@ public String toString() { synchronized (this) { - return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h", + return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}", HttpExchange.class.getSimpleName(), hashCode(), - requestState, requestFailure, requestFailure, - responseState, responseFailure, responseFailure); + request, requestState, requestFailure, + response, responseState, responseFailure); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 271273887685..35df5596fdcc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -187,7 +187,7 @@ protected boolean responseBegin(HttpExchange exchange) { handlerListener = protocolHandler.getResponseListener(); if (LOG.isDebugEnabled()) - LOG.debug("Found protocol handler {}", protocolHandler); + LOG.debug("Response {} found protocol handler {}", response, protocolHandler); } exchange.getConversation().updateResponseListeners(handlerListener); @@ -218,19 +218,8 @@ protected boolean responseBegin(HttpExchange exchange) */ protected boolean responseHeader(HttpExchange exchange, HttpField field) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.BEGIN || current == ResponseState.HEADER) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - return false; - } - } + if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT)) + return false; HttpResponse response = exchange.getResponse(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); @@ -296,19 +285,8 @@ protected void storeCookie(URI uri, HttpField field) */ protected boolean responseHeaders(HttpExchange exchange) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.BEGIN || current == ResponseState.HEADER) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - return false; - } - } + if (!updateResponseState(ResponseState.BEGIN, ResponseState.HEADER, ResponseState.TRANSIENT)) + return false; HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) @@ -342,7 +320,7 @@ protected boolean responseHeaders(HttpExchange exchange) { boolean hasDemand = hasDemandOrStall(); if (LOG.isDebugEnabled()) - LOG.debug("Response headers {}, hasDemand={}", response, hasDemand); + LOG.debug("Response headers hasDemand={} {}", hasDemand, response); return hasDemand; } @@ -363,71 +341,39 @@ protected boolean responseHeaders(HttpExchange exchange) */ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback) { - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - callback.failed(new IllegalStateException("Invalid response state " + current)); - return false; - } - } - - boolean proceed = true; + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}{}{}", exchange.getResponse(), System.lineSeparator(), BufferUtil.toDetailString(buffer)); if (demand() <= 0) { callback.failed(new IllegalStateException("No demand for response content")); - proceed = false; + return false; } + if (decoder == null) + return plainResponseContent(exchange, buffer, callback); + else + return decodeResponseContent(buffer, callback); + } - HttpResponse response = exchange.getResponse(); - if (proceed) + private boolean plainResponseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback) + { + if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT)) { - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer)); - if (contentListeners.isEmpty()) - { - callback.succeeded(); - } - else - { - if (decoder == null) - { - contentListeners.notifyContent(response, buffer, callback); - } - else - { - try - { - proceed = decoder.decode(buffer, callback); - } - catch (Throwable x) - { - callback.failed(x); - proceed = false; - } - } - } + callback.failed(new IllegalStateException("Invalid response state " + responseState)); + return false; } + HttpResponse response = exchange.getResponse(); + if (contentListeners.isEmpty()) + callback.succeeded(); + else + contentListeners.notifyContent(response, buffer, callback); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) { - if (proceed) - { - boolean hasDemand = hasDemandOrStall(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content {}, hasDemand={}", response, hasDemand); - return hasDemand; - } - else - { - return false; - } + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content {}, hasDemand={}", response, hasDemand); + return hasDemand; } dispose(); @@ -435,6 +381,11 @@ protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Call return false; } + private boolean decodeResponseContent(ByteBuffer buffer, Callback callback) + { + return decoder.decode(buffer, callback); + } + /** * Method to be invoked when the response is successful. *
@@ -614,15 +565,42 @@ public boolean abort(HttpExchange exchange, Throwable failure) } } + private boolean updateResponseState(ResponseState from1, ResponseState from2, ResponseState to) + { + while (true) + { + ResponseState current = responseState.get(); + if (current == from1 || current == from2) + { + if (updateResponseState(current, to)) + return true; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("State update failed: [{},{}] -> {}: {}", from1, from2, to, current); + return false; + } + } + } + private boolean updateResponseState(ResponseState from, ResponseState to) { - boolean updated = responseState.compareAndSet(from, to); - if (!updated) + while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get()); + ResponseState current = responseState.get(); + if (current == from) + { + if (responseState.compareAndSet(current, to)) + return true; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("State update failed: {} -> {}: {}", from, to, current); + return false; + } } - return updated; } @Override @@ -778,14 +756,62 @@ private Decoder(HttpExchange exchange, ContentDecoder decoder) private boolean decode(ByteBuffer encoded, Callback callback) { + // Store the buffer to decode in case the + // decoding produces multiple decoded buffers. this.encoded = encoded; this.callback = callback; - return decode(); + + HttpResponse response = exchange.getResponse(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoding {} with {}{}{}", response, decoder, System.lineSeparator(), BufferUtil.toDetailString(encoded)); + + boolean needInput = decode(); + if (!needInput) + return false; + + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded, hasDemand={} {}", hasDemand, response); + return hasDemand; } private boolean decode() { while (true) + { + if (!updateResponseState(ResponseState.HEADERS, ResponseState.CONTENT, ResponseState.TRANSIENT)) + { + callback.failed(new IllegalStateException("Invalid response state " + responseState)); + return false; + } + + DecodeResult result = decodeChunk(); + + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + { + if (result == DecodeResult.NEED_INPUT) + return true; + if (result == DecodeResult.ABORT) + return false; + + boolean hasDemand = hasDemandOrStall(); + if (LOG.isDebugEnabled()) + LOG.debug("Response content decoded chunk, hasDemand={} {}", hasDemand, exchange.getResponse()); + if (hasDemand) + continue; + else + return false; + } + + dispose(); + terminateResponse(exchange); + return false; + } + } + + private DecodeResult decodeChunk() + { + try { ByteBuffer buffer; while (true) @@ -798,27 +824,30 @@ private boolean decode() callback.succeeded(); encoded = null; callback = null; - return true; + return DecodeResult.NEED_INPUT; } } + ByteBuffer decoded = buffer; + HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded ({}) {}{}{}", decoder, exchange, System.lineSeparator(), BufferUtil.toDetailString(decoded)); + LOG.debug("Response content decoded chunk {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(decoded)); - contentListeners.notifyContent(exchange.getResponse(), decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); + contentListeners.notifyContent(response, decoded, Callback.from(() -> decoder.release(decoded), callback::failed)); - boolean hasDemand = hasDemandOrStall(); - if (LOG.isDebugEnabled()) - LOG.debug("Response content decoded {}, hasDemand={}", exchange, hasDemand); - if (!hasDemand) - return false; + return DecodeResult.DECODE; + } + catch (Throwable x) + { + callback.failed(x); + return DecodeResult.ABORT; } } private void resume() { if (LOG.isDebugEnabled()) - LOG.debug("Response content resuming decoding {}", exchange); + LOG.debug("Response content resume decoding {} with {}", exchange.getResponse(), decoder); // The content and callback may be null // if there is no initial content demand. @@ -828,40 +857,9 @@ private void resume() return; } - while (true) - { - ResponseState current = responseState.get(); - if (current == ResponseState.HEADERS || current == ResponseState.CONTENT) - { - if (updateResponseState(current, ResponseState.TRANSIENT)) - break; - } - else - { - callback.failed(new IllegalStateException("Invalid response state " + current)); - return; - } - } - - boolean decoded = false; - try - { - decoded = decode(); - } - catch (Throwable x) - { - callback.failed(x); - } - - if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) - { - if (decoded) - receive(); - return; - } - - dispose(); - terminateResponse(exchange); + boolean needInput = decode(); + if (needInput) + receive(); } @Override @@ -871,4 +869,9 @@ public void destroy() ((Destroyable)decoder).destroy(); } } + + private enum DecodeResult + { + DECODE, NEED_INPUT, ABORT + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java index e29f0a3b573a..3120a97619d9 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientGZIPTest.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -32,11 +33,14 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.util.InputStreamResponseListener; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.IO; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -266,6 +270,47 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertThat(heapMemory, lessThan((long)content.length)); } + @ParameterizedTest + @ArgumentsSource(ScenarioProvider.class) + public void testLargeGZIPContentAsync(Scenario scenario) throws Exception + { + String digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + Random random = new Random(); + byte[] content = new byte[32 * 1024 * 1024]; + for (int i = 0; i < content.length; ++i) + { + content[i] = (byte)digits.charAt(random.nextInt(digits.length())); + } + start(scenario, new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setContentType("text/plain;charset=" + StandardCharsets.US_ASCII.name()); + response.setHeader(HttpHeader.CONTENT_ENCODING.asString(), "gzip"); + GZIPOutputStream gzip = new GZIPOutputStream(response.getOutputStream()); + gzip.write(content); + gzip.finish(); + } + }); + + InputStreamResponseListener listener = new InputStreamResponseListener(); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scenario.getScheme()) + .timeout(5, TimeUnit.SECONDS) + .send(listener); + + Response response = listener.get(5, TimeUnit.SECONDS); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + try (InputStream input = listener.getInputStream()) + { + IO.copy(input, output); + } + assertArrayEquals(content, output.toByteArray()); + } + private static void sleep(long ms) throws IOException { try