Skip to content

Commit

Permalink
[Draft] Detached tasks (swift-server#334)
Browse files Browse the repository at this point in the history
* First prototype

* Fix build

* Removes task cancellation

swift-server#334 (comment)

* Force user to handle errors

swift-server#334 (comment)

* Remove EventLoop API

swift-server#334 (comment)

* Make DetachedTaskContainer internal

swift-server#334 (comment)
swift-server#334 (comment)

* Removes @unchecked Sendable

swift-server#334 (comment)

* Invoke awaitAll() from async context

* Fix ambiguous expression type for swift 5.7

* Fix visibility of detachedBackgroundTask

* Add swift-doc

* Add example usage to readme

* Add tests

---------

Co-authored-by: Sébastien Stormacq <sebastien.stormacq@gmail.com>
  • Loading branch information
Buratti and sebsto authored Aug 23, 2024
1 parent ed72bdd commit 79fa2c2
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 5 deletions.
95 changes: 95 additions & 0 deletions Sources/AWSLambdaRuntimeCore/DetachedTasks.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Foundation
import NIOConcurrencyHelpers
import NIOCore
import Logging

/// A container that allows tasks to finish after a synchronous invocation
/// has produced its response.
actor DetachedTasksContainer: Sendable {

struct Context: Sendable {
let eventLoop: EventLoop
let logger: Logger
}

private var context: Context
private var storage: [RegistrationKey: EventLoopFuture<Void>] = [:]

init(context: Context) {
self.context = context
}

/// Adds a detached async task.
///
/// - Parameters:
/// - name: The name of the task.
/// - task: The async task to execute.
/// - Returns: A `RegistrationKey` for the registered task.
func detached(task: @Sendable @escaping () async -> Void) {
let key = RegistrationKey()
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask(task)
let task = promise.futureResult.always { [weak self] result in
guard let self else { return }
Task {
await self.removeTask(forKey: key)
}
}
self.storage[key] = task
}

func removeTask(forKey key: RegistrationKey) {
self.storage.removeValue(forKey: key)
}

/// Awaits all registered tasks to complete.
///
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
func awaitAll() -> EventLoopFuture<Void> {
let tasks = storage.values
if tasks.isEmpty {
return context.eventLoop.makeSucceededVoidFuture()
} else {
let context = context
return EventLoopFuture.andAllComplete(Array(tasks), on: context.eventLoop).flatMap { [weak self] in
guard let self else {
return context.eventLoop.makeSucceededFuture(())
}
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.awaitAll().get()
}
return promise.futureResult
}
}
}
}

extension DetachedTasksContainer {
/// Lambda detached task registration key.
struct RegistrationKey: Hashable, CustomStringConvertible, Sendable {
var value: String

init() {
// UUID basically
self.value = UUID().uuidString
}

var description: String {
self.value
}
}
}
30 changes: 28 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
let logger: Logger
let eventLoop: EventLoop
let allocator: ByteBufferAllocator
let tasks: DetachedTasksContainer

init(
requestID: String,
Expand All @@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: String?,
logger: Logger,
eventLoop: EventLoop,
allocator: ByteBufferAllocator
allocator: ByteBufferAllocator,
tasks: DetachedTasksContainer
) {
self.requestID = requestID
self.traceID = traceID
Expand All @@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
self.logger = logger
self.eventLoop = eventLoop
self.allocator = allocator
self.tasks = tasks
}
}

Expand Down Expand Up @@ -177,7 +180,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator
allocator: allocator,
tasks: DetachedTasksContainer(
context: DetachedTasksContainer.Context(
eventLoop: eventLoop,
logger: logger
)
)
)
}

Expand All @@ -188,6 +197,23 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
let remaining = deadline - now
return .milliseconds(remaining)
}

var tasks: DetachedTasksContainer {
self.storage.tasks
}


/// Registers a background task that continues running after the synchronous invocation has completed.
/// This is useful for tasks like flushing metrics or performing clean-up operations without delaying the response.
///
/// - Parameter body: An asynchronous closure that performs the background task.
/// - Warning: You will be billed for the milliseconds of Lambda execution time until the very last
/// background task is finished.
public func detachedBackgroundTask(_ body: @escaping @Sendable () async -> ()) {
Task {
await self.tasks.detached(task: body)
}
}

public var debugDescription: String {
"\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"
Expand Down
18 changes: 16 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,27 @@ internal final class LambdaRunner {
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
return (invocation, result, context)
}
}.flatMap { invocation, result in
}.flatMap { invocation, result, context in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
// To discuss:
// Do we want to await the tasks in this case?
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
return try await context.tasks.awaitAll().get()
}
return promise.futureResult
}.map { _ in context }
}
.flatMap { (context: LambdaContext) -> EventLoopFuture<Void> in
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await context.tasks.awaitAll().get()
}
return promise.futureResult
}
}

Expand Down
80 changes: 80 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/DetachedTasksTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import NIO
import XCTest
import Logging

class DetachedTasksTest: XCTestCase {

actor Expectation {
var isFulfilled = false
func fulfill() {
isFulfilled = true
}
}

func testAwaitTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation = Expectation()

let container = DetachedTasksContainer(context: context)
await container.detached {
try! await Task.sleep(for: .milliseconds(200))
await expectation.fulfill()
}

try await container.awaitAll().get()
let isFulfilled = await expectation.isFulfilled
XCTAssert(isFulfilled)
}

func testAwaitChildrenTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation1 = Expectation()
let expectation2 = Expectation()

let container = DetachedTasksContainer(context: context)
await container.detached {
await container.detached {
try! await Task.sleep(for: .milliseconds(300))
await expectation1.fulfill()
}
try! await Task.sleep(for: .milliseconds(200))
await container.detached {
try! await Task.sleep(for: .milliseconds(100))
await expectation2.fulfill()
}
}

try await container.awaitAll().get()
let isFulfilled1 = await expectation1.isFulfilled
let isFulfilled2 = await expectation2.isFulfilled
XCTAssert(isFulfilled1)
XCTAssert(isFulfilled2)
}
}
21 changes: 20 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public protocol SimpleLambdaHandler {

### Context

When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging and allocating buffers.
When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging, allocating buffers and dispatch background tasks.

```swift
public struct LambdaContext: CustomDebugStringConvertible, Sendable {
Expand Down Expand Up @@ -555,6 +555,25 @@ public struct LambdaInitializationContext: Sendable {
}
```

### Background tasks

The detachedBackgroundTask method allows you to register background tasks that continue running even after the Lambda runtime has reported the result of a synchronous invocation. This is particularly useful for integrations with services like API Gateway or CloudFront, where you can quickly return a response without waiting for non-essential tasks such as flushing metrics or performing non-critical clean-up operations.

```swift
@main
struct MyLambda: SimpleLambdaHandler {
func handle(_ request: APIGatewayV2Request, context: LambdaContext) async throws -> APIGatewayV2Response {
let response = makeResponse()
context.detachedBackgroundTask {
try? await Task.sleep(for: .seconds(3))
print("Background task completed")
}
print("Returning response")
return response
}
}
```

### Configuration

The library’s behavior can be fine tuned using environment variables based configuration. The library supported the following environment variables:
Expand Down

0 comments on commit 79fa2c2

Please sign in to comment.