Skip to content

Commit

Permalink
Read the response even if writing the request fails (#6295) (#7453)
Browse files Browse the repository at this point in the history
Closes: 1001
(cherry picked from commit 9533117)

Co-authored-by: Jesse Wilson <jesse@swank.ca>
  • Loading branch information
yschimke and swankjesse authored Oct 16, 2022
1 parent cf088f8 commit e46a200
Show file tree
Hide file tree
Showing 10 changed files with 403 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class MockResponse : Cloneable {
/**
* Sets the [HTTP/2 error code](https://tools.ietf.org/html/rfc7540#section-7) to be
* returned when resetting the stream.
* This is only valid with [SocketPolicy.RESET_STREAM_AT_START].
* This is only valid with [SocketPolicy.RESET_STREAM_AT_START] and
* [SocketPolicy.DO_NOT_READ_REQUEST_BODY].
*/
@set:JvmName("http2ErrorCode")
var http2ErrorCode = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_END
import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AT_START
import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_REQUEST_BODY
import okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY
import okhttp3.mockwebserver.SocketPolicy.DO_NOT_READ_REQUEST_BODY
import okhttp3.mockwebserver.SocketPolicy.EXPECT_CONTINUE
import okhttp3.mockwebserver.SocketPolicy.FAIL_HANDSHAKE
import okhttp3.mockwebserver.SocketPolicy.NO_RESPONSE
Expand Down Expand Up @@ -627,7 +628,7 @@ class MockWebServer : ExternalResource(), Closeable {

// See warnings associated with these socket policies in SocketPolicy.
when (response.socketPolicy) {
DISCONNECT_AT_END -> {
DISCONNECT_AT_END, DO_NOT_READ_REQUEST_BODY -> {
socket.close()
return false
}
Expand Down Expand Up @@ -719,7 +720,9 @@ class MockWebServer : ExternalResource(), Closeable {

var hasBody = false
val policy = dispatcher.peek()
if (contentLength != -1L) {
if (policy.socketPolicy == DO_NOT_READ_REQUEST_BODY) {
// Ignore the body completely.
} else if (contentLength != -1L) {
hasBody = contentLength > 0L
throttledTransfer(policy, socket, source, requestBody.buffer(), contentLength, true)
} else if (chunked) {
Expand Down Expand Up @@ -966,7 +969,8 @@ class MockWebServer : ExternalResource(), Closeable {

val response: MockResponse = dispatcher.dispatch(request)

if (response.socketPolicy === DISCONNECT_AFTER_REQUEST) {
val socketPolicy = response.socketPolicy
if (socketPolicy === DISCONNECT_AFTER_REQUEST) {
socket.close()
return
}
Expand All @@ -977,9 +981,15 @@ class MockWebServer : ExternalResource(), Closeable {
"and responded: $response protocol is $protocol")
}

if (response.socketPolicy === DISCONNECT_AT_END) {
val connection = stream.connection
connection.shutdown(ErrorCode.NO_ERROR)
when (socketPolicy) {
DISCONNECT_AT_END -> {
stream.connection.shutdown(ErrorCode.NO_ERROR)
}
DO_NOT_READ_REQUEST_BODY -> {
stream.close(ErrorCode.fromHttp2(response.http2ErrorCode)!!, null)
}
else -> {
}
}
}

Expand Down Expand Up @@ -1019,7 +1029,7 @@ class MockWebServer : ExternalResource(), Closeable {
val body = Buffer()
val requestLine = "$method $path HTTP/1.1"
var exception: IOException? = null
if (readBody && !peek.isDuplex) {
if (readBody && !peek.isDuplex && peek.socketPolicy !== DO_NOT_READ_REQUEST_BODY) {
try {
val contentLengthString = headers["content-length"]
val byteCount = contentLengthString?.toLong() ?: Long.MAX_VALUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ enum class SocketPolicy {
/** Close connection after writing half of the response body (if present). */
DISCONNECT_DURING_RESPONSE_BODY,

/**
* Process the response without even attempting to reading the request body. For HTTP/2 this will
* send [MockResponse.getHttp2ErrorCode] after the response body or trailers. For HTTP/1 this will
* close the socket after the response body or trailers.
*/
DO_NOT_READ_REQUEST_BODY,

/** Don't trust the client during the SSL handshake. */
FAIL_HANDSHAKE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ class Exchange(
internal val finder: ExchangeFinder,
private val codec: ExchangeCodec
) {
/** Returns true if the request body need not complete before the response body starts. */
/** True if the request body need not complete before the response body starts. */
internal var isDuplex: Boolean = false
private set

/** True if there was an exception on the connection to the peer. */
internal var hasFailure: Boolean = false
private set

internal val connection: RealConnection = codec.connection

internal val isCoalescedConnection: Boolean
Expand Down Expand Up @@ -163,6 +167,7 @@ class Exchange(
}

private fun trackFailure(e: IOException) {
hasFailure = true
finder.trackFailure(e)
codec.connection.trackFailure(call, e)
}
Expand Down
167 changes: 94 additions & 73 deletions okhttp/src/main/kotlin/okhttp3/internal/http/CallServerInterceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.net.ProtocolException
import okhttp3.Interceptor
import okhttp3.Response
import okhttp3.internal.EMPTY_RESPONSE
import okhttp3.internal.http2.ConnectionShutdownException
import okio.buffer

/** This is the last interceptor in the chain. It makes a network call to the server. */
Expand All @@ -33,98 +34,118 @@ class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()

exchange.writeRequestHeaders(request)

var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
var sendRequestException: IOException? = null
try {
exchange.writeRequestHeaders(request)

if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}

if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
} catch (e: IOException) {
if (e is ConnectionShutdownException) {
throw e // No request was sent so there's no response to read.
}
if (!exchange.hasFailure) {
throw e // Don't attempt to read the response; we failed to send the request.
}
sendRequestException = e
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()

try {
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
response = responseBuilder
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the
// actual response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}

exchange.responseHeadersEnd(response)
exchange.responseHeadersEnd(response)

response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
} catch (e: IOException) {
if (sendRequestException != null) {
sendRequestException.addSuppressed(e)
throw sendRequestException
}
throw e
}
return response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ class Http1ExchangeCodec(
}

override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
check(state == STATE_OPEN_REQUEST_BODY ||
state == STATE_WRITING_REQUEST_BODY ||
state == STATE_READ_RESPONSE_HEADERS) {
"state: $state"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class Http2ExchangeCodec(
}

override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
val headers = stream!!.takeHeaders()
val stream = stream ?: throw IOException("stream wasn't created")
val headers = stream.takeHeaders()
val responseBuilder = readHttp2HeadersList(headers, protocol)
return if (expectContinue && responseBuilder.code == HTTP_CONTINUE) {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class Http2Stream internal constructor(
synchronized(this@Http2Stream) {
readTimeout.enter()
try {
if (errorCode != null) {
if (errorCode != null && !finished) {
// Prepare to deliver an error.
errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
}
Expand Down
2 changes: 1 addition & 1 deletion okhttp/src/test/java/okhttp3/EventListenerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ private void writeChunk(BufferedSink sink) throws IOException {
assertThat(listener.recordedEventTypes()).containsExactly(
"CallStart", "ProxySelectStart", "ProxySelectEnd", "DnsStart", "DnsEnd", "ConnectStart",
"ConnectEnd", "ConnectionAcquired", "RequestHeadersStart", "RequestHeadersEnd",
"RequestBodyStart", "RequestFailed", "ConnectionReleased", "CallFailed");
"RequestBodyStart", "RequestFailed", "ResponseFailed", "ConnectionReleased", "CallFailed");
}

@Test public void requestBodySuccessHttp1OverHttps() throws IOException {
Expand Down
Loading

0 comments on commit e46a200

Please sign in to comment.