From 19cb19ab4ac31aa789bc94759d13898f64f93ce3 Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Mon, 17 Feb 2020 10:32:11 -0500 Subject: [PATCH] Be smarter about connection health checks Previously we didn't do any health checks at all until a connection completed a single exchange. This was awkward for HTTP/2, which could have multiple exchanges in-flight before any were health checked. We were also doing the awkward and expensive read timeout check on all non-GET requests on pooled connections. Now we only perform these checks if the connection was idle for 10 seconds or more. Closes: https://github.com/square/okhttp/issues/5547 --- okhttp/src/main/java/okhttp3/internal/Util.kt | 38 ++++++++++++++--- .../internal/connection/ExchangeFinder.kt | 10 +---- .../okhttp3/internal/connection/RealCall.kt | 2 +- .../internal/connection/RealConnection.kt | 32 ++++++-------- .../internal/connection/RealConnectionPool.kt | 4 +- .../src/test/java/okhttp3/CallKotlinTest.kt | 42 +++++++++++++++++++ .../java/okhttp3/ConnectionReuseTest.java | 35 +++++----------- .../test/java/okhttp3/internal/UtilTest.kt | 19 +++++++++ 8 files changed, 121 insertions(+), 61 deletions(-) diff --git a/okhttp/src/main/java/okhttp3/internal/Util.kt b/okhttp/src/main/java/okhttp3/internal/Util.kt index 9c9ce0a9ac3c..2e48b1668894 100644 --- a/okhttp/src/main/java/okhttp3/internal/Util.kt +++ b/okhttp/src/main/java/okhttp3/internal/Util.kt @@ -23,6 +23,7 @@ import java.io.InterruptedIOException import java.net.InetSocketAddress import java.net.ServerSocket import java.net.Socket +import java.net.SocketTimeoutException import java.nio.charset.Charset import java.nio.charset.StandardCharsets.UTF_16BE import java.nio.charset.StandardCharsets.UTF_16LE @@ -324,13 +325,13 @@ fun BufferedSource.readMedium(): Int { */ @Throws(IOException::class) fun Source.skipAll(duration: Int, timeUnit: TimeUnit): Boolean { - val now = System.nanoTime() - val originalDuration = if (timeout().hasDeadline()) { - timeout().deadlineNanoTime() - now + val nowNs = System.nanoTime() + val originalDurationNs = if (timeout().hasDeadline()) { + timeout().deadlineNanoTime() - nowNs } else { Long.MAX_VALUE } - timeout().deadlineNanoTime(now + minOf(originalDuration, timeUnit.toNanos(duration.toLong()))) + timeout().deadlineNanoTime(nowNs + minOf(originalDurationNs, timeUnit.toNanos(duration.toLong()))) return try { val skipBuffer = Buffer() while (read(skipBuffer, 8192) != -1L) { @@ -340,10 +341,10 @@ fun Source.skipAll(duration: Int, timeUnit: TimeUnit): Boolean { } catch (_: InterruptedIOException) { false // We ran out of time before exhausting the source. } finally { - if (originalDuration == Long.MAX_VALUE) { + if (originalDurationNs == Long.MAX_VALUE) { timeout().clearDeadline() } else { - timeout().deadlineNanoTime(now + originalDuration) + timeout().deadlineNanoTime(nowNs + originalDurationNs) } } } @@ -364,6 +365,31 @@ fun Socket.peerName(): String { return if (address is InetSocketAddress) address.hostName else address.toString() } +/** + * Returns true if new reads and writes should be attempted on this. + * + * Unfortunately Java's networking APIs don't offer a good health check, so we go on our own by + * attempting to read with a short timeout. If the fails immediately we know the socket is + * unhealthy. + * + * @param source the source used to read bytes from the socket. + */ +fun Socket.isHealthy(source: BufferedSource): Boolean { + try { + val readTimeout = soTimeout + try { + soTimeout = 1 + return !source.exhausted() + } finally { + soTimeout = readTimeout + } + } catch (_: SocketTimeoutException) { + return true // Read timed out; socket is good. + } catch (_: IOException) { + return false // Couldn't read; socket is closed. + } +} + /** Run [block] until it either throws an [IOException] or completes. */ inline fun ignoreIoExceptions(block: () -> Unit) { try { diff --git a/okhttp/src/main/java/okhttp3/internal/connection/ExchangeFinder.kt b/okhttp/src/main/java/okhttp3/internal/connection/ExchangeFinder.kt index 762e8202fe62..bf71ac2d6eef 100644 --- a/okhttp/src/main/java/okhttp3/internal/connection/ExchangeFinder.kt +++ b/okhttp/src/main/java/okhttp3/internal/connection/ExchangeFinder.kt @@ -112,15 +112,7 @@ class ExchangeFinder( connectionRetryEnabled = connectionRetryEnabled ) - // If this is a brand new connection, we can skip the extensive health checks. - synchronized(connectionPool) { - if (candidate.successCount == 0) { - return candidate - } - } - - // Do a (potentially slow) check to confirm that the pooled connection is still good. If it - // isn't, take it out of the pool and start again. + // Confirm that the connection is good. If it isn't, take it out of the pool and start again. if (!candidate.isHealthy(doExtensiveHealthChecks)) { candidate.noNewExchanges() continue diff --git a/okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt b/okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt index fba2b2a8944f..f89d225fa318 100644 --- a/okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt +++ b/okhttp/src/main/java/okhttp3/internal/connection/RealCall.kt @@ -368,7 +368,7 @@ class RealCall( this.connection = null if (released.calls.isEmpty()) { - released.idleAtNanos = System.nanoTime() + released.idleAtNs = System.nanoTime() if (connectionPool.connectionBecameIdle(released)) { return released.socket() } diff --git a/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt b/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt index 12cfaeee904f..dece18642ad1 100644 --- a/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt +++ b/okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt @@ -25,7 +25,6 @@ import java.net.ProtocolException import java.net.Proxy import java.net.Socket import java.net.SocketException -import java.net.SocketTimeoutException import java.net.UnknownServiceException import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit.MILLISECONDS @@ -59,6 +58,7 @@ import okhttp3.internal.http2.Http2ExchangeCodec import okhttp3.internal.http2.Http2Stream import okhttp3.internal.http2.Settings import okhttp3.internal.http2.StreamResetException +import okhttp3.internal.isHealthy import okhttp3.internal.platform.Platform import okhttp3.internal.tls.OkHostnameVerifier import okhttp3.internal.toHostHeader @@ -123,8 +123,8 @@ class RealConnection( /** Current calls carried by this connection. */ val calls = mutableListOf>() - /** Nanotime timestamp when `allocations.size()` reached zero. */ - internal var idleAtNanos = Long.MAX_VALUE + /** Timestamp when `allocations.size()` reached zero. Also assigned upon initial connection. */ + internal var idleAtNs = Long.MAX_VALUE /** * Returns true if this is an HTTP/2 connection. Such connections can be used in multiple HTTP @@ -227,6 +227,8 @@ class RealConnection( throw RouteException(ProtocolException( "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS")) } + + idleAtNs = System.nanoTime() } /** @@ -622,6 +624,8 @@ class RealConnection( /** Returns true if this connection is ready to host new streams. */ fun isHealthy(doExtensiveChecks: Boolean): Boolean { + val nowNs = System.nanoTime() + val socket = this.socket!! val source = this.source!! if (socket.isClosed || socket.isInputShutdown || socket.isOutputShutdown) { @@ -630,23 +634,12 @@ class RealConnection( val http2Connection = this.http2Connection if (http2Connection != null) { - return http2Connection.isHealthy(System.nanoTime()) + return http2Connection.isHealthy(nowNs) } - if (doExtensiveChecks) { - try { - val readTimeout = socket.soTimeout - try { - socket.soTimeout = 1 - return !source.exhausted() - } finally { - socket.soTimeout = readTimeout - } - } catch (_: SocketTimeoutException) { - // Read timed out; socket is good. - } catch (_: IOException) { - return false // Couldn't read; socket is closed. - } + val idleDurationNs = nowNs - idleAtNs + if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) { + return socket.isHealthy(source) } return true @@ -736,6 +729,7 @@ class RealConnection( companion object { private const val NPE_THROW_WITH_NULL = "throw with null exception" private const val MAX_TUNNEL_ATTEMPTS = 21 + internal const val IDLE_CONNECTION_HEALTHY_NS = 10_000_000_000 // 10 seconds. fun newTestConnection( connectionPool: RealConnectionPool, @@ -745,7 +739,7 @@ class RealConnection( ): RealConnection { val result = RealConnection(connectionPool, route) result.socket = socket - result.idleAtNanos = idleAtNanos + result.idleAtNs = idleAtNanos return result } } diff --git a/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt b/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt index e0ab29a64001..7f49c0469044 100644 --- a/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt +++ b/okhttp/src/main/java/okhttp3/internal/connection/RealConnectionPool.kt @@ -153,7 +153,7 @@ class RealConnectionPool( idleConnectionCount++ // If the connection is ready to be evicted, we're done. - val idleDurationNs = now - connection.idleAtNanos + val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection @@ -217,7 +217,7 @@ class RealConnectionPool( // If this was the last allocation, the connection is eligible for immediate eviction. if (references.isEmpty()) { - connection.idleAtNanos = now - keepAliveDurationNs + connection.idleAtNs = now - keepAliveDurationNs return 0 } } diff --git a/okhttp/src/test/java/okhttp3/CallKotlinTest.kt b/okhttp/src/test/java/okhttp3/CallKotlinTest.kt index 7b4096779df0..dfc80a96fb6f 100644 --- a/okhttp/src/test/java/okhttp3/CallKotlinTest.kt +++ b/okhttp/src/test/java/okhttp3/CallKotlinTest.kt @@ -19,8 +19,12 @@ import java.io.IOException import java.security.cert.X509Certificate import java.util.concurrent.TimeUnit import okhttp3.MediaType.Companion.toMediaType +import okhttp3.RequestBody.Companion.toRequestBody +import okhttp3.internal.connection.RealConnection +import okhttp3.internal.connection.RealConnection.Companion.IDLE_CONNECTION_HEALTHY_NS import okhttp3.mockwebserver.MockResponse import okhttp3.mockwebserver.MockWebServer +import okhttp3.mockwebserver.SocketPolicy import okhttp3.testing.PlatformRule import okhttp3.tls.internal.TlsUtil.localhost import okio.BufferedSink @@ -168,4 +172,42 @@ class CallKotlinTest { recordedRequest = server.takeRequest() assertEquals("HEAD", recordedRequest.method) } + + @Test + fun staleConnectionNotReusedForNonIdempotentRequest() { + // Capture the connection so that we can later make it stale. + var connection: RealConnection? = null + client = client.newBuilder() + .addNetworkInterceptor(object : Interceptor { + override fun intercept(chain: Interceptor.Chain): Response { + connection = chain.connection() as RealConnection + return chain.proceed(chain.request()) + } + }) + .build() + + server.enqueue(MockResponse().setBody("a") + .setSocketPolicy(SocketPolicy.SHUTDOWN_OUTPUT_AT_END)) + server.enqueue(MockResponse().setBody("b")) + + val requestA = Request.Builder() + .url(server.url("/")) + .build() + val responseA = client.newCall(requestA).execute() + + assertThat(responseA.body!!.string()).isEqualTo("a") + assertThat(server.takeRequest().sequenceNumber).isEqualTo(0) + + // Give the socket a chance to become stale. + connection!!.idleAtNs -= IDLE_CONNECTION_HEALTHY_NS + Thread.sleep(250) + + val requestB = Request.Builder() + .url(server.url("/")) + .post("b".toRequestBody("text/plain".toMediaType())) + .build() + val responseB = client.newCall(requestB).execute() + assertThat(responseB.body!!.string()).isEqualTo("b") + assertThat(server.takeRequest().sequenceNumber).isEqualTo(0) + } } diff --git a/okhttp/src/test/java/okhttp3/ConnectionReuseTest.java b/okhttp/src/test/java/okhttp3/ConnectionReuseTest.java index ea1a635f9e49..5c07f812a974 100644 --- a/okhttp/src/test/java/okhttp3/ConnectionReuseTest.java +++ b/okhttp/src/test/java/okhttp3/ConnectionReuseTest.java @@ -54,6 +54,17 @@ public final class ConnectionReuseTest { assertConnectionReused(request, request); } + @Test public void connectionsAreReusedForPosts() throws Exception { + server.enqueue(new MockResponse().setBody("a")); + server.enqueue(new MockResponse().setBody("b")); + + Request request = new Request.Builder() + .url(server.url("/")) + .post(RequestBody.create("request body", MediaType.get("text/plain"))) + .build(); + assertConnectionReused(request, request); + } + @Test public void connectionsAreReusedWithHttp2() throws Exception { enableHttp2(); server.enqueue(new MockResponse().setBody("a")); @@ -178,30 +189,6 @@ public final class ConnectionReuseTest { assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0); } - @Test public void staleConnectionNotReusedForNonIdempotentRequest() throws Exception { - server.enqueue(new MockResponse().setBody("a") - .setSocketPolicy(SocketPolicy.SHUTDOWN_OUTPUT_AT_END)); - server.enqueue(new MockResponse().setBody("b")); - - Request requestA = new Request.Builder() - .url(server.url("/")) - .build(); - Response responseA = client.newCall(requestA).execute(); - assertThat(responseA.body().string()).isEqualTo("a"); - assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0); - - // Give the socket a chance to become stale. - Thread.sleep(250); - - Request requestB = new Request.Builder() - .url(server.url("/")) - .post(RequestBody.create("b", MediaType.get("text/plain"))) - .build(); - Response responseB = client.newCall(requestB).execute(); - assertThat(responseB.body().string()).isEqualTo("b"); - assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0); - } - @Test public void http2ConnectionsAreSharedBeforeResponseIsConsumed() throws Exception { enableHttp2(); server.enqueue(new MockResponse().setBody("a")); diff --git a/okhttp/src/test/java/okhttp3/internal/UtilTest.kt b/okhttp/src/test/java/okhttp3/internal/UtilTest.kt index bde87b4c3cdc..60977ddfe2ed 100644 --- a/okhttp/src/test/java/okhttp3/internal/UtilTest.kt +++ b/okhttp/src/test/java/okhttp3/internal/UtilTest.kt @@ -15,7 +15,12 @@ */ package okhttp3.internal +import java.net.InetAddress +import java.net.ServerSocket +import java.net.Socket import java.util.LinkedHashMap +import okio.buffer +import okio.source import org.assertj.core.api.Assertions.assertThat import org.junit.Assert.fail import org.junit.Test @@ -34,4 +39,18 @@ class UtilTest { } catch (_: UnsupportedOperationException) { } } + + @Test fun socketIsHealthy() { + val localhost = InetAddress.getLoopbackAddress() + val serverSocket = ServerSocket(0, 1, localhost) + + val socket = Socket() + socket.connect(serverSocket.localSocketAddress) + val socketSource = socket.source().buffer() + + assertThat(socket.isHealthy(socketSource)).isTrue() + + serverSocket.close() + assertThat(socket.isHealthy(socketSource)).isFalse() + } }