Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett committed Apr 12, 2021
1 parent d021910 commit d0a15da
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 67 deletions.
136 changes: 76 additions & 60 deletions Sources/AWSLambdaRuntimeCore/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -103,8 +103,7 @@ internal final class HTTPClient {
// https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
try channel.pipeline.syncOperations.addHandler(
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024))
try channel.pipeline.syncOperations.addHandler(
UnaryHandler(keepAlive: self.configuration.keepAlive))
try channel.pipeline.syncOperations.addHandler(UnaryHandler())
return channel.eventLoop.makeSucceededFuture(())
} catch {
return channel.eventLoop.makeFailedFuture(error)
Expand Down Expand Up @@ -139,10 +138,10 @@ internal final class HTTPClient {
}

internal struct Response: Equatable {
public var version: HTTPVersion
public var status: HTTPResponseStatus
public var headers: HTTPHeaders
public var body: ByteBuffer?
var version: HTTPVersion
var status: HTTPResponseStatus
var headers: HTTPHeaders
var body: ByteBuffer?
}

internal enum Errors: Error {
Expand All @@ -163,26 +162,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
typealias OutboundIn = HTTPRequestWrapper
typealias OutboundOut = HTTPClientRequestPart

private let keepAlive: Bool
enum State {
case idle
case running(promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)
case waitForConnectionClose(HTTPClient.Response, EventLoopPromise<HTTPClient.Response>)
}

private var pending: (promise: EventLoopPromise<HTTPClient.Response>, timeout: Scheduled<Void>?)?
private var state: State = .idle
private var lastError: Error?

init(keepAlive: Bool) {
self.keepAlive = keepAlive
}
init() {}

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard self.pending == nil else {
guard case .idle = self.state else {
preconditionFailure("invalid state, outstanding request")
}
let wrapper = unwrapOutboundIn(data)

var head = HTTPRequestHead(
version: .http1_1,
method: wrapper.request.method,
uri: wrapper.request.url,
headers: wrapper.request.headers)
headers: wrapper.request.headers
)
head.headers.add(name: "host", value: wrapper.request.targetHost)
switch head.method {
case .POST, .PUT:
Expand All @@ -191,29 +193,17 @@ private final class UnaryHandler: ChannelDuplexHandler {
break
}

// We don't add a "Connection" header here if we want to keep the connection open,
// HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence:
//
// HTTP/1.1 defaults to the use of "persistent connections", allowing
// multiple requests and responses to be carried over a single
// connection. The "close" connection option is used to signal that a
// connection will not persist after the current request/response. HTTP
// implementations SHOULD support persistent connections.
//
// See also UnaryHandler.channelRead below.
if !self.keepAlive {
head.headers.add(name: "connection", value: "close")
}

let timeoutTask = wrapper.request.timeout.map {
context.eventLoop.scheduleTask(in: $0) {
if self.pending != nil {
context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
guard case .running = self.state else {
preconditionFailure("invalid state")
}

context.pipeline.fireErrorCaught(HTTPClient.Errors.timeout)
}
}
self.pending = (promise: wrapper.promise, timeout: timeoutTask)
self.state = .running(promise: wrapper.promise, timeout: timeoutTask)

context.write(wrapOutboundOut(.head(head)), promise: nil)
if let body = wrapper.request.body {
context.write(wrapOutboundOut(.body(IOData.byteBuffer(body))), promise: nil)
Expand All @@ -222,20 +212,21 @@ private final class UnaryHandler: ChannelDuplexHandler {
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
guard let pending = self.pending else {
guard case .running(let promise, let timeout) = self.state else {
preconditionFailure("invalid state, no pending request")
}

let response = unwrapInboundIn(data)

let httpResponse = HTTPClient.Response(
version: response.head.version,
status: response.head.status,
headers: response.head.headers,
body: response.body)

self.completeWith(.success(httpResponse))

body: response.body
)

timeout?.cancel()

// As defined in RFC 7230 Section 6.3:
// HTTP/1.1 defaults to the use of "persistent connections", allowing
// multiple requests and responses to be carried over a single
Expand All @@ -248,10 +239,27 @@ private final class UnaryHandler: ChannelDuplexHandler {
let serverCloseConnection =
response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })

if !self.keepAlive || serverCloseConnection || response.head.version != .http1_1 {
pending.promise.futureResult.whenComplete { _ in
_ = context.channel.close()
}
let closeConnection = serverCloseConnection || response.head.version != .http1_1

if closeConnection {
// If we were succeeding the request promise here directly and closing the connection
// after succeeding the promise we may run into a race condition:
//
// The lambda runtime will ask for the next work item directly after a succeeded post
// response request. The desire for the next work item might be faster than the attempt
// to close the connection. This will lead to a situation where we try to the connection
// but the next request has already been scheduled on the connection that we want to
// close. For this reason we postpone succeeding the promise until the connection has
// been closed. This codepath will only be hit in the very, very unlikely event of the
// Lambda control plane demanding to close connection. (It's more or less only
// implemented to support http1.1 correctly.) This behavior is ensured with the test
// `LambdaTest.testNoKeepAliveServer`.
self.state = .waitForConnectionClose(httpResponse, promise)
_ = context.channel.close()
return
} else {
self.state = .idle
promise.succeed(httpResponse)
}
}

Expand All @@ -263,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler {

func channelInactive(context: ChannelHandlerContext) {
// fail any pending responses with last error or assume peer disconnected
if self.pending != nil {
let error = self.lastError ?? HTTPClient.Errors.connectionResetByPeer
self.completeWith(.failure(error))
}
context.fireChannelInactive()

switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(self.lastError ?? HTTPClient.Errors.connectionResetByPeer)

case .waitForConnectionClose(let response, let promise):
self.state = .idle
promise.succeed(response)
}
}

func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
switch event {
case is RequestCancelEvent:
if self.pending != nil {
self.completeWith(.failure(HTTPClient.Errors.cancelled))
switch self.state {
case .idle:
break
case .running(let promise, let timeout):
self.state = .idle
timeout?.cancel()
promise.fail(HTTPClient.Errors.cancelled)

// after the cancel error has been send, we want to close the connection so
// that no more packets can be read on this connection.
_ = context.channel.close()
case .waitForConnectionClose(_, let promise):
self.state = .idle
promise.fail(HTTPClient.Errors.cancelled)
}
default:
context.triggerUserOutboundEvent(event, promise: promise)
}
}

private func completeWith(_ result: Result<HTTPClient.Response, Error>) {
guard let pending = self.pending else {
preconditionFailure("invalid state, no pending request")
}
self.pending = nil
self.lastError = nil
pending.timeout?.cancel()
pending.promise.completeWith(result)
}
}

private struct HTTPRequestWrapper {
Expand Down
6 changes: 2 additions & 4 deletions Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2020 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -64,7 +64,6 @@ extension Lambda {
struct RuntimeEngine: CustomStringConvertible {
let ip: String
let port: Int
let keepAlive: Bool
let requestTimeout: TimeAmount?

init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
Expand All @@ -74,12 +73,11 @@ extension Lambda {
}
self.ip = String(ipPort[0])
self.port = port
self.keepAlive = keepAlive ?? env("KEEP_ALIVE").flatMap(Bool.init) ?? true
self.requestTimeout = requestTimeout ?? env("REQUEST_TIMEOUT").flatMap(Int64.init).flatMap { .milliseconds($0) }
}

var description: String {
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), keepAlive: \(self.keepAlive), requestTimeout: \(String(describing: self.requestTimeout))"
"\(RuntimeEngine.self)(ip: \(self.ip), port: \(self.port), requestTimeout: \(String(describing: self.requestTimeout))"
}
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/AWSLambdaRuntimeCoreTests/MockLambdaServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down
2 changes: 1 addition & 1 deletion Tests/AWSLambdaRuntimeCoreTests/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down
2 changes: 1 addition & 1 deletion scripts/soundness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

function replace_acceptable_years() {
# this needs to replace all acceptable forms with 'YEARS'
sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/'
sed -e 's/2017-2018/YEARS/' -e 's/2017-2020/YEARS/' -e 's/2017-2021/YEARS/' -e 's/2019/YEARS/' -e 's/2020/YEARS/'
}

printf "=> Checking for unacceptable language... "
Expand Down

0 comments on commit d0a15da

Please sign in to comment.