Skip to content

Commit

Permalink
Be smarter about connection health checks
Browse files Browse the repository at this point in the history
Previously we didn't do any health checks at all until a
connection completed a single exchange. This was awkward
for HTTP/2, which could have multiple exchanges in-flight
before any were health checked.

We were also doing the awkward and expensive read timeout
check on all non-GET requests on pooled connections. Now
we only perform these checks if the connection was idle
for 10 seconds or more.

Closes: #5547
  • Loading branch information
squarejesse committed Feb 17, 2020
1 parent 1c03efd commit 19cb19a
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 61 deletions.
38 changes: 32 additions & 6 deletions okhttp/src/main/java/okhttp3/internal/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.io.InterruptedIOException
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.net.Socket
import java.net.SocketTimeoutException
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets.UTF_16BE
import java.nio.charset.StandardCharsets.UTF_16LE
Expand Down Expand Up @@ -324,13 +325,13 @@ fun BufferedSource.readMedium(): Int {
*/
@Throws(IOException::class)
fun Source.skipAll(duration: Int, timeUnit: TimeUnit): Boolean {
val now = System.nanoTime()
val originalDuration = if (timeout().hasDeadline()) {
timeout().deadlineNanoTime() - now
val nowNs = System.nanoTime()
val originalDurationNs = if (timeout().hasDeadline()) {
timeout().deadlineNanoTime() - nowNs
} else {
Long.MAX_VALUE
}
timeout().deadlineNanoTime(now + minOf(originalDuration, timeUnit.toNanos(duration.toLong())))
timeout().deadlineNanoTime(nowNs + minOf(originalDurationNs, timeUnit.toNanos(duration.toLong())))
return try {
val skipBuffer = Buffer()
while (read(skipBuffer, 8192) != -1L) {
Expand All @@ -340,10 +341,10 @@ fun Source.skipAll(duration: Int, timeUnit: TimeUnit): Boolean {
} catch (_: InterruptedIOException) {
false // We ran out of time before exhausting the source.
} finally {
if (originalDuration == Long.MAX_VALUE) {
if (originalDurationNs == Long.MAX_VALUE) {
timeout().clearDeadline()
} else {
timeout().deadlineNanoTime(now + originalDuration)
timeout().deadlineNanoTime(nowNs + originalDurationNs)
}
}
}
Expand All @@ -364,6 +365,31 @@ fun Socket.peerName(): String {
return if (address is InetSocketAddress) address.hostName else address.toString()
}

/**
* Returns true if new reads and writes should be attempted on this.
*
* Unfortunately Java's networking APIs don't offer a good health check, so we go on our own by
* attempting to read with a short timeout. If the fails immediately we know the socket is
* unhealthy.
*
* @param source the source used to read bytes from the socket.
*/
fun Socket.isHealthy(source: BufferedSource): Boolean {
try {
val readTimeout = soTimeout
try {
soTimeout = 1
return !source.exhausted()
} finally {
soTimeout = readTimeout
}
} catch (_: SocketTimeoutException) {
return true // Read timed out; socket is good.
} catch (_: IOException) {
return false // Couldn't read; socket is closed.
}
}

/** Run [block] until it either throws an [IOException] or completes. */
inline fun ignoreIoExceptions(block: () -> Unit) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,7 @@ class ExchangeFinder(
connectionRetryEnabled = connectionRetryEnabled
)

// If this is a brand new connection, we can skip the extensive health checks.
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
// Confirm that the connection is good. If it isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class RealCall(
this.connection = null

if (released.calls.isEmpty()) {
released.idleAtNanos = System.nanoTime()
released.idleAtNs = System.nanoTime()
if (connectionPool.connectionBecameIdle(released)) {
return released.socket()
}
Expand Down
32 changes: 13 additions & 19 deletions okhttp/src/main/java/okhttp3/internal/connection/RealConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.net.ProtocolException
import java.net.Proxy
import java.net.Socket
import java.net.SocketException
import java.net.SocketTimeoutException
import java.net.UnknownServiceException
import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit.MILLISECONDS
Expand Down Expand Up @@ -59,6 +58,7 @@ import okhttp3.internal.http2.Http2ExchangeCodec
import okhttp3.internal.http2.Http2Stream
import okhttp3.internal.http2.Settings
import okhttp3.internal.http2.StreamResetException
import okhttp3.internal.isHealthy
import okhttp3.internal.platform.Platform
import okhttp3.internal.tls.OkHostnameVerifier
import okhttp3.internal.toHostHeader
Expand Down Expand Up @@ -123,8 +123,8 @@ class RealConnection(
/** Current calls carried by this connection. */
val calls = mutableListOf<Reference<RealCall>>()

/** Nanotime timestamp when `allocations.size()` reached zero. */
internal var idleAtNanos = Long.MAX_VALUE
/** Timestamp when `allocations.size()` reached zero. Also assigned upon initial connection. */
internal var idleAtNs = Long.MAX_VALUE

/**
* Returns true if this is an HTTP/2 connection. Such connections can be used in multiple HTTP
Expand Down Expand Up @@ -227,6 +227,8 @@ class RealConnection(
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}

idleAtNs = System.nanoTime()
}

/**
Expand Down Expand Up @@ -622,6 +624,8 @@ class RealConnection(

/** Returns true if this connection is ready to host new streams. */
fun isHealthy(doExtensiveChecks: Boolean): Boolean {
val nowNs = System.nanoTime()

val socket = this.socket!!
val source = this.source!!
if (socket.isClosed || socket.isInputShutdown || socket.isOutputShutdown) {
Expand All @@ -630,23 +634,12 @@ class RealConnection(

val http2Connection = this.http2Connection
if (http2Connection != null) {
return http2Connection.isHealthy(System.nanoTime())
return http2Connection.isHealthy(nowNs)
}

if (doExtensiveChecks) {
try {
val readTimeout = socket.soTimeout
try {
socket.soTimeout = 1
return !source.exhausted()
} finally {
socket.soTimeout = readTimeout
}
} catch (_: SocketTimeoutException) {
// Read timed out; socket is good.
} catch (_: IOException) {
return false // Couldn't read; socket is closed.
}
val idleDurationNs = nowNs - idleAtNs
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
return socket.isHealthy(source)
}

return true
Expand Down Expand Up @@ -736,6 +729,7 @@ class RealConnection(
companion object {
private const val NPE_THROW_WITH_NULL = "throw with null exception"
private const val MAX_TUNNEL_ATTEMPTS = 21
internal const val IDLE_CONNECTION_HEALTHY_NS = 10_000_000_000 // 10 seconds.

fun newTestConnection(
connectionPool: RealConnectionPool,
Expand All @@ -745,7 +739,7 @@ class RealConnection(
): RealConnection {
val result = RealConnection(connectionPool, route)
result.socket = socket
result.idleAtNanos = idleAtNanos
result.idleAtNs = idleAtNanos
return result
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class RealConnectionPool(
idleConnectionCount++

// If the connection is ready to be evicted, we're done.
val idleDurationNs = now - connection.idleAtNanos
val idleDurationNs = now - connection.idleAtNs
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
Expand Down Expand Up @@ -217,7 +217,7 @@ class RealConnectionPool(

// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs
connection.idleAtNs = now - keepAliveDurationNs
return 0
}
}
Expand Down
42 changes: 42 additions & 0 deletions okhttp/src/test/java/okhttp3/CallKotlinTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import java.io.IOException
import java.security.cert.X509Certificate
import java.util.concurrent.TimeUnit
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.internal.connection.RealConnection
import okhttp3.internal.connection.RealConnection.Companion.IDLE_CONNECTION_HEALTHY_NS
import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import okhttp3.mockwebserver.SocketPolicy
import okhttp3.testing.PlatformRule
import okhttp3.tls.internal.TlsUtil.localhost
import okio.BufferedSink
Expand Down Expand Up @@ -168,4 +172,42 @@ class CallKotlinTest {
recordedRequest = server.takeRequest()
assertEquals("HEAD", recordedRequest.method)
}

@Test
fun staleConnectionNotReusedForNonIdempotentRequest() {
// Capture the connection so that we can later make it stale.
var connection: RealConnection? = null
client = client.newBuilder()
.addNetworkInterceptor(object : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
connection = chain.connection() as RealConnection
return chain.proceed(chain.request())
}
})
.build()

server.enqueue(MockResponse().setBody("a")
.setSocketPolicy(SocketPolicy.SHUTDOWN_OUTPUT_AT_END))
server.enqueue(MockResponse().setBody("b"))

val requestA = Request.Builder()
.url(server.url("/"))
.build()
val responseA = client.newCall(requestA).execute()

assertThat(responseA.body!!.string()).isEqualTo("a")
assertThat(server.takeRequest().sequenceNumber).isEqualTo(0)

// Give the socket a chance to become stale.
connection!!.idleAtNs -= IDLE_CONNECTION_HEALTHY_NS
Thread.sleep(250)

val requestB = Request.Builder()
.url(server.url("/"))
.post("b".toRequestBody("text/plain".toMediaType()))
.build()
val responseB = client.newCall(requestB).execute()
assertThat(responseB.body!!.string()).isEqualTo("b")
assertThat(server.takeRequest().sequenceNumber).isEqualTo(0)
}
}
35 changes: 11 additions & 24 deletions okhttp/src/test/java/okhttp3/ConnectionReuseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ public final class ConnectionReuseTest {
assertConnectionReused(request, request);
}

@Test public void connectionsAreReusedForPosts() throws Exception {
server.enqueue(new MockResponse().setBody("a"));
server.enqueue(new MockResponse().setBody("b"));

Request request = new Request.Builder()
.url(server.url("/"))
.post(RequestBody.create("request body", MediaType.get("text/plain")))
.build();
assertConnectionReused(request, request);
}

@Test public void connectionsAreReusedWithHttp2() throws Exception {
enableHttp2();
server.enqueue(new MockResponse().setBody("a"));
Expand Down Expand Up @@ -178,30 +189,6 @@ public final class ConnectionReuseTest {
assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0);
}

@Test public void staleConnectionNotReusedForNonIdempotentRequest() throws Exception {
server.enqueue(new MockResponse().setBody("a")
.setSocketPolicy(SocketPolicy.SHUTDOWN_OUTPUT_AT_END));
server.enqueue(new MockResponse().setBody("b"));

Request requestA = new Request.Builder()
.url(server.url("/"))
.build();
Response responseA = client.newCall(requestA).execute();
assertThat(responseA.body().string()).isEqualTo("a");
assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0);

// Give the socket a chance to become stale.
Thread.sleep(250);

Request requestB = new Request.Builder()
.url(server.url("/"))
.post(RequestBody.create("b", MediaType.get("text/plain")))
.build();
Response responseB = client.newCall(requestB).execute();
assertThat(responseB.body().string()).isEqualTo("b");
assertThat(server.takeRequest().getSequenceNumber()).isEqualTo(0);
}

@Test public void http2ConnectionsAreSharedBeforeResponseIsConsumed() throws Exception {
enableHttp2();
server.enqueue(new MockResponse().setBody("a"));
Expand Down
19 changes: 19 additions & 0 deletions okhttp/src/test/java/okhttp3/internal/UtilTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
package okhttp3.internal

import java.net.InetAddress
import java.net.ServerSocket
import java.net.Socket
import java.util.LinkedHashMap
import okio.buffer
import okio.source
import org.assertj.core.api.Assertions.assertThat
import org.junit.Assert.fail
import org.junit.Test
Expand All @@ -34,4 +39,18 @@ class UtilTest {
} catch (_: UnsupportedOperationException) {
}
}

@Test fun socketIsHealthy() {
val localhost = InetAddress.getLoopbackAddress()
val serverSocket = ServerSocket(0, 1, localhost)

val socket = Socket()
socket.connect(serverSocket.localSocketAddress)
val socketSource = socket.source().buffer()

assertThat(socket.isHealthy(socketSource)).isTrue()

serverSocket.close()
assertThat(socket.isHealthy(socketSource)).isFalse()
}
}

0 comments on commit 19cb19a

Please sign in to comment.