Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions Sources/ClientRuntime/Config/DefaultSDKRuntimeConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,13 @@ public extension DefaultSDKRuntimeConfiguration {
static func makeClient(
httpClientConfiguration: HttpClientConfiguration = defaultHttpClientConfiguration
) -> HTTPClient {
#if os(iOS) || os(tvOS) || os(watchOS) || os(visionOS) || os(macOS)
// TODO -- For testing,revert prior to merge to main
#if os(iOS) || os(tvOS) || os(watchOS) || os(visionOS)
return URLSessionHTTPClient(httpClientConfiguration: httpClientConfiguration)
#else
let connectTimeoutMs = httpClientConfiguration.connectTimeout.map { UInt32($0 * 1000) }
let socketTimeout = UInt32(httpClientConfiguration.socketTimeout)
let config = CRTClientEngineConfig(
maxConnectionsPerEndpoint: httpClientConfiguration.maxConnections,
telemetry: httpClientConfiguration.telemetry ?? CRTClientEngine.noOpCrtClientEngineTelemetry,
connectTimeoutMs: connectTimeoutMs,
crtTlsOptions: httpClientConfiguration.tlsConfiguration as? CRTClientTLSOptions,
socketTimeout: socketTimeout
)
return CRTClientEngine(config: config)
// TODO -- For testing, revert prior to merge to main
// Will be used on Mac and Linux
return NIOHTTPClient(httpClientConfiguration: httpClientConfiguration)
#endif
}

Expand Down
218 changes: 162 additions & 56 deletions Sources/ClientRuntime/Networking/Http/NIO/NIOHTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,25 @@

import AsyncHTTPClient
import NIOCore
import NIOHTTP1
import NIOPosix
import NIOSSL
import struct Smithy.Attributes
import struct Smithy.SwiftLogger
import protocol Smithy.LogAgent
import struct SmithyHTTPAPI.Headers
import struct SmithyHTTPAPI.Header
import protocol SmithyHTTPAPI.HTTPClient
import class SmithyHTTPAPI.HTTPResponse
import class SmithyHTTPAPI.HTTPRequest
import enum SmithyHTTPAPI.HTTPStatusCode
import enum SmithyHTTPAPI.HTTPMethodType
import protocol Smithy.ReadableStream
import enum Smithy.ByteStream
import class SmithyStreams.BufferedStream
import struct Foundation.Date
import struct Foundation.URLComponents
import struct Foundation.URLQueryItem
import AwsCommonRuntimeKit

/// AsyncHTTPClient-based HTTP client implementation that conforms to SmithyHTTPAPI.HTTPClient
Expand All @@ -47,97 +52,198 @@ public final class NIOHTTPClient: SmithyHTTPAPI.HTTPClient {
/// The client is created with its own internal `AsyncHTTPClient`, which is configured with system defaults.
/// - Parameters:
/// - httpClientConfiguration: The configuration to use for the client's `AsyncHTTPClient` setup.
/// - eventLoopGroup: The `EventLoopGroup` that the ``HTTPClient`` will use.
public init(
httpClientConfiguration: HttpClientConfiguration
) throws {
httpClientConfiguration: HttpClientConfiguration,
eventLoopGroup: (any NIOCore.EventLoopGroup)? = nil
) {
self.config = httpClientConfiguration
self.telemetry = httpClientConfiguration.telemetry ?? NIOHTTPClient.noOpNIOHTTPClientTelemetry
self.logger = self.telemetry.loggerProvider.getLogger(name: "NIOHTTPClient")
self.tlsConfiguration = httpClientConfiguration.tlsConfiguration as? NIOHTTPClientTLSOptions
self.allocator = ByteBufferAllocator()

var clientConfig = AsyncHTTPClient.HTTPClient.Configuration()
var clientConfig = AsyncHTTPClient.HTTPClient.Configuration.from(
httpClientConfiguration: httpClientConfiguration
)

// Configure TLS if options are provided
if let tlsOptions = tlsConfiguration {
clientConfig.tlsConfiguration = try tlsOptions.makeNIOSSLConfiguration()
do {
clientConfig.tlsConfiguration = try tlsOptions.makeNIOSSLConfiguration()
} catch {
// Log TLS configuration error but continue with default TLS settings
self.logger.error(
"Failed to configure TLS: \(String(describing: error)). Using default TLS configuration."
)
}
}

if let eventLoopGroup {
self.client = AsyncHTTPClient.HTTPClient(eventLoopGroup: eventLoopGroup, configuration: clientConfig)
} else {
self.client = AsyncHTTPClient.HTTPClient(configuration: clientConfig)
}
}

self.client = AsyncHTTPClient.HTTPClient(configuration: clientConfig)
deinit {
try? client.syncShutdown()
}

public func send(request: SmithyHTTPAPI.HTTPRequest) async throws -> SmithyHTTPAPI.HTTPResponse {
let telemetryContext = telemetry.contextManager.current()
let tracer = telemetry.tracerProvider.getTracer(
scope: telemetry.tracerScope
)
do {
// START - smithy.client.http.requests.queued_duration
let queuedStart = Date().timeIntervalSinceReferenceDate
let span = tracer.createSpan(
name: telemetry.spanName,
initialAttributes: telemetry.spanAttributes,
spanKind: SpanKind.internal,
parentContext: telemetryContext)
defer {
span.end()
}

// START - smithy.client.http.connections.acquire_duration
let acquireConnectionStart = Date().timeIntervalSinceReferenceDate
// START - smithy.client.http.requests.queued_duration
let queuedStart = Date().timeIntervalSinceReferenceDate
let span = tracer.createSpan(
name: telemetry.spanName,
initialAttributes: telemetry.spanAttributes,
spanKind: SpanKind.internal,
parentContext: telemetryContext)
defer {
span.end()
}

// TODO: Convert Smithy HTTPRequest to AsyncHTTPClient HTTPClientRequest
// START - smithy.client.http.connections.acquire_duration
let acquireConnectionStart = Date().timeIntervalSinceReferenceDate

// Convert Smithy HTTPRequest to AsyncHTTPClient HTTPClientRequest
let nioRequest = try await makeNIORequest(from: request)

let acquireConnectionEnd = Date().timeIntervalSinceReferenceDate
telemetry.connectionsAcquireDuration.record(
value: acquireConnectionEnd - acquireConnectionStart,
attributes: Attributes(),
context: telemetryContext)
// END - smithy.client.http.connections.acquire_duration

let queuedEnd = acquireConnectionEnd
telemetry.requestsQueuedDuration.record(
value: queuedEnd - queuedStart,
attributes: Attributes(),
context: telemetryContext)
// END - smithy.client.http.requests.queued_duration

// Update connection and request usage metrics
telemetry.updateHTTPMetricsUsage { httpMetricsUsage in
// TICK - smithy.client.http.connections.limit
// Note: AsyncHTTPClient doesn't expose connection pool configuration publicly
httpMetricsUsage.connectionsLimit = 0

// TICK - smithy.client.http.connections.usage
// Note: AsyncHTTPClient doesn't expose current connection counts
httpMetricsUsage.acquiredConnections = 0
httpMetricsUsage.idleConnections = 0

// TICK - smithy.client.http.requests.usage
httpMetricsUsage.inflightRequests = httpMetricsUsage.acquiredConnections
httpMetricsUsage.queuedRequests = httpMetricsUsage.idleConnections
}

let acquireConnectionEnd = Date().timeIntervalSinceReferenceDate
telemetry.connectionsAcquireDuration.record(
value: acquireConnectionEnd - acquireConnectionStart,
// DURATION - smithy.client.http.connections.uptime
let connectionUptimeStart = acquireConnectionEnd
defer {
telemetry.connectionsUptime.record(
value: Date().timeIntervalSinceReferenceDate - connectionUptimeStart,
attributes: Attributes(),
context: telemetryContext)
// END - smithy.client.http.connections.acquire_duration
}

let queuedEnd = acquireConnectionEnd
telemetry.requestsQueuedDuration.record(
value: queuedEnd - queuedStart,
attributes: Attributes(),
context: telemetryContext)
// END - smithy.client.http.requests.queued_duration
let httpMethod = request.method.rawValue
let url = request.destination.url
logger.debug("NIOHTTPClient(\(httpMethod) \(String(describing: url))) started")
logBodyDescription(request.body)

// TODO: Update connection and request usage metrics based on AsyncHTTPClient configuration
telemetry.updateHTTPMetricsUsage { httpMetricsUsage in
// TICK - smithy.client.http.connections.limit
httpMetricsUsage.connectionsLimit = 0 // TODO: Get from AsyncHTTPClient configuration
do {
let timeout: TimeAmount = .seconds(Int64(config.socketTimeout))
let nioResponse = try await client.execute(nioRequest, timeout: timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol is this really all it takes? Will it also do HTTP/2 bidi streaming out of the box for us?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup :) All integration tests are passing for me!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is performance? We should look at the tests where we run huge #s of S3 streaming ops at the same time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll be doing performance testing for the whole implementation prior to merging the feature branch to main. So that answer is incoming! But I dont think we need to hold off on merging this to the feature branch until we get those numbers.


// Convert NIO response to Smithy HTTPResponse
let statusCode = HTTPStatusCode(rawValue: Int(nioResponse.status.code)) ?? .insufficientStorage
var headers = Headers()
for (name, value) in nioResponse.headers {
headers.add(name: name, value: value)
}

// TICK - smithy.client.http.connections.usage
httpMetricsUsage.acquiredConnections = 0 // TODO: Get from AsyncHTTPClient
httpMetricsUsage.idleConnections = 0 // TODO: Get from AsyncHTTPClient
let body = await NIOHTTPClientStreamBridge.convertResponseBody(from: nioResponse)

// TICK - smithy.client.http.requests.usage
httpMetricsUsage.inflightRequests = httpMetricsUsage.acquiredConnections
httpMetricsUsage.queuedRequests = httpMetricsUsage.idleConnections
}
let response = HTTPResponse(headers: headers, body: body, statusCode: statusCode)
logger.debug("NIOHTTPClient(\(httpMethod) \(String(describing: url))) succeeded")

// DURATION - smithy.client.http.connections.uptime
let connectionUptimeStart = acquireConnectionEnd
defer {
telemetry.connectionsUptime.record(
value: Date().timeIntervalSinceReferenceDate - connectionUptimeStart,
attributes: Attributes(),
context: telemetryContext)
return response
} catch {
let urlDescription = String(describing: url)
let errorDescription = String(describing: error)
logger.error("NIOHTTPClient(\(httpMethod) \(urlDescription)) failed with error: \(errorDescription)")
throw error
}
}

/// Create an AsyncHTTPClient request from a Smithy HTTPRequest
private func makeNIORequest(
from request: SmithyHTTPAPI.HTTPRequest
) async throws -> AsyncHTTPClient.HTTPClientRequest {
var components = URLComponents()
components.scheme = config.protocolType?.rawValue ?? request.destination.scheme.rawValue
components.host = request.endpoint.uri.host
components.port = port(for: request)
components.percentEncodedPath = request.destination.path
if let queryItems = request.queryItems, !queryItems.isEmpty {
components.percentEncodedQueryItems = queryItems.map {
URLQueryItem(name: $0.name, value: $0.value)
}
}
guard let url = components.url else { throw NIOHTTPClientError.incompleteHTTPRequest }

// TODO: Execute the HTTP request using AsyncHTTPClient
let method = NIOHTTP1.HTTPMethod(rawValue: request.method.rawValue)
var nioRequest = AsyncHTTPClient.HTTPClientRequest(url: url.absoluteString)
nioRequest.method = method

// TODO: Log body description
// request headers will replace default if the same value is present in both
for header in config.defaultHeaders.headers + request.headers.headers {
for value in header.value {
nioRequest.headers.replaceOrAdd(name: header.name, value: value)
}
}

// TODO: Handle response
// TODO: Record bytes sent during request body streaming with server address attributes
// TODO: Record bytes received during response streaming with server address attributes
nioRequest.body = try await NIOHTTPClientStreamBridge.convertRequestBody(
from: request.body,
allocator: allocator
)

// TODO: Convert NIO response to Smithy HTTPResponse
return nioRequest
}

return HTTPResponse() // TODO: Return actual response
} catch {
// TODO: Handle catch
private func port(for request: SmithyHTTPAPI.HTTPRequest) -> Int? {
switch (request.destination.scheme, request.destination.port) {
case (.https, 443), (.http, 80):
return nil
default:
return request.destination.port.map { Int($0) }
}
}

private func logBodyDescription(_ body: ByteStream) {
switch body {
case .stream(let stream):
let lengthString: String
if let length = stream.length {
lengthString = "\(length) bytes"
} else {
lengthString = "unknown length"
}
logger.debug("body is Stream (\(lengthString))")
case .data(let data):
if let data {
logger.debug("body is Data (\(data.count) bytes)")
} else {
logger.debug("body is empty")
}
case .noStream:
logger.debug("body is empty")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,11 @@
/// Errors that are particular to the NIO-based Smithy HTTP client.
public enum NIOHTTPClientError: Error {

/// A URL could not be formed from the `HTTPRequest`.
/// Please file a bug with aws-sdk-swift if you experience this error.
case incompleteHTTPRequest

/// An error occurred during streaming operations.
/// Please file a bug with aws-sdk-swift if you experience this error.
case streamingError(Error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,13 @@ final class NIOHTTPClientStreamBridge {
allocator: ByteBufferAllocator,
chunkSize: Int = CHUNK_SIZE_BYTES
) async throws -> AsyncHTTPClient.HTTPClientRequest.Body {
if let streamLength = stream.length {
let asyncSequence = StreamToAsyncSequence(stream: stream, allocator: allocator, chunkSize: chunkSize)
return .stream(asyncSequence, length: .known(Int64(streamLength)))
let asyncSequence = StreamToAsyncSequence(stream: stream, allocator: allocator, chunkSize: chunkSize)

// Use known length if available, unless the stream is eligible for chunked streaming.
if let length = stream.length, !stream.isEligibleForChunkedStreaming {
return .stream(asyncSequence, length: .known(Int64(length)))
} else {
do {
let data = try await stream.readToEndAsync()
if let data = data {
var buffer = allocator.buffer(capacity: data.count)
buffer.writeBytes(data)
return .bytes(buffer)
} else {
return .bytes(allocator.buffer(capacity: 0))
}
} catch {
throw NIOHTTPClientError.streamingError(error)
}
return .stream(asyncSequence, length: .unknown)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public enum NIOHTTPClientTLSError: Error, LocalizedError {
case .noCertificateFound(let path):
return "No certificate found at path: \(path)"
case .invalidPKCS12(let path, let underlying):
return "Failed to load PKCS#12 file at path: \(path). Error: \(underlying.localizedDescription)"
return "Failed to load PKCS#12 file at path: \(path). Error: \(String(describing: underlying))"
}
}
}
Loading