Skip to content

Commit

Permalink
NIOSingletons: Use NIO in easy mode
Browse files Browse the repository at this point in the history
  • Loading branch information
weissi committed Jul 13, 2023
1 parent 5f54289 commit a148af8
Show file tree
Hide file tree
Showing 6 changed files with 345 additions and 5 deletions.
19 changes: 19 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ let package = Package(
.library(name: "NIOFoundationCompat", targets: ["NIOFoundationCompat"]),
.library(name: "NIOWebSocket", targets: ["NIOWebSocket"]),
.library(name: "NIOTestUtils", targets: ["NIOTestUtils"]),
.library(name: "NIOSingletonsPosix", targets: ["NIOSingletonsPosix"]),
.library(name: "NIOSingletonResources", targets: ["NIOSingletonResources"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
Expand Down Expand Up @@ -168,6 +170,19 @@ let package = Package(
swiftAtomics,
]
),
.target(
name: "NIOSingletonResources",
dependencies: [
"NIOCore",
]
),
.target(
name: "NIOSingletonsPosix",
dependencies: [
"NIOSingletonResources",
"NIOPosix",
]
),

// MARK: - Examples

Expand Down Expand Up @@ -393,5 +408,9 @@ let package = Package(
name: "NIOTests",
dependencies: ["NIO"]
),
.testTarget(
name: "NIOSingletonsTests",
dependencies: ["NIOSingletonResources", "NIOSingletonsPosix", "NIOCore", "NIOPosix"]
),
]
)
64 changes: 60 additions & 4 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: NIOLock = NIOLock()
private let threadNamePrefix: String
private var runState: RunState = .running
private let canBeShutDown: Bool

private static func runTheLoop(thread: NIOThread,
parentGroup: MultiThreadedEventLoopGroup? /* nil iff thread take-over */,
Expand Down Expand Up @@ -138,29 +140,73 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
/// - arguments:
/// - numberOfThreads: The number of `Threads` to use.
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, selectorFactory: NIOPosix.Selector<NIORegistration>.init)
self.init(numberOfThreads: numberOfThreads,
canBeShutDown: true,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

/// Create a ``MultiThreadedEventLoopGroup`` that cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualGroup(threadNamePrefix: String,
numberOfThreads: Int) -> MultiThreadedEventLoopGroup {
return self.init(numberOfThreads: numberOfThreads,
canBeShutDown: false,
threadNamePrefix: threadNamePrefix,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

internal convenience init(numberOfThreads: Int,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, canBeShutDown: true, selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
threadNamePrefix: String,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, selectorFactory: selectorFactory)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadNamePrefix: threadNamePrefix,
selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
selectorFactory: selectorFactory)
}

