Skip to content

Commit cec880a

Browse files
authored
fix: Use OkHttp EventLister instead of ConnectionListener for idle connection monitoring (#1312)
1 parent a11c511 commit cec880a

File tree

8 files changed

+458
-50
lines changed

8 files changed

+458
-50
lines changed

.brazil.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
"com.squareup.okhttp3:okhttp-coroutines:5.*": "OkHttp3Coroutines-5.x",
77
"com.squareup.okhttp3:okhttp:5.*": "OkHttp3-5.x",
8+
"com.squareup.okhttp3:okhttp-jvm:5.*": "OkHttp3-5.x",
89
"com.squareup.okio:okio-jvm:3.*": "OkioJvm-3.x",
910
"io.opentelemetry:opentelemetry-api:1.*": "Maven-io-opentelemetry_opentelemetry-api-1.x",
1011
"io.opentelemetry:opentelemetry-extension-kotlin:1.*": "Maven-io-opentelemetry_opentelemetry-extension-kotlin-1.x",
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "db001c20-3788-4cfe-9ec2-284fd86a80bd",
3+
"type": "bugfix",
4+
"description": "Reimplement idle connection monitoring using `okhttp3.EventListener` instead of now-internal `okhttp3.ConnectionListener`",
5+
"issues": [
6+
"https://github.com/smithy-lang/smithy-kotlin/issues/1311"
7+
]
8+
}

gradle/libs.versions.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ aws-kotlin-repo-tools-version = "0.4.31"
77
# libs
88
coroutines-version = "1.10.2"
99
atomicfu-version = "0.29.0"
10-
okhttp-version = "5.0.0-alpha.14"
10+
okhttp-version = "5.1.0"
1111
okhttp4-version = "4.12.0"
12-
okio-version = "3.9.1"
12+
okio-version = "3.15.0"
1313
otel-version = "1.45.0"
1414
slf4j-version = "2.0.16"
1515
slf4j-v1x-version = "1.7.36"

runtime/protocol/http-client-engines/http-client-engine-okhttp/api/http-client-engine-okhttp.api

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConf
8787
}
8888

