Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small Performance Improvements #199

Merged
merged 2 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let package = Package(
.library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.17.0")),
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.26.0")),
.package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/swift-server/swift-backtrace.git", .upToNextMajor(from: "1.1.0")),
],
Expand Down
246 changes: 111 additions & 135 deletions Sources/AWSLambdaRuntimeCore/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -97,9 +97,16 @@ internal final class HTTPClient {
private func connect() -> EventLoopFuture<Channel> {
let bootstrap = ClientBootstrap(group: self.eventLoop)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.configuration.keepAlive),
UnaryHandler(keepAlive: self.configuration.keepAlive)])
do {
try channel.pipeline.syncOperations.addHTTPClientHandlers()
// Lambda quotas... An invocation payload is at most 6MB in size:
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
try channel.pipeline.syncOperations.addHandler(
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024))
try channel.pipeline.syncOperations.addHandler(LambdaChannelHandler())
return channel.eventLoop.makeSucceededFuture(())
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}

Expand Down Expand Up @@ -131,10 +138,10 @@ internal final class HTTPClient {
}

internal struct Response: Equatable {
public var version: HTTPVersion
public var status: HTTPResponseStatus
public var headers: HTTPHeaders
public var body: ByteBuffer?
var version: HTTPVersion
var status: HTTPResponseStatus
var headers: HTTPHeaders
var body: ByteBuffer?
Comment on lines +141 to +144
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a cleanup change. public access control in an internal struct doesn't make much sense.

}

internal enum Errors: Error {
Expand All @@ -149,133 +156,77 @@ internal final class HTTPClient {
}
}

private final class HTTPHandler: ChannelDuplexHandler {
typealias OutboundIn = HTTPClient.Request
typealias InboundOut = HTTPClient.Response
typealias InboundIn = HTTPClientResponsePart
// no need for locks since we validate that only one request can run at a time
private final class LambdaChannelHandler: ChannelDuplexHandler {
typealias InboundIn = NIOHTTPClientResponseFull
typealias OutboundIn = HTTPRequestWrapper
typealias OutboundOut = HTTPClientRequestPart

private let keepAlive: Bool
private var readState: ReadState = .idle

init(keepAlive: Bool) {
self.keepAlive = keepAlive
}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let request = unwrapOutboundIn(data)

var head = HTTPRequestHead(version: .init(major: 1, minor: 1), method: request.method, uri: request.url, headers: request.headers)
head.headers.add(name: "host", value: request.targetHost)
switch request.method {
case .POST, .PUT:
head.headers.add(name: "content-length", value: String(request.body?.readableBytes ?? 0))
default:
break
}

// We don't add a "Connection" header here if we want to keep the connection open,
// HTTP/1.1 defines specifies the following in RFC 2616, Section 8.1.2.1:
//
// An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to
// maintain a persistent connection unless a Connection header including
// the connection-token "close" was sent in the request. If the server
// chooses to close the connection immediately after sending the
// response, it SHOULD send a Connection header including the
// connection-token close.
//
// See also UnaryHandler.channelRead below.
if !self.keepAlive {
head.headers.add(name: "connection", value: "close")
}

context.write(self.wrapOutboundOut(HTTPClientRequestPart.head(head))).flatMap { _ -> EventLoopFuture<Void> in
if let body = request.body {
return context.writeAndFlush(self.wrapOutboundOut(HTTPClientRequestPart.body(.byteBuffer(body))))
} else {
context.flush()
return context.eventLoop.makeSucceededFuture(())
}
}.cascade(to: promise)
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let response = unwrapInboundIn(data)

switch response {
case .head(let head):
guard case .idle = self.readState else {
preconditionFailure("invalid read state \(self.readState)")
}
self.readState = .head(head)
case .body(var bodyPart):
switch self.readState {
case .head(let head):
self.readState = .body(head, bodyPart)
case .body(let head, var body):
body.writeBuffer(&bodyPart)
self.readState = .body(head, body)
Comment on lines -215 to -217
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a fairly large performance issue here... Since body's internal storage is referenced from self.readState and from body, we incur a CoW on every append to the local buffer. We could fix this by an easy CoW prevention like:

case .body(let head, var body):
    self.readState = .idle // removes the self.readState reference. Therefore no CoW necessary
    body.writeBuffer(&bodyPart)
    self.readState = .body(head, body)

However since October of last year NIO has a NIOHTTPClientResponseAggregator. I think we should use that instead. All the http write logic from HTTPHandler has been moved into UnaryHandler directly.

Copy link
Contributor

@tomerd tomerd Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving to NIOHTTPClientResponseAggregator makes perfect sense. I wonder though if moving all the extra functionality to UnaryHandler is overloading its intent - to be a unary handler. not sure about the performance cost of having additional handlers; if low, we can consider splitting that up, alternatively we can rename UnaryHandler to something more appropriate given its expanded role

default:
preconditionFailure("invalid read state \(self.readState)")
}
case .end:
switch self.readState {
case .head(let head):
context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: nil)))
self.readState = .idle
case .body(let head, let body):
context.fireChannelRead(wrapInboundOut(HTTPClient.Response(version: head.version, status: head.status, headers: head.headers, body: body)))
self.readState = .idle
default:
preconditionFailure("invalid read state \(self.readState)")
}
}
}

private enum ReadState {
enum State {
case idle
case head(HTTPResponseHead)
case body(HTTPResponseHead, ByteBuffer)
case running(promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)
case waitForConnectionClose(HTTPClient.Response, EventLoopPromise<HTTPClient.Response>)
}
}

