From 3de8b68c799abc6faceee5881580204dddc46a5b Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Fri, 2 Oct 2020 11:58:30 -0400 Subject: [PATCH] Read the response even if writing the request fails (#6295) Closes: 1001 (cherry picked from commit 9533117948e81bd5298dd30de86d2ffdc90e9907) --- .../okhttp3/mockwebserver/MockResponse.kt | 3 +- .../okhttp3/mockwebserver/MockWebServer.kt | 24 +- .../okhttp3/mockwebserver/SocketPolicy.kt | 7 + .../okhttp3/internal/connection/Exchange.kt | 7 +- .../internal/http/CallServerInterceptor.kt | 167 ++++++----- .../internal/http1/Http1ExchangeCodec.kt | 4 +- .../internal/http2/Http2ExchangeCodec.kt | 3 +- .../okhttp3/internal/http2/Http2Stream.kt | 2 +- .../test/java/okhttp3/EventListenerTest.java | 2 +- .../okhttp3/ServerTruncatesRequestTest.kt | 270 ++++++++++++++++++ 10 files changed, 403 insertions(+), 86 deletions(-) create mode 100644 okhttp/src/test/java/okhttp3/ServerTruncatesRequestTest.kt diff --git a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockResponse.kt b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockResponse.kt index 1c2b4f58ac2b..e61479bcb569 100644 --- a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockResponse.kt +++ b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockResponse.kt @@ -60,7 +60,8 @@ class MockResponse : Cloneable { /** * Sets the [HTTP/2 error code](https://tools.ietf.org/html/rfc7540#section-7) to be * returned when resetting the stream. - * This is only valid with [SocketPolicy.RESET_STREAM_AT_START]. + * This is only valid with [SocketPolicy.RESET_STREAM_AT_START] and + * [SocketPolicy.DO_NOT_READ_REQUEST_BODY]. */ @set:JvmName("http2ErrorCode") var http2ErrorCode = -1 diff --git a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockWebServer.kt b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockWebServer.kt index f72a849ef65f..65622104cc9a 100644 --- a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockWebServer.kt +++ b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/MockWebServer.kt @@ -72,6 +72,7 @@ import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_END import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_START import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_REQUEST_BODY import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY +import okhttp3.mockwebserver.SocketPolicy.DO_NOT_READ_REQUEST_BODY import okhttp3.mockwebserver.SocketPolicy.EXPECT_CONTINUE import okhttp3.mockwebserver.SocketPolicy.FAIL_HANDSHAKE import okhttp3.mockwebserver.SocketPolicy.NO_RESPONSE @@ -627,7 +628,7 @@ class MockWebServer : ExternalResource(), Closeable { // See warnings associated with these socket policies in SocketPolicy. when (response.socketPolicy) { - DISCONNECT_AT_END -> { + DISCONNECT_AT_END, DO_NOT_READ_REQUEST_BODY -> { socket.close() return false } @@ -719,7 +720,9 @@ class MockWebServer : ExternalResource(), Closeable { var hasBody = false val policy = dispatcher.peek() - if (contentLength != -1L) { + if (policy.socketPolicy == DO_NOT_READ_REQUEST_BODY) { + // Ignore the body completely. + } else if (contentLength != -1L) { hasBody = contentLength > 0L throttledTransfer(policy, socket, source, requestBody.buffer(), contentLength, true) } else if (chunked) { @@ -966,7 +969,8 @@ class MockWebServer : ExternalResource(), Closeable { val response: MockResponse = dispatcher.dispatch(request) - if (response.socketPolicy === DISCONNECT_AFTER_REQUEST) { + val socketPolicy = response.socketPolicy + if (socketPolicy === DISCONNECT_AFTER_REQUEST) { socket.close() return } @@ -977,9 +981,15 @@ class MockWebServer : ExternalResource(), Closeable { "and responded: $response protocol is $protocol") } - if (response.socketPolicy === DISCONNECT_AT_END) { - val connection = stream.connection - connection.shutdown(ErrorCode.NO_ERROR) + when (socketPolicy) { + DISCONNECT_AT_END -> { + stream.connection.shutdown(ErrorCode.NO_ERROR) + } + DO_NOT_READ_REQUEST_BODY -> { + stream.close(ErrorCode.fromHttp2(response.http2ErrorCode)!!, null) + } + else -> { + } } } @@ -1019,7 +1029,7 @@ class MockWebServer : ExternalResource(), Closeable { val body = Buffer() val requestLine = "$method $path HTTP/1.1" var exception: IOException? = null - if (readBody && !peek.isDuplex) { + if (readBody && !peek.isDuplex && peek.socketPolicy !== DO_NOT_READ_REQUEST_BODY) { try { val contentLengthString = headers["content-length"] val byteCount = contentLengthString?.toLong() ?: Long.MAX_VALUE diff --git a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/SocketPolicy.kt b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/SocketPolicy.kt index 60f26e5fdcba..1a60b6e4d72e 100644 --- a/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/SocketPolicy.kt +++ b/mockwebserver/src/main/kotlin/okhttp3/mockwebserver/SocketPolicy.kt @@ -75,6 +75,13 @@ enum class SocketPolicy { /** Close connection after writing half of the response body (if present). */ DISCONNECT_DURING_RESPONSE_BODY, + /** + * Process the response without even attempting to reading the request body. For HTTP/2 this will + * send [MockResponse.getHttp2ErrorCode] after the response body or trailers. For HTTP/1 this will + * close the socket after the response body or trailers. + */ + DO_NOT_READ_REQUEST_BODY, + /** Don't trust the client during the SSL handshake. */ FAIL_HANDSHAKE, diff --git a/okhttp/src/main/kotlin/okhttp3/internal/connection/Exchange.kt b/okhttp/src/main/kotlin/okhttp3/internal/connection/Exchange.kt index ca93f27c4b4c..8d369627011e 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/connection/Exchange.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/connection/Exchange.kt @@ -43,10 +43,14 @@ class Exchange( internal val finder: ExchangeFinder, private val codec: ExchangeCodec ) { - /** Returns true if the request body need not complete before the response body starts. */ + /** True if the request body need not complete before the response body starts. */ internal var isDuplex: Boolean = false private set + /** True if there was an exception on the connection to the peer. */ + internal var hasFailure: Boolean = false + private set + internal val connection: RealConnection = codec.connection internal val isCoalescedConnection: Boolean @@ -163,6 +167,7 @@ class Exchange( } private fun trackFailure(e: IOException) { + hasFailure = true finder.trackFailure(e) codec.connection.trackFailure(call, e) } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http/CallServerInterceptor.kt b/okhttp/src/main/kotlin/okhttp3/internal/http/CallServerInterceptor.kt index 7fdf9feee170..6801154b35c7 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http/CallServerInterceptor.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http/CallServerInterceptor.kt @@ -20,6 +20,7 @@ import java.net.ProtocolException import okhttp3.Interceptor import okhttp3.Response import okhttp3.internal.EMPTY_RESPONSE +import okhttp3.internal.http2.ConnectionShutdownException import okio.buffer /** This is the last interceptor in the chain. It makes a network call to the server. */ @@ -33,98 +34,118 @@ class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { val requestBody = request.body val sentRequestMillis = System.currentTimeMillis() - exchange.writeRequestHeaders(request) - var invokeStartEvent = true var responseBuilder: Response.Builder? = null - if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { - // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 - // Continue" response before transmitting the request body. If we don't get that, return - // what we did get (such as a 4xx response) without ever transmitting the request body. - if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { - exchange.flushRequest() - responseBuilder = exchange.readResponseHeaders(expectContinue = true) - exchange.responseHeadersStart() - invokeStartEvent = false - } - if (responseBuilder == null) { - if (requestBody.isDuplex()) { - // Prepare a duplex body so that the application can send a request body later. + var sendRequestException: IOException? = null + try { + exchange.writeRequestHeaders(request) + + if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { + // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 + // Continue" response before transmitting the request body. If we don't get that, return + // what we did get (such as a 4xx response) without ever transmitting the request body. + if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { exchange.flushRequest() - val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() - requestBody.writeTo(bufferedRequestBody) + responseBuilder = exchange.readResponseHeaders(expectContinue = true) + exchange.responseHeadersStart() + invokeStartEvent = false + } + if (responseBuilder == null) { + if (requestBody.isDuplex()) { + // Prepare a duplex body so that the application can send a request body later. + exchange.flushRequest() + val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() + requestBody.writeTo(bufferedRequestBody) + } else { + // Write the request body if the "Expect: 100-continue" expectation was met. + val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() + requestBody.writeTo(bufferedRequestBody) + bufferedRequestBody.close() + } } else { - // Write the request body if the "Expect: 100-continue" expectation was met. - val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() - requestBody.writeTo(bufferedRequestBody) - bufferedRequestBody.close() + exchange.noRequestBody() + if (!exchange.connection.isMultiplexed) { + // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection + // from being reused. Otherwise we're still obligated to transmit the request body to + // leave the connection in a consistent state. + exchange.noNewExchangesOnConnection() + } } } else { exchange.noRequestBody() - if (!exchange.connection.isMultiplexed) { - // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection - // from being reused. Otherwise we're still obligated to transmit the request body to - // leave the connection in a consistent state. - exchange.noNewExchangesOnConnection() - } } - } else { - exchange.noRequestBody() - } - if (requestBody == null || !requestBody.isDuplex()) { - exchange.finishRequest() - } - if (responseBuilder == null) { - responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! - if (invokeStartEvent) { - exchange.responseHeadersStart() - invokeStartEvent = false + if (requestBody == null || !requestBody.isDuplex()) { + exchange.finishRequest() + } + } catch (e: IOException) { + if (e is ConnectionShutdownException) { + throw e // No request was sent so there's no response to read. + } + if (!exchange.hasFailure) { + throw e // Don't attempt to read the response; we failed to send the request. } + sendRequestException = e } - var response = responseBuilder - .request(request) - .handshake(exchange.connection.handshake()) - .sentRequestAtMillis(sentRequestMillis) - .receivedResponseAtMillis(System.currentTimeMillis()) - .build() - var code = response.code - if (code == 100) { - // Server sent a 100-continue even though we did not request one. Try again to read the actual - // response status. - responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! - if (invokeStartEvent) { - exchange.responseHeadersStart() + + try { + if (responseBuilder == null) { + responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! + if (invokeStartEvent) { + exchange.responseHeadersStart() + invokeStartEvent = false + } } - response = responseBuilder + var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() - code = response.code - } + var code = response.code + if (code == 100) { + // Server sent a 100-continue even though we did not request one. Try again to read the + // actual response status. + responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! + if (invokeStartEvent) { + exchange.responseHeadersStart() + } + response = responseBuilder + .request(request) + .handshake(exchange.connection.handshake()) + .sentRequestAtMillis(sentRequestMillis) + .receivedResponseAtMillis(System.currentTimeMillis()) + .build() + code = response.code + } - exchange.responseHeadersEnd(response) + exchange.responseHeadersEnd(response) - response = if (forWebSocket && code == 101) { - // Connection is upgrading, but we need to ensure interceptors see a non-null response body. - response.newBuilder() - .body(EMPTY_RESPONSE) - .build() - } else { - response.newBuilder() - .body(exchange.openResponseBody(response)) - .build() - } - if ("close".equals(response.request.header("Connection"), ignoreCase = true) || - "close".equals(response.header("Connection"), ignoreCase = true)) { - exchange.noNewExchangesOnConnection() - } - if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) { - throw ProtocolException( - "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}") + response = if (forWebSocket && code == 101) { + // Connection is upgrading, but we need to ensure interceptors see a non-null response body. + response.newBuilder() + .body(EMPTY_RESPONSE) + .build() + } else { + response.newBuilder() + .body(exchange.openResponseBody(response)) + .build() + } + if ("close".equals(response.request.header("Connection"), ignoreCase = true) || + "close".equals(response.header("Connection"), ignoreCase = true)) { + exchange.noNewExchangesOnConnection() + } + if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) { + throw ProtocolException( + "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}") + } + return response + } catch (e: IOException) { + if (sendRequestException != null) { + sendRequestException.addSuppressed(e) + throw sendRequestException + } + throw e } - return response } } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http1/Http1ExchangeCodec.kt b/okhttp/src/main/kotlin/okhttp3/internal/http1/Http1ExchangeCodec.kt index 156085e14329..87f516a69a75 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http1/Http1ExchangeCodec.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http1/Http1ExchangeCodec.kt @@ -170,7 +170,9 @@ class Http1ExchangeCodec( } override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? { - check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) { + check(state == STATE_OPEN_REQUEST_BODY || + state == STATE_WRITING_REQUEST_BODY || + state == STATE_READ_RESPONSE_HEADERS) { "state: $state" } diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2ExchangeCodec.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2ExchangeCodec.kt index d0b6b20dee84..eddc78a61192 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2ExchangeCodec.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2ExchangeCodec.kt @@ -93,7 +93,8 @@ class Http2ExchangeCodec( } override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? { - val headers = stream!!.takeHeaders() + val stream = stream ?: throw IOException("stream wasn't created") + val headers = stream.takeHeaders() val responseBuilder = readHttp2HeadersList(headers, protocol) return if (expectContinue && responseBuilder.code == HTTP_CONTINUE) { null diff --git a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt index 73c261bfdd0a..e8a6d7499d10 100644 --- a/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -350,7 +350,7 @@ class Http2Stream internal constructor( synchronized(this@Http2Stream) { readTimeout.enter() try { - if (errorCode != null) { + if (errorCode != null && !finished) { // Prepare to deliver an error. errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!) } diff --git a/okhttp/src/test/java/okhttp3/EventListenerTest.java b/okhttp/src/test/java/okhttp3/EventListenerTest.java index afa1e9bdb172..72472012bded 100644 --- a/okhttp/src/test/java/okhttp3/EventListenerTest.java +++ b/okhttp/src/test/java/okhttp3/EventListenerTest.java @@ -1118,7 +1118,7 @@ private void writeChunk(BufferedSink sink) throws IOException { assertThat(listener.recordedEventTypes()).containsExactly( "CallStart", "ProxySelectStart", "ProxySelectEnd", "DnsStart", "DnsEnd", "ConnectStart", "ConnectEnd", "ConnectionAcquired", "RequestHeadersStart", "RequestHeadersEnd", - "RequestBodyStart", "RequestFailed", "ConnectionReleased", "CallFailed"); + "RequestBodyStart", "RequestFailed", "ResponseFailed", "ConnectionReleased", "CallFailed"); } @Test public void requestBodySuccessHttp1OverHttps() throws IOException { diff --git a/okhttp/src/test/java/okhttp3/ServerTruncatesRequestTest.kt b/okhttp/src/test/java/okhttp3/ServerTruncatesRequestTest.kt new file mode 100644 index 000000000000..170ea97142e3 --- /dev/null +++ b/okhttp/src/test/java/okhttp3/ServerTruncatesRequestTest.kt @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3 + +import okhttp3.Headers.Companion.headersOf +import okhttp3.internal.duplex.AsyncRequestBody +import okhttp3.internal.http2.ErrorCode +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import okhttp3.mockwebserver.SocketPolicy +import okhttp3.testing.PlatformRule +import okhttp3.tls.internal.TlsUtil.localhost +import okio.BufferedSink +import okio.IOException +import org.assertj.core.api.Assertions.assertThat +import org.junit.Assert.fail +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TestRule +import org.junit.rules.Timeout +import java.util.concurrent.TimeUnit + +class ServerTruncatesRequestTest { + @Rule @JvmField + val platform = PlatformRule() + + @Rule @JvmField + val timeout: TestRule = Timeout(30_000, TimeUnit.MILLISECONDS) + + @Rule @JvmField + val server = MockWebServer() + + @Rule @JvmField + var clientTestRule = OkHttpClientTestRule() + + private val listener = RecordingEventListener() + private val handshakeCertificates = localhost() + + private var client = clientTestRule.newClientBuilder() + .eventListenerFactory(clientTestRule.wrap(listener)) + .build() + + @Before fun setUp() { + platform.assumeNotOpenJSSE() + platform.assumeHttp2Support() + platform.assumeNotBouncyCastle() + } + + @Test fun serverTruncatesRequestOnLongPostHttp1() { + serverTruncatesRequestOnLongPost(https = false) + } + + @Test fun serverTruncatesRequestOnLongPostHttp2() { + enableProtocol(Protocol.HTTP_2) + serverTruncatesRequestOnLongPost(https = true) + } + + private fun serverTruncatesRequestOnLongPost(https: Boolean) { + server.enqueue(MockResponse() + .setSocketPolicy(SocketPolicy.DO_NOT_READ_REQUEST_BODY) + .setBody("abc") + .apply { this.http2ErrorCode = ErrorCode.NO_ERROR.httpCode }) + + val call = client.newCall(Request.Builder() + .url(server.url("/")) + .post(SlowRequestBody) + .build()) + + call.execute().use { response -> + assertThat(response.body!!.string()).isEqualTo("abc") + } + + val expectedEvents = mutableListOf() + // Start out with standard events... + expectedEvents += "CallStart" + expectedEvents += "ProxySelectStart" + expectedEvents += "ProxySelectEnd" + expectedEvents += "DnsStart" + expectedEvents += "DnsEnd" + expectedEvents += "ConnectStart" + if (https) { + expectedEvents += "SecureConnectStart" + expectedEvents += "SecureConnectEnd" + } + expectedEvents += "ConnectEnd" + expectedEvents += "ConnectionAcquired" + expectedEvents += "RequestHeadersStart" + expectedEvents += "RequestHeadersEnd" + expectedEvents += "RequestBodyStart" + // ... but we can read the response even after writing the request fails. + expectedEvents += "RequestFailed" + expectedEvents += "ResponseHeadersStart" + expectedEvents += "ResponseHeadersEnd" + expectedEvents += "ResponseBodyStart" + expectedEvents += "ResponseBodyEnd" + expectedEvents += "ConnectionReleased" + expectedEvents += "CallEnd" + assertThat(listener.recordedEventTypes()).isEqualTo(expectedEvents) + + // Confirm that the connection pool was not corrupted by making another call. + makeSimpleCall() + } + + /** + * If the server returns a full response, it doesn't really matter if the HTTP/2 stream is reset. + * Attempts to write the request body fails fast. + */ + @Test fun serverTruncatesRequestHttp2OnDuplexRequest() { + enableProtocol(Protocol.HTTP_2) + + server.enqueue(MockResponse() + .setSocketPolicy(SocketPolicy.DO_NOT_READ_REQUEST_BODY) + .setBody("abc") + .apply { this.http2ErrorCode = ErrorCode.NO_ERROR.httpCode }) + + val requestBody = AsyncRequestBody() + + val call = client.newCall(Request.Builder() + .url(server.url("/")) + .post(requestBody) + .build()) + + call.execute().use { response -> + assertThat(response.body!!.string()).isEqualTo("abc") + val requestBodyOut = requestBody.takeSink() + try { + SlowRequestBody.writeTo(requestBodyOut) + fail() + } catch (expected: IOException) { + } + try { + requestBodyOut.close() + fail() + } catch (expected: IOException) { + } + } + + // Confirm that the connection pool was not corrupted by making another call. + makeSimpleCall() + } + + @Test fun serverTruncatesRequestButTrailersCanStillBeReadHttp1() { + serverTruncatesRequestButTrailersCanStillBeRead(http2 = false) + } + + @Test fun serverTruncatesRequestButTrailersCanStillBeReadHttp2() { + enableProtocol(Protocol.HTTP_2) + serverTruncatesRequestButTrailersCanStillBeRead(http2 = true) + } + + private fun serverTruncatesRequestButTrailersCanStillBeRead(http2: Boolean) { + val mockResponse = MockResponse() + .setSocketPolicy(SocketPolicy.DO_NOT_READ_REQUEST_BODY) + .apply { + this.trailers = headersOf("caboose", "xyz") + this.http2ErrorCode = ErrorCode.NO_ERROR.httpCode + } + + // Trailers always work for HTTP/2, but only for chunked bodies in HTTP/1. + if (http2) { + mockResponse.setBody("abc") + } else { + mockResponse.setChunkedBody("abc", 1) + } + + server.enqueue(mockResponse) + + val call = client.newCall(Request.Builder() + .url(server.url("/")) + .post(SlowRequestBody) + .build()) + + call.execute().use { response -> + assertThat(response.body!!.string()).isEqualTo("abc") + assertThat(response.trailers()).isEqualTo(headersOf("caboose", "xyz")) + } + } + + @Test fun noAttemptToReadResponseIfLoadingRequestBodyIsSourceOfFailure() { + server.enqueue(MockResponse().setBody("abc")) + + val requestBody = object : RequestBody() { + override fun contentType(): MediaType? = null + + override fun writeTo(sink: BufferedSink) { + throw IOException("boom") // Despite this exception, 'sink' is healthy. + } + } + + val callA = client.newCall(Request.Builder() + .url(server.url("/")) + .post(requestBody) + .build()) + + try { + callA.execute() + fail() + } catch (expected: IOException) { + assertThat(expected).hasMessage("boom") + } + + assertThat(server.requestCount).isEqualTo(0) + + // Confirm that the connection pool was not corrupted by making another call. This doesn't use + // makeSimpleCall() because it uses the MockResponse enqueued above. + val callB = client.newCall(Request.Builder() + .url(server.url("/")) + .build()) + callB.execute().use { response -> + assertThat(response.body!!.string()).isEqualTo("abc") + } + } + + private fun makeSimpleCall() { + server.enqueue(MockResponse().setBody("healthy")) + val callB = client.newCall(Request.Builder() + .url(server.url("/")) + .build()) + callB.execute().use { response -> + assertThat(response.body!!.string()).isEqualTo("healthy") + } + } + + private fun enableProtocol(protocol: Protocol) { + enableTls() + client = client.newBuilder() + .protocols(listOf(protocol, Protocol.HTTP_1_1)) + .build() + server.protocols = client.protocols + } + + private fun enableTls() { + client = client.newBuilder() + .sslSocketFactory( + handshakeCertificates.sslSocketFactory(), + handshakeCertificates.trustManager + ) + .hostnameVerifier(RecordingHostnameVerifier()) + .build() + server.useHttps(handshakeCertificates.sslSocketFactory(), false) + } + + /** A request body that slowly trickles bytes, expecting to not complete. */ + private object SlowRequestBody : RequestBody() { + override fun contentType(): MediaType? = null + + override fun writeTo(sink: BufferedSink) { + for (i in 0 until 50) { + sink.writeUtf8("abc") + sink.flush() + Thread.sleep(100) + } + fail() + } + } +}