Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventLoop APIs for simpler scheduling of callbacks #2759

Merged
merged 43 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5f7065f
benchmarks: Add benchmark for MTELG.scheduleTask(in:_:)
simonjbeaumont Jun 28, 2024
33e82e0
api: Add NIOTimer, NIOTimerHandler, and EventLoop.setTimer(for:_:)
simonjbeaumont Jun 28, 2024
b59e31d
benchmarks: Add benchmark for MTELG.setTimer(for:_:)
simonjbeaumont Jun 28, 2024
17cf105
internal: Add NIOCustomTimerImplementation conformance to SelectableE…
simonjbeaumont Jun 28, 2024
0afb3a2
test: Add Linux pre-5.9.2 backport for fulfillment(of:timeout:enforce…
simonjbeaumont Jul 1, 2024
df3cdb1
test: Increase timer used in shutdown test
simonjbeaumont Jul 1, 2024
b0d3f66
feedback: Make NIOTimer Sendable
simonjbeaumont Jul 5, 2024
907c087
feedback: Rename timerFired(loop:) to timerFired(eventLoop:)
simonjbeaumont Jul 8, 2024
b3e4903
feedback(attempted): Store a closure instead of UInt64
simonjbeaumont Jul 8, 2024
fb5e835
feedback(reverted, allocates): Store a closure instead of UInt64
simonjbeaumont Jul 8, 2024
89c57ec
feedback(unsure): Replace extra protocol with runtime checks
simonjbeaumont Jul 8, 2024
59a04de
feedback(unsure): Generic timerFired protocol witness
simonjbeaumont Jul 8, 2024
06a4ce7
feedback(unsure): Make setTimer generic over the handler
simonjbeaumont Jul 8, 2024
950db0c
feedback: Use labelled parameter for handler
simonjbeaumont Jul 8, 2024
d6ae472
feedback: Use separate prepositions for TimeAmount and NIODeadline APIs
simonjbeaumont Jul 8, 2024
4439ac6
Remove DocC disambiguation for now until API is decided
simonjbeaumont Jul 8, 2024
abd4b28
feedback: Add documentation to NIOTimerHandler.timerFired protocol re…
simonjbeaumont Jul 8, 2024
ceabc7b
feedback: Change API terms from setTimer to scheduleCallback
simonjbeaumont Jul 8, 2024
80d91f6
feedback: Local variable rename: taskId -> taskID
simonjbeaumont Jul 10, 2024
2ec5543
feedback: Reanme NIOScheduledCallbackHandler.onSchedule to handleSche…
simonjbeaumont Jul 10, 2024
5d6ac17
feedback: Update protocol requirement documentation comments to use D…
simonjbeaumont Jul 10, 2024
bbf8f9f
feedback: Remove explicit benchmark.stopMeasurement calls
simonjbeaumont Jul 11, 2024
21fd1cc
feedback: Remove TODO following discussion about internal inits
simonjbeaumont Jul 11, 2024
9bf65f6
feedback: Make scheduleCallback throwing
simonjbeaumont Jul 11, 2024
dbba9e3
feedback: Add a missing explicit self
simonjbeaumont Jul 11, 2024
86b8ac3
Merge remote-tracking branch 'upstream/main' into sb/timer-api
simonjbeaumont Jul 12, 2024
b16a575
Remove broken DocC disambiguatoin and fix test calls
simonjbeaumont Jul 12, 2024
4e71ffb
Update implementation comments in EmbeddedEventLoop and AsyncTestingE…
simonjbeaumont Jul 15, 2024
b909365
feedback: Fix preconditionFailure message
simonjbeaumont Jul 15, 2024
5559772
feedback: Move SheduledTask.Kind and kind above other properties
simonjbeaumont Jul 15, 2024
7f159be
feedback: Measure .instructions instead of .cpuTotal
simonjbeaumont Jul 15, 2024
3ad5647
feedback: Remove vestigial references to setTimer in impl comments
simonjbeaumont Jul 15, 2024
bd8fa73
Merge remote-tracking branch 'upstream/main' into HEAD
simonjbeaumont Jul 25, 2024
6af561d
Add cancellation callback and implicitly cancel on shutdown
simonjbeaumont Jul 19, 2024
81cc415
format: Update for new format and lint rules
simonjbeaumont Jul 25, 2024
37ebfde
Merge remote-tracking branch 'upstream/main' into sb/timer-api
simonjbeaumont Jul 30, 2024
4467728
Remove use of Task.sleep(for:) in tests
simonjbeaumont Jul 30, 2024
6544461
Merge remote-tracking branch 'upstream/main' into sb/timer-api
simonjbeaumont Aug 15, 2024
c48b7aa
Update benchmark to use same config as other benchmarks
simonjbeaumont Aug 15, 2024
151c2c8
Rename onCancelScheduledCallback to didCancelScheduledCallback
simonjbeaumont Aug 15, 2024
5e61930
Merge branch 'main' into sb/timer-api
simonjbeaumont Sep 30, 2024
0a2baf3
Merge branch 'main' into sb/timer-api
simonjbeaumont Oct 3, 2024
ba63fda
Make AsyncStream.makeStream backfill internal
simonjbeaumont Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import Benchmark
import NIOCore
import NIOPosix

private let eventLoop = MultiThreadedEventLoopGroup.singleton.next()
Expand Down Expand Up @@ -64,4 +65,45 @@ let benchmarks = {
)
}
#endif