8989
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineKt {
90-
public static final fun buildClient (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;)Lokhttp3/OkHttpClient;
90+
public static final fun buildClient (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;[Lokhttp3/EventListener;)Lokhttp3/OkHttpClient;
9191
}
9292

9393
public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpHeadersAdapter : aws/smithy/kotlin/runtime/http/Headers {
Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44
*/
55
package aws.smithy.kotlin.runtime.http.engine.okhttp
66

7+
import aws.smithy.kotlin.runtime.io.Closeable
78
import aws.smithy.kotlin.runtime.telemetry.logging.logger
89
import kotlinx.coroutines.*
9-
import okhttp3.Call
10-
import okhttp3.Connection
11-
import okhttp3.ConnectionListener
12-
import okhttp3.ExperimentalOkHttpApi
10+
import okhttp3.*
1311
import okhttp3.internal.closeQuietly
1412
import okio.IOException
1513
import okio.buffer
@@ -22,12 +20,20 @@ import kotlin.coroutines.coroutineContext
2220
import kotlin.time.Duration
2321
import kotlin.time.measureTime
2422

25-
@OptIn(ExperimentalOkHttpApi::class)
26-
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
23+
/**
24+
* An [okhttp3.EventListener] implementation that monitors connections for remote closure.
25+
* This replaces the functionality previously provided by the now-internal [okhttp3.ConnectionListener].
26+
*/
27+
internal class ConnectionMonitoringEventListener(private val pollInterval: Duration) :
28+
EventListener(),
29+
Closeable {
2730
private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
2831
private val monitors = ConcurrentHashMap<Connection, Job>()
2932

30-
fun close(): Unit = runBlocking {
33+
/**
34+
* Close all active connection monitors.
35+
*/
36+
override fun close(): Unit = runBlocking {
3137
val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) {
3238
"Connection idle monitor scope cannot be cancelled because it does not have a job: $this"
3339
}
@@ -40,13 +46,16 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
4046
?.callContext
4147
?: Dispatchers.IO
4248

43-
override fun connectionAcquired(connection: Connection, call: Call) {
49+
// Cancel monitoring when a connection is acquired
50+
override fun connectionAcquired(call: Call, connection: Connection) {
51+
super.connectionAcquired(call, connection)
52+
4453
// Non-locking map access is okay here because this code will only execute synchronously as part of a
4554
// `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
4655
// the same connection.
4756
monitors.remove(connection)?.let { monitor ->
4857
val context = call.callContext()
49-
val logger = context.logger<ConnectionIdleMonitor>()
58+
val logger = context.logger<ConnectionMonitoringEventListener>()
5059
logger.trace { "Cancel monitoring for $connection" }
5160

5261
// Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
@@ -58,13 +67,18 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
5867
}
5968
}
6069

61-
override fun connectionReleased(connection: Connection, call: Call) {
70+
// Start monitoring when a connection is released
71+
override fun connectionReleased(call: Call, connection: Connection) {
72+
super.connectionReleased(call, connection)
73+
6274
val connId = System.identityHashCode(connection)
6375
val callContext = call.callContext()
76+
77+
// Start monitoring
6478
val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
6579
doMonitor(connection, callContext)
6680
}
67-
callContext.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
81+
callContext.logger<ConnectionMonitoringEventListener>().trace { "Launched coroutine $monitor to monitor $connection" }
6882

6983
// Non-locking map access is okay here because this code will only execute synchronously as part of a
7084
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
@@ -73,7 +87,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
7387
}
7488

7589
private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) {
76-
val logger = callContext.logger<ConnectionIdleMonitor>()
90+
val logger = callContext.logger<ConnectionMonitoringEventListener>()
7791

7892
val socket = conn.socket()
7993
val source = try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package aws.smithy.kotlin.runtime.http.engine.okhttp
6+
7+
import aws.smithy.kotlin.runtime.io.closeIfCloseable
8+
import okhttp3.*
9+
import java.io.IOException
10+
import java.net.InetAddress
11+
import java.net.InetSocketAddress
12+
import java.net.Proxy
13+
14+
/**
15+
* An [okhttp3.EventListener] that delegates to a chain of EventListeners.
16+
* Start events are sent in forward order, terminal events are sent in reverse order
17+
*/
18+
internal class EventListenerChain(
19+
private val listeners: List<EventListener>,
20+
) : EventListener() {
21+
private val reverseListeners = listeners.reversed()
22+
23+
fun close() {
24+
listeners.forEach {
25+
it.closeIfCloseable()
26+
}
27+
}
28+
29+
override fun callStart(call: Call): Unit =
30+
listeners.forEach { it.callStart(call) }
31+
32+
override fun dnsStart(call: Call, domainName: String): Unit =
33+
listeners.forEach { it.dnsStart(call, domainName) }
34+
35+
override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>): Unit =
36+
reverseListeners.forEach { it.dnsEnd(call, domainName, inetAddressList) }
37+
38+
override fun proxySelectStart(call: Call, url: HttpUrl): Unit =
39+
listeners.forEach { it.proxySelectStart(call, url) }
40+
41+
override fun proxySelectEnd(call: Call, url: HttpUrl, proxies: List<Proxy>): Unit =
42+
reverseListeners.forEach { it.proxySelectEnd(call, url, proxies) }
43+
44+
override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy): Unit =
45+
listeners.forEach { it.connectStart(call, inetSocketAddress, proxy) }
46+
47+
override fun secureConnectStart(call: Call): Unit =
48+
listeners.forEach { it.secureConnectStart(call) }
49+
50+
override fun secureConnectEnd(call: Call, handshake: Handshake?): Unit =
51+
reverseListeners.forEach { it.secureConnectEnd(call, handshake) }
52+
53+
override fun connectEnd(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?): Unit =
54+
reverseListeners.forEach { it.connectEnd(call, inetSocketAddress, proxy, protocol) }
55+
56+
override fun connectFailed(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?, ioe: IOException): Unit =
57+
reverseListeners.forEach { it.connectFailed(call, inetSocketAddress, proxy, protocol, ioe) }
58+
59+
override fun connectionAcquired(call: Call, connection: Connection): Unit =
60+
listeners.forEach { it.connectionAcquired(call, connection) }
61+
62+
override fun connectionReleased(call: Call, connection: Connection): Unit =
63+
reverseListeners.forEach { it.connectionReleased(call, connection) }
64+
65+
override fun requestHeadersStart(call: Call): Unit =
66+
listeners.forEach { it.requestHeadersStart(call) }
67+
68+
override fun requestHeadersEnd(call: Call, request: Request): Unit =
69+
reverseListeners.forEach { it.requestHeadersEnd(call, request) }
70+
71+
override fun requestBodyStart(call: Call): Unit =
72+
listeners.forEach { it.requestBodyStart(call) }
73+
74+
override fun requestBodyEnd(call: Call, byteCount: Long): Unit =
75+
reverseListeners.forEach { it.requestBodyEnd(call, byteCount) }
76+
77+
override fun requestFailed(call: Call, ioe: IOException): Unit =
78+
reverseListeners.forEach { it.requestFailed(call, ioe) }
79+
80+
override fun responseHeadersStart(call: Call): Unit =
81+
listeners.forEach { it.responseHeadersStart(call) }
82+
83+
override fun responseHeadersEnd(call: Call, response: Response): Unit =
84+
reverseListeners.forEach { it.responseHeadersEnd(call, response) }
85+
86+
override fun responseBodyStart(call: Call): Unit =
87+
listeners.forEach { it.responseBodyStart(call) }
88+
89+
override fun responseBodyEnd(call: Call, byteCount: Long): Unit =
90+
reverseListeners.forEach { it.responseBodyEnd(call, byteCount) }
91+
92+
override fun responseFailed(call: Call, ioe: IOException): Unit =
93+
reverseListeners.forEach { it.responseFailed(call, ioe) }
94+
95+
override fun callEnd(call: Call): Unit =
96+
reverseListeners.forEach { it.callEnd(call) }
97+
98+
override fun callFailed(call: Call, ioe: IOException): Unit =
99+
reverseListeners.forEach { it.callFailed(call, ioe) }
100+
101+
override fun canceled(call: Call): Unit =
102+
reverseListeners.forEach { it.canceled(call) }
103+
104+
override fun satisfactionFailure(call: Call, response: Response): Unit =
105+
reverseListeners.forEach { it.satisfactionFailure(call, response) }
106+
107+
override fun cacheConditionalHit(call: Call, cachedResponse: Response): Unit =
108+
listeners.forEach { it.cacheConditionalHit(call, cachedResponse) }
109+
110+
override fun cacheHit(call: Call, response: Response): Unit =
111+
listeners.forEach { it.cacheHit(call, response) }
112+
113+
override fun cacheMiss(call: Call): Unit =
114+
listeners.forEach { it.cacheMiss(call) }
115+
}

runtime/protocol/http-client-engines/http-client-engine-okhttp/jvm/src/aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngine.kt

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory
1111
import aws.smithy.kotlin.runtime.http.engine.*
1212
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
1313
import aws.smithy.kotlin.runtime.http.request.HttpRequest
14+
import aws.smithy.kotlin.runtime.io.closeIfCloseable
1415
import aws.smithy.kotlin.runtime.net.TlsVersion
1516
import aws.smithy.kotlin.runtime.operation.ExecutionContext
1617
import aws.smithy.kotlin.runtime.time.Instant
@@ -44,9 +45,14 @@ public class OkHttpEngine(
4445
override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke
4546
}
4647

48+
// Create a single shared connection monitoring listener if idle polling is enabled
49+
private val connectionMonitoringListener: EventListener? =
50+
config.connectionIdlePollingInterval?.let {
51+
ConnectionMonitoringEventListener(it)
52+
}
53+
4754
private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
48-
private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) }
49-
private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor)
55+
private val client = config.buildClient(metrics, connectionMonitoringListener)
5056