// no need in locks since we validate only one request can run at a time
private final class UnaryHandler: ChannelDuplexHandler {
typealias OutboundIn = HTTPRequestWrapper
typealias InboundIn = HTTPClient.Response
typealias OutboundOut = HTTPClient.Request

private let keepAlive: Bool

private var pending: (promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)?
private var state: State = .idle
private var lastError: Error?

init(keepAlive: Bool) {
self.keepAlive = keepAlive
}
init() {}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard self.pending == nil else {
guard case .idle = self.state else {
preconditionFailure("invalid state, outstanding request")
}
let wrapper = unwrapOutboundIn(data)

var head = HTTPRequestHead(
version: .http1_1,
method: wrapper.request.method,
uri: wrapper.request.url,
headers: wrapper.request.headers
)
head.headers.add(name: "host", value: wrapper.request.targetHost)
switch head.method {
case .POST, .PUT:
head.headers.add(name: "content-length", value: String(wrapper.request.body?.readableBytes ?? 0))
default:
break
}

let timeoutTask = wrapper.request.timeout.map {
context.eventLoop.scheduleTask(in: $0) {
if self.pending != nil {
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
guard case .running = self.state else {
preconditionFailure("invalid state")
}

context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
}
}
self.pending = (promise: wrapper.promise, timeout: timeoutTask)
context.writeAndFlush(wrapOutboundOut(wrapper.request), promise: promise)
self.state = .running(promise: wrapper.promise, timeout: timeoutTask)

context.write(wrapOutboundOut(.head(head)), promise: nil)
if let body = wrapper.request.body {
context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil)
}
context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise)
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let response = unwrapInboundIn(data)
guard let pending = self.pending else {
guard case .running(let promise, let timeout) = self.state else {
preconditionFailure("invalid state, no pending request")
}

let response = unwrapInboundIn(data)

let httpResponse = HTTPClient.Response(
version: response.head.version,
status: response.head.status,
headers: response.head.headers,
body: response.body
)

timeout?.cancel()

// As defined in RFC 7230 Section 6.3:
// HTTP/1.1 defaults to the use of "persistent connections", allowing
// multiple requests and responses to be carried over a single
Expand All @@ -285,14 +236,31 @@ private final class UnaryHandler: ChannelDuplexHandler {
//
// That's why we only assume the connection shall be closed if we receive
// a "connection = close" header.
let serverCloseConnection = response.headers.first(name: "connection")?.lowercased() == "close"

if !self.keepAlive || serverCloseConnection || response.version != .init(major: 1, minor: 1) {
pending.promise.futureResult.whenComplete { _ in
_ = context.channel.close()
}
let serverCloseConnection =
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })

let closeConnection = serverCloseConnection || response.head.version != .http1_1

if closeConnection {
// If we were succeeding the request promise here directly and closing the connection
// after succeeding the promise we may run into a race condition:
//
// The lambda runtime will ask for the next work item directly after a succeeded post
// response request. The desire for the next work item might be faster than the attempt
// to close the connection. This will lead to a situation where we try to close the connection
// but the next request has already been scheduled on the connection that we want to
// close. For this reason we postpone succeeding the promise until the connection has
// been closed. This codepath will only be hit in the very, very unlikely event of the
// Lambda control plane demanding to close connection. (It's more or less only
// implemented to support http1.1 correctly.) This behavior is ensured with the test
// `LambdaTest.testNoKeepAliveServer`.
self.state = .waitForConnectionClose(httpResponse, promise)
_ = context.channel.close()
return
} else {
self.state = .idle
promise.succeed(httpResponse)
}
self.completeWith(.success(response))
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
Expand All @@ -303,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler {

func channelInactive(context: ChannelHandlerContext) {
// fail any pending responses with last error or assume peer disconnected
if self.pending != nil {
let error = self.lastError ?? HTTPClient.Errors.connectionResetByPeer
self.completeWith(.failure(error))
}
context.fireChannelInactive()

switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)

case .waitForConnectionClose(let response, let promise):
self.state = .idle
promise.succeed(response)
}
}

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case is RequestCancelEvent:
if self.pending != nil {
self.completeWith(.failure(HTTPClient.Errors.cancelled))
switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(HTTPClient.Errors.cancelled)

// after the cancel error has been send, we want to close the connection so
// that no more packets can be read on this connection.
_ = context.channel.close()
case .waitForConnectionClose(_, let promise):
self.state = .idle
promise.fail(HTTPClient.Errors.cancelled)
}
default:
context.triggerUserOutboundEvent(event, promise: promise)
}
}

private func completeWith(_ result: Result<HTTPClient.Response, Error>) {
guard let pending = self.pending else {
preconditionFailure("invalid state, no pending request")
}
self.pending = nil
self.lastError = nil
pending.timeout?.cancel()
pending.promise.completeWith(result)
}
}

private struct HTTPRequestWrapper {
Expand Down
6 changes: 2 additions & 4 deletions Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -64,7 +64,6 @@ extension Lambda {
struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let keepAlive: Bool
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the keepAlive option here, since it is neither tested nor used anywhere in the project. The HTTPClient should always prefer keep-alive but should be able to close a connection after a connection: close header. There is a test that ensures that the HTTPClient works correctly when a connection: close header is received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent was to be able to force a closed connection by sending that header from the client, but this is fine.

let requestTimeout: TimeAmount?

init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
Expand All @@ -74,12 +73,11 @@ extension Lambda {
}
self.ip = String(ipPort[0])
self.port = port
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
}

var description: String {
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout))"
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
}
}

Expand Down
4 changes: 2 additions & 2 deletions Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -30,7 +30,7 @@ internal final class MockLambdaServer {
private var shutdown = false

public init(behavior: LambdaServerBehavior, host: String = "127.0.0.1", port: Int = 7000, keepAlive: Bool = true) {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes Xcodes debug view much easier to deal with! And we only need a single core for the MockServer anyway.

self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.behavior = behavior
self.host = host
self.port = port
Expand Down
Loading