Benchmark(
"MTELG.scheduleTask(in:_:)",
configuration: Benchmark.Configuration(
metrics: [.mallocCountTotal, .cpuTotal],
scalingFactor: .kilo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we align the maximum duration/iterations with #2839

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I gently push back on that ask.

You're asking me to align with a new precedent in an un-merged PR that was opened well after this one, instead of this PR keeping the conventions of the branch it is targeting.

This PR has been subject to a lot of scope creep already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see it's been merged already. So I've now merged this PR with main and updated the benchmarks to use the same configuration as the rest.

)
) { benchmark in
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try! group.syncShutdownGracefully() }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should do this setup inside the benchmark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? IIUC, that's what startMeasurement is for.

If you're suggesting that I use the setup and teardown closures of the Benchmark() function, then I cannot since I need to actually use these variables in the benchmark. The only alternative would be to make them implicitly unwrapped optionals at global scope, which was pretty gross.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can reuse ELs across benchmarks so I was thinking it might just be better if we create one EL that we share. I would just define a let eventLoop = MultithreadedEventLoopGroup.singleton.any() at the top.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's great, I think I'd rather each benchmark do it's own setup and teardown to ensure its unpolluted by the side effects of running other benchmarks.

Where we can, I'm happy to try and use the setup/teardown closure style, if you prefer it, and where we cannot, use this.

It also has the benefit for local reasoning of what's being benchmarked. It's all contained in the call to Benchmark { ... }.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree and I just looked at the code in the benchmark again and we are already defining an EL for exactly this purpose. There is a static EL defined in this file called eventLoop.

The reason why I think it's not important to keep is that it should be irrelevant for the benchmark here. The important part of the benchmark is the scheduling and not how the EL is constructed and shut down so it keeps the benchmark more concise.

IMO we should definitely only have one style here and not mix static EL with one EL per benchmark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I still disagree with this feedback and think we should change the other benchmarks too for local reasoning reasons, I'm more interested in converging this PR, so I've updated to accommodate this feedback.

let loop = group.next()

benchmark.startMeasurement()
for _ in benchmark.scaledIterations {
loop.scheduleTask(in: .hours(1), {})
}
benchmark.stopMeasurement()
}

Benchmark(
"MTELG.scheduleCallback(in:_:)",
configuration: Benchmark.Configuration(
metrics: [.mallocCountTotal, .cpuTotal],
scalingFactor: .kilo
)
) { benchmark in
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try! group.syncShutdownGracefully() }
let loop = group.next()

