diff --git a/Package.swift b/Package.swift index 467f79d147..c8ada847f0 100644 --- a/Package.swift +++ b/Package.swift @@ -393,5 +393,9 @@ let package = Package( name: "NIOTests", dependencies: ["NIO"] ), + .testTarget( + name: "NIOSingletonsTests", + dependencies: ["NIOCore", "NIOPosix"] + ), ] ) diff --git a/Sources/NIOCore/GlobalSingletons.swift b/Sources/NIOCore/GlobalSingletons.swift new file mode 100644 index 0000000000..79e752a74e --- /dev/null +++ b/Sources/NIOCore/GlobalSingletons.swift @@ -0,0 +1,180 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Atomics +#if canImport(Darwin) +import Darwin +#elseif os(Windows) +import ucrt +import WinSDK +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Musl) +import Musl +#else +#error("Unsupported C library") +#endif + +/// SwiftNIO provided singleton resources for programs & libraries that don't need full control over all operating +/// system resources. This type holds sizing (how many loops/threads) suggestions. +/// +/// Users who need very tight control about the exact threads and resources created may decide to set +/// `NIOSingletons.singletonsEnabledSuggestion = false`. All singleton-creating facilities should check +/// this setting and if `false` restrain from creating any global singleton resources. Please note that disabling the +/// global singletons will lead to a crash if _any_ code attempts to use any of the singletons. +public enum NIOSingletons { +} + +extension NIOSingletons { + /// A suggestion of how many ``EventLoop``s the global singleton ``EventLoopGroup``s are supposed to consist of. + /// + /// The thread count is ``System/coreCount`` unless the environment variable `NIO_SINGLETON_GROUP_LOOP_COUNT` + /// is set or this value was set manually by the user. + /// + /// - note: This value must be set _before_ any singletons are used and must only be set once. + public static var groupLoopCountSuggestion: Int { + set { + Self.userSetSingletonThreadCount(rawStorage: globalRawSuggestedLoopCount, userValue: newValue) + } + + get { + return Self.getTrustworthyThreadCount(rawStorage: globalRawSuggestedLoopCount, + environmentVariable: "NIO_SINGLETON_GROUP_LOOP_COUNT") + } + } + + /// A suggestion of how many threads the global singleton thread pools that can be used for synchronous, blocking + /// functions (such as `NIOThreadPool`) are supposed to consist of + /// + /// The thread count is ``System/coreCount`` unless the environment variable + /// `NIO_SINGLETON_BLOCKING_POOL_THREAD_COUNT` is set or this value was set manually by the user. + /// + /// - note: This value must be set _before_ any singletons are used and must only be set once. + public static var blockingPoolThreadCountSuggestion: Int { + set { + Self.userSetSingletonThreadCount(rawStorage: globalRawSuggestedBlockingThreadCount, userValue: newValue) + } + + get { + return Self.getTrustworthyThreadCount(rawStorage: globalRawSuggestedBlockingThreadCount, + environmentVariable: "NIO_SINGLETON_BLOCKING_POOL_THREAD_COUNT") + } + } + + /// A suggestion for whether the global singletons should be enabled. This is `true` unless changed by the user. + /// + /// This value cannot be changed using an environment variable. + /// + /// - note: This value must be set _before_ any singletons are used and must only be set once. + public static var singletonsEnabledSuggestion: Bool { + get { + let (exchanged, original) = globalRawSingletonsEnabled.compareExchange(expected: 0, + desired: 1, + ordering: .relaxed) + if exchanged { + // Never been set, we're committing to the default (enabled). + assert(original == 0) + return true + } else { + // This has been set before, 1: enabled; -1 disabled. + assert(original != 0) + assert(original == -1 || original == 1) + return original > 0 + } + } + + set { + let intRepresentation = newValue ? 1 : -1 + let (exchanged, _) = globalRawSingletonsEnabled.compareExchange(expected: 0, + desired: intRepresentation, + ordering: .relaxed) + guard exchanged else { + fatalError(""" + Bug in user code: Global singleton enabled suggestion has been changed after \ + user or has been changed more than once. Either is an error, you must set this value very \ + early and only once. + """) + } + } + } +} + +// DO NOT TOUCH THESE DIRECTLY, use `userSetSingletonThreadCount` and `getTrustworthyThreadCount`. +private let globalRawSuggestedLoopCount = ManagedAtomic(0) +private let globalRawSuggestedBlockingThreadCount = ManagedAtomic(0) +private let globalRawSingletonsEnabled = ManagedAtomic(0) + +extension NIOSingletons { + private static func userSetSingletonThreadCount(rawStorage: ManagedAtomic, userValue: Int) { + precondition(userValue > 0, "illegal value: needs to be strictly positive") + + // The user is trying to set it. We can only do this if the value is at 0 and we will set the + // negative value. So if the user wants `5`, we will set `-5`. Once it's used (set getter), it'll be upped + // to 5. + let (exchanged, _) = rawStorage.compareExchange(expected: 0, desired: -userValue, ordering: .relaxed) + guard exchanged else { + fatalError(""" + Bug in user code: Global singleton suggested loop/thread count has been changed after \ + user or has been changed more than once. Either is an error, you must set this value very early \ + and only once. + """) + } + } + + private static func validateTrustedThreadCount(_ threadCount: Int) { + assert(threadCount > 0, + "BUG IN NIO, please report: negative suggested loop/thread count: \(threadCount)") + assert(threadCount <= 1024, + "BUG IN NIO, please report: overly big suggested loop/thread count: \(threadCount)") + } + + private static func getTrustworthyThreadCount(rawStorage: ManagedAtomic, environmentVariable: String) -> Int { + let returnedValueUnchecked: Int + + let rawSuggestion = rawStorage.load(ordering: .relaxed) + switch rawSuggestion { + case 0: // == 0 + // Not set by user, not yet finalised, let's try to get it from the env var and fall back to + // `System.coreCount`. + let envVarString = getenv(environmentVariable).map { String(cString: $0) } + returnedValueUnchecked = envVarString.flatMap(Int.init) ?? System.coreCount + case .min ..< 0: // < 0 + // Untrusted and unchecked user value. Let's invert and then sanitise/check. + returnedValueUnchecked = -rawSuggestion + case 1 ... .max: // > 0 + // Trustworthy value that has been evaluated and sanitised before. + let returnValue = rawSuggestion + Self.validateTrustedThreadCount(returnValue) + return returnValue + default: + // Unreachable + preconditionFailure() + } + + // Can't have fewer than 1, don't want more than 1024. + let returnValue = max(1, min(1024, returnedValueUnchecked)) + Self.validateTrustedThreadCount(returnValue) + + // Store it for next time. + let (exchanged, _) = rawStorage.compareExchange(expected: rawSuggestion, + desired: returnValue, + ordering: .relaxed) + if !exchanged { + // We lost the race, this must mean it has been concurrently set correctly so we can safely recurse + // and try again. + return Self.getTrustworthyThreadCount(rawStorage: rawStorage, environmentVariable: environmentVariable) + } + return returnValue + } +} diff --git a/Sources/NIOCrashTester/CrashTests+EventLoop.swift b/Sources/NIOCrashTester/CrashTests+EventLoop.swift index 809216b530..658d5f36ae 100644 --- a/Sources/NIOCrashTester/CrashTests+EventLoop.swift +++ b/Sources/NIOCrashTester/CrashTests+EventLoop.swift @@ -105,5 +105,77 @@ struct EventLoopCrashTests { try! el.submit {}.wait() } } + + let testUsingTheSingletonGroupWhenDisabled = CrashTest( + regex: #"Fatal error: Cannot create global singleton MultiThreadedEventLoopGroup because the global singletons"# + ) { + NIOSingletons.singletonsEnabledSuggestion = false + try? NIOSingletons.posixEventLoopGroup.next().submit {}.wait() + } + + let testUsingTheSingletonBlockingPoolWhenDisabled = CrashTest( + regex: #"Fatal error: Cannot create global singleton NIOThreadPool because the global singletons have been"# + ) { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + try? group.syncShutdownGracefully() + } + NIOSingletons.singletonsEnabledSuggestion = false + try? NIOSingletons.posixBlockingThreadPool.runIfActive(eventLoop: group.next(), {}).wait() + } + + let testDisablingSingletonsEnabledValueTwice = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton enabled suggestion has been changed after"# + ) { + NIOSingletons.singletonsEnabledSuggestion = false + NIOSingletons.singletonsEnabledSuggestion = false + } + + let testEnablingSingletonsEnabledValueTwice = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton enabled suggestion has been changed after"# + ) { + NIOSingletons.singletonsEnabledSuggestion = true + NIOSingletons.singletonsEnabledSuggestion = true + } + + let testEnablingThenDisablingSingletonsEnabledValue = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton enabled suggestion has been changed after"# + ) { + NIOSingletons.singletonsEnabledSuggestion = true + NIOSingletons.singletonsEnabledSuggestion = false + } + + let testSettingTheSingletonEnabledValueAfterUse = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton enabled suggestion has been changed after"# + ) { + try? MultiThreadedEventLoopGroup.singleton.next().submit({}).wait() + NIOSingletons.singletonsEnabledSuggestion = true + } + + let testSettingTheSuggestedSingletonGroupCountTwice = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton suggested loop/thread count has been changed after"# + ) { + NIOSingletons.groupLoopCountSuggestion = 17 + NIOSingletons.groupLoopCountSuggestion = 17 + } + + let testSettingTheSuggestedSingletonGroupChangeAfterUse = CrashTest( + regex: #"Fatal error: Bug in user code: Global singleton suggested loop/thread count has been changed after"# + ) { + try? MultiThreadedEventLoopGroup.singleton.next().submit({}).wait() + NIOSingletons.groupLoopCountSuggestion = 17 + } + + let testSettingTheSuggestedSingletonGroupLoopCountToZero = CrashTest( + regex: #"Precondition failed: illegal value: needs to be strictly positive"# + ) { + NIOSingletons.groupLoopCountSuggestion = 0 + } + + let testSettingTheSuggestedSingletonGroupLoopCountToANegativeValue = CrashTest( + regex: #"Precondition failed: illegal value: needs to be strictly positive"# + ) { + NIOSingletons.groupLoopCountSuggestion = -1 + } } #endif diff --git a/Sources/NIOHTTP1Server/main.swift b/Sources/NIOHTTP1Server/main.swift index 303d08025d..5b98a3e52c 100644 --- a/Sources/NIOHTTP1Server/main.swift +++ b/Sources/NIOHTTP1Server/main.swift @@ -514,18 +514,14 @@ default: bindTarget = BindTo.ip(host: defaultHost, port: defaultPort) } -let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) -let threadPool = NIOThreadPool(numberOfThreads: 6) -threadPool.start() - func childChannelInitializer(channel: Channel) -> EventLoopFuture { return channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { channel.pipeline.addHandler(HTTPHandler(fileIO: fileIO, htdocsPath: htdocs)) } } -let fileIO = NonBlockingFileIO(threadPool: threadPool) -let socketBootstrap = ServerBootstrap(group: group) +let fileIO = NonBlockingFileIO(threadPool: .singleton) +let socketBootstrap = ServerBootstrap(group: MultiThreadedEventLoopGroup.singleton) // Specify backlog and enable SO_REUSEADDR for the server itself .serverChannelOption(ChannelOptions.backlog, value: 256) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) @@ -537,18 +533,12 @@ let socketBootstrap = ServerBootstrap(group: group) .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1) .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure) -let pipeBootstrap = NIOPipeBootstrap(group: group) +let pipeBootstrap = NIOPipeBootstrap(group: MultiThreadedEventLoopGroup.singleton) // Set the handlers that are applied to the accepted Channels .channelInitializer(childChannelInitializer(channel:)) .channelOption(ChannelOptions.maxMessagesPerRead, value: 1) .channelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure) - -defer { - try! group.syncShutdownGracefully() - try! threadPool.syncShutdownGracefully() -} - print("htdocs = \(htdocs)") let channel = try { () -> Channel in diff --git a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift index 80cd665f93..f9214a5f91 100644 --- a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift +++ b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift @@ -71,7 +71,9 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { private let index = ManagedAtomic(0) private var eventLoops: [SelectableEventLoop] private let shutdownLock: NIOLock = NIOLock() + private let threadNamePrefix: String private var runState: RunState = .running + private let canBeShutDown: Bool private static func runTheLoop(thread: NIOThread, parentGroup: MultiThreadedEventLoopGroup? /* nil iff thread take-over */, @@ -138,14 +140,54 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { /// - arguments: /// - numberOfThreads: The number of `Threads` to use. public convenience init(numberOfThreads: Int) { - self.init(numberOfThreads: numberOfThreads, selectorFactory: NIOPosix.Selector.init) + self.init(numberOfThreads: numberOfThreads, + canBeShutDown: true, + selectorFactory: NIOPosix.Selector.init) + } + + /// Create a ``MultiThreadedEventLoopGroup`` that cannot be shut down and must not be `deinit`ed. + /// + /// This is only useful for global singletons. + public static func _makePerpetualGroup(threadNamePrefix: String, + numberOfThreads: Int) -> MultiThreadedEventLoopGroup { + return self.init(numberOfThreads: numberOfThreads, + canBeShutDown: false, + threadNamePrefix: threadNamePrefix, + selectorFactory: NIOPosix.Selector.init) + } + + internal convenience init(numberOfThreads: Int, + selectorFactory: @escaping () throws -> NIOPosix.Selector) { + precondition(numberOfThreads > 0, "numberOfThreads must be positive") + let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads) + self.init(threadInitializers: initializers, canBeShutDown: true, selectorFactory: selectorFactory) } internal convenience init(numberOfThreads: Int, + canBeShutDown: Bool, + threadNamePrefix: String, selectorFactory: @escaping () throws -> NIOPosix.Selector) { precondition(numberOfThreads > 0, "numberOfThreads must be positive") let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads) - self.init(threadInitializers: initializers, selectorFactory: selectorFactory) + self.init(threadInitializers: initializers, + canBeShutDown: canBeShutDown, + threadNamePrefix: threadNamePrefix, + selectorFactory: selectorFactory) + } + + internal convenience init(numberOfThreads: Int, + canBeShutDown: Bool, + selectorFactory: @escaping () throws -> NIOPosix.Selector) { + precondition(numberOfThreads > 0, "numberOfThreads must be positive") + let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads) + self.init(threadInitializers: initializers, + canBeShutDown: canBeShutDown, + selectorFactory: selectorFactory) + } + + internal convenience init(threadInitializers: [ThreadInitializer], + selectorFactory: @escaping () throws -> NIOPosix.Selector = NIOPosix.Selector.init) { + self.init(threadInitializers: threadInitializers, canBeShutDown: true, selectorFactory: selectorFactory) } /// Creates a `MultiThreadedEventLoopGroup` instance which uses the given `ThreadInitializer`s. One `NIOThread` per `ThreadInitializer` is created and used. @@ -153,14 +195,18 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { /// - arguments: /// - threadInitializers: The `ThreadInitializer`s to use. internal init(threadInitializers: [ThreadInitializer], + canBeShutDown: Bool, + threadNamePrefix: String = "NIO-ELT-", selectorFactory: @escaping () throws -> NIOPosix.Selector = NIOPosix.Selector.init) { + self.threadNamePrefix = threadNamePrefix let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed) self.myGroupID = myGroupID var idx = 0 + self.canBeShutDown = canBeShutDown self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`. self.eventLoops = threadInitializers.map { initializer in // Maximum name length on linux is 16 by default. - let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "NIO-ELT-\(myGroupID)-#\(idx)", + let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "\(threadNamePrefix)\(myGroupID)-#\(idx)", parentGroup: self, selectorFactory: selectorFactory, initializer: initializer) @@ -169,6 +215,10 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { } } + deinit { + assert(self.canBeShutDown, "Perpetual MTELG shut down, you must ensure that perpetual MTELGs don't deinit") + } + /// Returns the `EventLoop` for the calling thread. /// /// - returns: The current `EventLoop` for the calling thread or `nil` if none is assigned to the thread. @@ -244,6 +294,12 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { #endif private func _shutdownGracefully(queue: DispatchQueue, _ handler: @escaping ShutdownGracefullyCallback) { + guard self.canBeShutDown else { + queue.async { + handler(EventLoopError.unsupportedOperation) + } + return + } // This method cannot perform its final cleanup using EventLoopFutures, because it requires that all // our event loops still be alive, and they may not be. Instead, we use Dispatch to manage // our shutdown signaling, and then do our cleanup once the DispatchQueue is empty. @@ -362,7 +418,7 @@ extension MultiThreadedEventLoopGroup: @unchecked Sendable {} extension MultiThreadedEventLoopGroup: CustomStringConvertible { public var description: String { - return "MultiThreadedEventLoopGroup { threadPattern = NIO-ELT-\(self.myGroupID)-#* }" + return "MultiThreadedEventLoopGroup { threadPattern = \(self.threadNamePrefix)\(self.myGroupID)-#* }" } } diff --git a/Sources/NIOPosix/NIOThreadPool.swift b/Sources/NIOPosix/NIOThreadPool.swift index 6928284c65..a326709e0e 100644 --- a/Sources/NIOPosix/NIOThreadPool.swift +++ b/Sources/NIOPosix/NIOThreadPool.swift @@ -21,6 +21,9 @@ public enum NIOThreadPoolError { /// The `NIOThreadPool` was not active. public struct ThreadPoolInactive: Error { } + + /// The `NIOThreadPool` operation is unsupported (e.g. shutdown of a perpetual pool). + public struct UnsupportedOperation: Error { } } @@ -69,6 +72,7 @@ public final class NIOThreadPool { private var threads: [NIOThread]? = nil // protected by `lock` private var state: State = .stopped private let numberOfThreads: Int + private let canBeStopped: Bool #if swift(>=5.7) /// Gracefully shutdown this `NIOThreadPool`. All tasks will be run before shutdown will take place. @@ -92,6 +96,12 @@ public final class NIOThreadPool { #endif private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + guard self.canBeStopped else { + queue.async { + callback(NIOThreadPoolError.UnsupportedOperation()) + } + return + } let g = DispatchGroup() let threadsToJoin = self.lock.withLock { () -> [NIOThread] in switch self.state { @@ -167,10 +177,25 @@ public final class NIOThreadPool { /// /// - parameters: /// - numberOfThreads: The number of threads to use for the thread pool. - public init(numberOfThreads: Int) { + public convenience init(numberOfThreads: Int) { + self.init(numberOfThreads: numberOfThreads, canBeStopped: true) + } + + /// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed. + /// + /// This is only useful for global singletons. + public static func _makePerpetualStartedPool(numberOfThreads: Int, threadNamePrefix: String) -> NIOThreadPool { + let pool = self.init(numberOfThreads: numberOfThreads, canBeStopped: false) + pool._start(threadNamePrefix: threadNamePrefix) + return pool + } + + private init(numberOfThreads: Int, canBeStopped: Bool) { self.numberOfThreads = numberOfThreads + self.canBeStopped = canBeStopped } + private func process(identifier: Int) { var item: WorkItem? = nil repeat { @@ -200,6 +225,10 @@ public final class NIOThreadPool { /// Start the `NIOThreadPool` if not already started. public func start() { + self._start(threadNamePrefix: "TP-#") + } + + public func _start(threadNamePrefix: String) { let alreadyRunning: Bool = self.lock.withLock { switch self.state { case .running(_): @@ -228,7 +257,7 @@ public final class NIOThreadPool { for id in 0..