internal convenience init(threadInitializers: [ThreadInitializer],
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.init(threadInitializers: threadInitializers, canBeShutDown: true, selectorFactory: selectorFactory)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses the given `ThreadInitializer`s. One `NIOThread` per `ThreadInitializer` is created and used.
///
/// - arguments:
/// - threadInitializers: The `ThreadInitializer`s to use.
internal init(threadInitializers: [ThreadInitializer],
canBeShutDown: Bool,
threadNamePrefix: String = "NIO-ELT-",
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.threadNamePrefix = threadNamePrefix
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
self.canBeShutDown = canBeShutDown
self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`.
self.eventLoops = threadInitializers.map { initializer in
// Maximum name length on linux is 16 by default.
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "NIO-ELT-\(myGroupID)-#\(idx)",
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "\(threadNamePrefix)\(myGroupID)-#\(idx)",
parentGroup: self,
selectorFactory: selectorFactory,
initializer: initializer)
Expand All @@ -169,6 +215,10 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
}
}

deinit {
assert(self.canBeShutDown, "Perpetual MTELG shut down, you must ensure that perpetual MTELGs don't deinit")
}

/// Returns the `EventLoop` for the calling thread.
///
/// - returns: The current `EventLoop` for the calling thread or `nil` if none is assigned to the thread.
Expand Down Expand Up @@ -244,6 +294,12 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
#endif

private func _shutdownGracefully(queue: DispatchQueue, _ handler: @escaping ShutdownGracefullyCallback) {
guard self.canBeShutDown else {
queue.async {
handler(EventLoopError.unsupportedOperation)
}
return
}
// This method cannot perform its final cleanup using EventLoopFutures, because it requires that all
// our event loops still be alive, and they may not be. Instead, we use Dispatch to manage
// our shutdown signaling, and then do our cleanup once the DispatchQueue is empty.
Expand Down Expand Up @@ -362,7 +418,7 @@ extension MultiThreadedEventLoopGroup: @unchecked Sendable {}

extension MultiThreadedEventLoopGroup: CustomStringConvertible {
public var description: String {
return "MultiThreadedEventLoopGroup { threadPattern = NIO-ELT-\(self.myGroupID)-#* }"
return "MultiThreadedEventLoopGroup { threadPattern = \(self.threadNamePrefix)\(self.myGroupID)-#* }"
}
}

Expand Down
29 changes: 28 additions & 1 deletion Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public enum NIOThreadPoolError {

/// The `NIOThreadPool` was not active.
public struct ThreadPoolInactive: Error { }

/// The `NIOThreadPool` operation is unsupported (e.g. shutdown of a perpetual pool).
public struct UnsupportedOperation: Error { }
}


Expand Down Expand Up @@ -69,6 +72,7 @@ public final class NIOThreadPool {
private var threads: [NIOThread]? = nil // protected by `lock`
private var state: State = .stopped
private let numberOfThreads: Int
private let canBeStopped: Bool

#if swift(>=5.7)
/// Gracefully shutdown this `NIOThreadPool`. All tasks will be run before shutdown will take place.
Expand All @@ -92,6 +96,12 @@ public final class NIOThreadPool {
#endif

private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
guard self.canBeStopped else {
queue.async {
callback(NIOThreadPoolError.UnsupportedOperation())
}
return
}
let g = DispatchGroup()
let threadsToJoin = self.lock.withLock { () -> [NIOThread] in
switch self.state {
Expand Down Expand Up @@ -167,10 +177,25 @@ public final class NIOThreadPool {
///
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public init(numberOfThreads: Int) {
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, canBeStopped: true)
}

/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualStartedPool(numberOfThreads: Int) -> NIOThreadPool {
let pool = self.init(numberOfThreads: numberOfThreads, canBeStopped: false)
pool.start()
return pool
}

private init(numberOfThreads: Int, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
}


private func process(identifier: Int) {
var item: WorkItem? = nil
repeat {
Expand Down Expand Up @@ -243,6 +268,8 @@ public final class NIOThreadPool {
}

deinit {
assert(self.canBeStopped,
"Perpetual NIOThreadPool has been deinited, you must make sure that perpetual pools don't deinit")
switch self.state {
case .stopped, .shuttingDown:
()
Expand Down
75 changes: 75 additions & 0 deletions Sources/NIOSingletonResources/SingletonType.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if canImport(Darwin)
import Darwin
#elseif os(Windows)
import ucrt
import WinSDK
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported C library")
#endif
import NIOCore

/// Singleton resources provided by SwiftNIO for programs & libraries that don't need full control over all operating system resources.
///
/// SwiftNIO allows and encourages the precise management of all operating system resources such as threads and file descriptors.
/// Certain resources (such as the main ``EventLoopGroup``) however are usually globally shared across the program. This means
/// that many programs have to carry around an ``EventLoopGroup`` despite the fact the don't require the ability to fully return
/// all the operating resources which would imply shutting down the ``EventLoopGroup``. This type is the global handle for singleton
/// resources that applications (and some libraries) can use to obtain never-shut-down singleton resources.
///
/// Programs and libraries that do not use these singletons will not incur extra resource usage, these resources are lazily initialized on
/// first use.
public enum NIOSingletons {}

extension NIOSingletons {
/// How many threads will be used for the multi-threaded ``EventLoopGroup``s.
///
/// The thread count is ``System/coreCount`` unless the environment variable `NIO_SINGLETONS_MULTI_THREADED_ELG_LOOP_COUNT`
/// is set.
public static var suggestedMultiThreadedEventLoopGroupLoopCount: Int {
return globalSuggestedMultiThreadedEventLoopGroupLoopCount
}

/// How many threads will be used for the blocking `NIOThreadPool`s.
///
/// The thread count is determined automatically unless the environment variable `NIO_SINGLETONS_BLOCKING_POOL_THREAD_COUNT` is set.
public static var suggestedBlockingPoolThreadCount: Int {
return globalSuggestedBlockingPoolThreadCount
}
}

private let globalSuggestedMultiThreadedEventLoopGroupLoopCount: Int = {
if let threadCountEnv = getenv("NIO_SINGLETONS_MULTI_THREADED_ELG_LOOP_COUNT"),
let threadCount = Int(String(cString: threadCountEnv)), threadCount > 0 {
return threadCount
} else {
return System.coreCount
}
}()

private let globalSuggestedBlockingPoolThreadCount: Int = {
if let threadCountEnv = getenv("NIO_SINGLETONS_BLOCKING_POOL_THREAD_COUNT"),
let threadCount = Int(String(cString: threadCountEnv)), threadCount > 0 {
return threadCount
} else {
return System.coreCount * 2
}
}()

66 changes: 66 additions & 0 deletions Sources/NIOSingletonsPosix/PosixSingletons.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@_exported import NIOSingletonResources
import NIOPosix
import NIOCore

extension NIOSingletons {
/// A globally shared, lazily initialized ``MultiThreadedEventLoopGroup`` that uses `epoll` as the selector mechanism.
///
/// The number of threads is determined by ``NIOSingletons.suggestedMultiThreadedEventLoopGroupThreadCount``.
public static var multiThreadedPosixEventLoopGroup: MultiThreadedEventLoopGroup {
return globalMultiThreadedPosixEventLoopGroup
}

/// A globally shared, lazily initialized ``MultiThreadedEventLoopGroup`` with just one thread that uses `epoll` as the selector mechanism.
public static var singleThreadedPosixEventLoopGroup: MultiThreadedEventLoopGroup {
return globalSingleThreadedPosixEventLoopGroup
}

/// A globally shared, lazily initialized ``EventLoop`` that uses `epoll` as the selector mechanism. Backed by `NIOSingletons.singleThreadedPosixEventLoopGroup`.
public static var posixEventLoop: EventLoop {
return Self.singleThreadedPosixEventLoopGroup.next()
}

/// A globally shared, lazily initialized ``NIOThreadPool`` that can be used for blocking I/O and other blocking operations.
///
/// /// The number of threads is determined by ``NIOSingletons.suggestedBlockingPoolThreadCount``.
public static var posixBlockingPool: NIOThreadPool {
return globalPosixBlockingPool
}
}

private let globalMultiThreadedPosixEventLoopGroup: MultiThreadedEventLoopGroup = {
let threadCount = NIOSingletons.suggestedMultiThreadedEventLoopGroupLoopCount
let group = MultiThreadedEventLoopGroup._makePerpetualGroup(threadNamePrefix: "NIO-MSGTN-",
numberOfThreads: threadCount)
_ = Unmanaged.passUnretained(group).retain() // Never gonna give you up,
return group
}()

private let globalSingleThreadedPosixEventLoopGroup: MultiThreadedEventLoopGroup = {
let group = MultiThreadedEventLoopGroup._makePerpetualGroup(threadNamePrefix: "NIO-SSGTN-",
numberOfThreads: 1)
_ = Unmanaged.passUnretained(group).retain() // never gonna let you down.
return group
}()

private let globalPosixBlockingPool: NIOThreadPool = {
let pool = NIOThreadPool._makePerpetualStartedPool(numberOfThreads: NIOSingletons.suggestedBlockingPoolThreadCount)
_ = Unmanaged.passUnretained(pool).retain() // Make sure this is never deallocated (not strictly necessary).
return pool
}()


Loading

0 comments on commit a148af8

Please sign in to comment.