final class Timer: NIOScheduledCallbackHandler {
func onSchedule(eventLoop: some EventLoop) {}
}
let timer = Timer()

benchmark.startMeasurement()
for _ in benchmark.scaledIterations {
let handle = loop.scheduleCallback(in: .hours(1), handler: timer)
}
benchmark.stopMeasurement()
}
}
23 changes: 23 additions & 0 deletions Sources/NIOCore/EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,29 @@ public protocol EventLoop: EventLoopGroup {
/// It is valid for an `EventLoop` not to implement any of the two `_promise` functions. If either of them are implemented,
/// however, both of them should be implemented.
func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)?

/// Schedule a callback at a given time.
///
/// - NOTE: Event loops that provide a custom scheduled callback implementation **must** implement _both_
/// `sheduleCallback(at deadline:handler:)` _and_ `cancelScheduledCallback(_:)`. Failure to do so will
/// result in a runtime error.
@discardableResult
func scheduleCallback(at deadline: NIODeadline, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback

/// Schedule a callback after given time.
///
/// - NOTE: Event loops that provide a custom scheduled callback implementation **must** implement _both_
/// `sheduleCallback(at deadline:handler:)` _and_ `cancelScheduledCallback(_:)`. Failure to do so will
/// result in a runtime error.
@discardableResult
func scheduleCallback(in amount: TimeAmount, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback

/// Cancel a scheduled callback.
///
/// - NOTE: Event loops that provide a custom scheduled callback implementation **must** implement _both_
/// `sheduleCallback(at deadline:handler:)` _and_ `cancelScheduledCallback(_:)`. Failure to do so will
/// result in a runtime error.
func cancelScheduledCallback(_ scheduledCallback: NIOScheduledCallback)
}

extension EventLoop {
Expand Down
107 changes: 107 additions & 0 deletions Sources/NIOCore/NIOScheduledCallback.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

/// A type that handles callbacks scheduled with `EventLoop.scheduleCallback(at:handler:)`.
///
/// - Seealso: `EventLoop.scheduleCallback(at:handler:)`.
public protocol NIOScheduledCallbackHandler {
/// This function is called at the scheduled time, unless the scheduled callback is cancelled.
///
/// - Parameter eventLoop: The event loop on which the callback was scheduled.
func onSchedule(eventLoop: some EventLoop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming reads (to me) like this function is called at the time when the callback is scheduled (when eventLoop.scheduledCallback is called), as opposed to the time when the callback is scheduled to run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleScheduledCallback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the name to handleScheduledCallback in 2ec5543.

Question: do we need to be more considerate about name clashes since we're expecting folks to conform their types to this protocol; e.g. should this be something like handleNIOScheduledCallback?

This case isn't quite covered by https://github.com/apple/swift-nio/blob/main/docs/public-api.md, but it seems similar in nature.

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glbrntt did you have any thoughts regarding the namespacing or is it fine the way it is?

The function takes a NIO type, so at worst it could cause an overload? Maybe it's OK the way it is now.

}

/// An opaque handle that can be used to cancel a scheduled callback.
///
/// Users should not create an instance of this type; it is returned by `EventLoop.scheduleCallback(at:handler:)`.
///
/// - Seealso: `EventLoop.scheduleCallback(at:handler:)`.
public struct NIOScheduledCallback: Sendable {
@usableFromInline
enum Backing: Sendable {
/// A task created using `EventLoop.scheduleTask(deadline:_:)` by the default implementation.
case `default`(_ task: Scheduled<Void>)
/// A custom callback identifier, used by event loops that provide a custom implementation.
case custom(id: UInt64)
}

@usableFromInline
var eventLoop: any EventLoop

@usableFromInline
var backing: Backing

/// This initializer is only for the default implementation and is fileprivate to avoid use in EL implementations.
fileprivate init(_ eventLoop: any EventLoop, _ task: Scheduled<Void>) {
self.eventLoop = eventLoop
self.backing = .default(task)
}

/// Create a handle for the scheduled callback with an opaque identifier managed by the event loop.
///
/// - NOTE: This initializer is for event loop implementors only, end users should use `EventLoop.scheduleCallback`.
///
/// - Seealso: `EventLoop.scheduleCallback(at:handler:)`.
@inlinable
public init(_ eventLoop: any EventLoop, id: UInt64) {
self.eventLoop = eventLoop
self.backing = .custom(id: id)
}

/// Cancel the scheduled callback associated with this handle.
@inlinable
public func cancel() {
self.eventLoop.cancelScheduledCallback(self)
}

/// The callback identifier, if the event loop uses a custom scheduled callback implementation; nil otherwise.
///
/// - NOTE: This property is for event loop implementors only.
@inlinable
public var customCallbackID: UInt64? {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Do we need the custom here in the naming?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it adds value when glancing at it as the property name implies that it's only relevant for custom implementations. How strongly do you feel about it. It's public API so if there's a consensus that this needs a different name I'll suck it up 😄

guard case .custom(let id) = backing else { return nil }
return id
}
}

extension EventLoop {
/// Default implementation of `scheduleCallback(at deadline:handler:)`: backed by `EventLoop.scheduleTask`.
@discardableResult
public func scheduleCallback(at deadline: NIODeadline, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback {
let task = self.scheduleTask(deadline: deadline) { handler.onSchedule(eventLoop: self) }
return NIOScheduledCallback(self, task)
}

/// Default implementation of `scheduleCallback(in amount:handler:)`: calls `scheduleCallback(at deadline:handler:)`.
@discardableResult
@inlinable
public func scheduleCallback(in amount: TimeAmount, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback {
self.scheduleCallback(at: .now() + amount, handler: handler)
}

/// Default implementation of `cancelScheduledCallback(_:)`: only cancels callbacks scheduled by the default implementation of `scheduleCallback`.
///
/// - NOTE: Event loops that provide a custom scheduled callback implementation **must** implement _both_
/// `sheduleCallback(at deadline:handler:)` _and_ `cancelScheduledCallback(_:)`. Failure to do so will
/// result in a runtime error.
@inlinable
public func cancelScheduledCallback(_ scheduledCallback: NIOScheduledCallback) {
switch scheduledCallback.backing {
case .default(let task):
task.cancel()
case .custom:
preconditionFailure("EventLoop missing custom implementation of cancelTimer(_:)")
}
}
}
9 changes: 9 additions & 0 deletions Sources/NIOEmbedded/AsyncTestingEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable {
return self.scheduleTask(deadline: self.now + `in`, task)
}

@discardableResult
public func scheduleCallback(in amount: TimeAmount, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback {
// TODO: docs
/// Even though this type does not conform to `CustomTimerImplemenation`, it has a manual clock so we cannot
/// rely on the default implemntation of `setTimer(for duration:_:)`, which computes the deadline for
/// `setTimer(for deadline:_:)` naively using `NIODeadline.now`, but we must use `self.now`.
self.scheduleCallback(at: self.now + amount, handler: handler)
}

/// On an `NIOAsyncTestingEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. Unlike with the other operations, this will
/// immediately execute, to eliminate a common class of bugs.
public func execute(_ task: @escaping () -> Void) {
Expand Down
9 changes: 9 additions & 0 deletions Sources/NIOEmbedded/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public final class EmbeddedEventLoop: EventLoop {
return scheduleTask(deadline: self._now + `in`, task)
}

@discardableResult
public func scheduleCallback(in amount: TimeAmount, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback {
// TODO: docs
/// Even though this type does not conform to `CustomTimerImplemenation`, it has a manual clock so we cannot
/// rely on the default implemntation of `setTimer(for duration:_:)`, which computes the deadline for
/// `setTimer(for deadline:_:)` naively using `NIODeadline.now`, but we must use `self._now`.
self.scheduleCallback(at: self._now + amount, handler: handler)
}

/// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
/// `task` will be run the next time you call `EmbeddedEventLoop.run`.
public func execute(_ task: @escaping () -> Void) {
Expand Down
23 changes: 17 additions & 6 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -460,21 +460,32 @@ internal struct ScheduledTask {
/// This means, the ids need to be unique for a given ``SelectableEventLoop`` and they need to be in ascending order.
@usableFromInline
let id: UInt64
let task: () -> Void
private let failFn: (Error) -> Void

@usableFromInline
internal let readyTime: NIODeadline

@usableFromInline
enum Kind {
case task(task: () -> Void, failFn: (Error) -> Void)
case callback(any NIOScheduledCallbackHandler)
}

@usableFromInline
let kind: Kind

// TODO: Should these be .init() or should they be static functions?
@usableFromInline
init(id: UInt64, _ task: @escaping () -> Void, _ failFn: @escaping (Error) -> Void, _ time: NIODeadline) {
self.id = id
self.task = task
self.failFn = failFn
self.readyTime = time
self.kind = .task(task: task, failFn: failFn)
}

func fail(_ error: Error) {
failFn(error)
@usableFromInline
init(id: UInt64, _ handler: any NIOScheduledCallbackHandler, _ time: NIODeadline) {
self.id = id
self.readyTime = time
self.kind = .callback(handler)
}
}

Expand Down
37 changes: 34 additions & 3 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ Further information:
fatalError("Tried to run an UnownedJob without runtime support")
}
#endif
case .callback(let handler):
handler.onSchedule(eventLoop: self)
}
}
}
Expand Down Expand Up @@ -565,7 +567,10 @@ Further information:
if moreScheduledTasksToConsider, tasksCopy.count < tasksCopyBatchSize, let task = scheduledTasks.peek() {
if task.readyTime.readyIn(now) <= .nanoseconds(0) {
scheduledTasks.pop()
tasksCopy.append(.function(task.task))
switch task.kind {
case .task(let task, _): tasksCopy.append(.function(task))
case .callback(let handler): tasksCopy.append(.callback(handler))
}
} else {
nextScheduledTaskDeadline = task.readyTime
moreScheduledTasksToConsider = false
Expand Down Expand Up @@ -658,9 +663,14 @@ Further information:
for task in immediateTasksCopy {
self.run(task)
}
// Fail all the scheduled tasks.
// Fail all the scheduled tasks (callbacks have no failFn and can just be dropped).
for task in scheduledTasksCopy {
task.fail(EventLoopError.shutdown)
switch task.kind {
case .task(_, let failFn):
failFn(EventLoopError.shutdown)
case .callback:
break
}
}

iterations += 1
Expand Down Expand Up @@ -869,6 +879,7 @@ enum UnderlyingTask {
#if compiler(>=5.9)
case unownedJob(ErasedUnownedJob)
#endif
case callback(any NIOScheduledCallbackHandler)
}

@usableFromInline
Expand All @@ -883,3 +894,23 @@ internal func assertExpression(_ body: () -> Bool) {
return body()
}())
}

extension SelectableEventLoop {
@inlinable
func scheduleCallback(at deadline: NIODeadline, handler: some NIOScheduledCallbackHandler) -> NIOScheduledCallback {
let taskId = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed)
let task = ScheduledTask(id: taskId, handler, deadline)
try! self._schedule0(.scheduled(task))
return NIOScheduledCallback(self, id: taskId)
}

@inlinable
func cancelScheduledCallback(_ scheduledCallback: NIOScheduledCallback) {
guard let id = scheduledCallback.customCallbackID else {
preconditionFailure("No custom ID for callback")
}
self._tasksLock.withLock {
self._scheduledTasks.removeFirst(where: { $0.id == id })
}
}
}
Loading