5157
@OptIn(ExperimentalCoroutinesApi::class)
5258
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
@@ -73,16 +79,20 @@ public class OkHttpEngine(
7379
}
7480

7581
override fun shutdown() {
76-
connectionIdleMonitor?.close()
82+
connectionMonitoringListener?.closeIfCloseable()
7783
client.connectionPool.evictAll()
7884
client.dispatcher.executorService.shutdown()
7985
metrics.close()
8086
}
8187
}
8288

83-
private fun OkHttpEngineConfig.buildClientFromConfig(
89+
/**
90+
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
91+
*/
92+
@InternalApi
93+
public fun OkHttpEngineConfig.buildClient(
8494
metrics: HttpClientMetrics,
85-
poolOverride: ConnectionPool? = null,
95+
vararg clientScopedEventListeners: EventListener?,
8696
): OkHttpClient {
8797
val config = this
8898

@@ -102,7 +112,7 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
102112
writeTimeout(config.socketWriteTimeout.toJavaDuration())
103113

104114
// use our own pool configured with the timeout settings taken from config
105-
val pool = poolOverride ?: ConnectionPool(
115+
val pool = ConnectionPool(
106116
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
107117
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
108118
TimeUnit.MILLISECONDS,
@@ -116,7 +126,14 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
116126
dispatcher(dispatcher)
117127

118128
// Log events coming from okhttp. Allocate a new listener per-call to facilitate dedicated trace spans.
119-
eventListenerFactory { call -> HttpEngineEventListener(pool, config.hostResolver, dispatcher, metrics, call) }
129+
eventListenerFactory { call ->
130+
EventListenerChain(
131+
listOfNotNull(
132+
HttpEngineEventListener(pool, config.hostResolver, dispatcher, metrics, call),
133+
*clientScopedEventListeners,
134+
),
135+
)
136+
}
120137

121138
// map protocols
122139
if (config.tlsContext.alpn.isNotEmpty()) {
@@ -140,34 +157,6 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
140157
}.build()
141158
}
142159

143-
/**
144-
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
145-
*/
146-
// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener`
147-
// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private
148-
@InternalApi
149-
public fun OkHttpEngineConfig.buildClient(
150-
metrics: HttpClientMetrics,
151-
): OkHttpClient = this.buildClientFromConfig(metrics)
152-
153-
/**
154-
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
155-
*/
156-
// Used by OkHttpEngine - OkHttp5 does have `connectionListener`
157-
@OptIn(ExperimentalOkHttpApi::class)
158-
private fun OkHttpEngineConfig.buildClientWithConnectionListener(
159-
metrics: HttpClientMetrics,
160-
connectionListener: ConnectionIdleMonitor?,
161-
): OkHttpClient = this.buildClientFromConfig(
162-
metrics,
163-
ConnectionPool(
164-
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
165-
keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds,
166-
timeUnit = TimeUnit.MILLISECONDS,
167-
connectionListener = connectionListener ?: ConnectionListener.NONE,
168-
),
169-
)
170-
171160
private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec {
172161
val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2
173162
val okHttpTlsVersions = SdkTlsVersion

0 commit comments

Comments
 (0)