Skip to content

Commit

Permalink
NIOSingletons: Use NIO in easy mode
Browse files Browse the repository at this point in the history
  • Loading branch information
weissi committed Jul 21, 2023
1 parent e5416f4 commit 592f509
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 19 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,9 @@ let package = Package(
name: "NIOTests",
dependencies: ["NIO"]
),
.testTarget(
name: "NIOSingletonsTests",
dependencies: ["NIOCore", "NIOPosix"]
),
]
)
133 changes: 133 additions & 0 deletions Sources/NIOCore/GlobalSingletons.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
#if canImport(Darwin)
import Darwin
#elseif os(Windows)
import ucrt
import WinSDK
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported C library")
#endif

/// SwiftNIO provided singleton resources for programs & libraries that don't need full control over all operating
/// system resources. This type holds sizing (how many loops/threads) suggestions.
public enum NIOGlobalSingletons {
}

extension NIOGlobalSingletons {
/// A suggestion of how many ``EventLoop``s the global singleton ``EventLoopGroup``s are supposed to consist of.
///
/// The thread count is ``System/coreCount`` unless the environment variable `NIO_SINGLETON_GROUP_LOOP_COUNT`
/// is set or this value was set manually by the user.
///
/// - note: You must not set this value more than once.
public static var suggestedGlobalSingletonGroupLoopCount: Int {
set {
Self.userSetSingletonThreadCount(rawStorage: globalRawSuggestedLoopCount, userValue: newValue)
}

get {
return Self.getTrustworthyThreadCount(rawStorage: globalRawSuggestedLoopCount,
environmentVariable: "NIO_SINGLETON_GROUP_LOOP_COUNT")
}
}

/// A suggestion of how many threads the global singleton thread pools that can be used for synchronous, blocking
/// functions (such as ``NIOThreadPool``) are supposed to consist of
///
/// The thread count is ``System/coreCount`` unless the environment variable
/// `NIO_SINGLETON_BLOCKING_POOL_THREAD_COUNT` is set or this value was set manually by the user.
///
/// - note: You must not set this value more than once.
public static var suggestedBlockingPoolThreadCount: Int {
set {
Self.userSetSingletonThreadCount(rawStorage: globalRawSuggestedBlockingThreadCount, userValue: newValue)
}

get {
return Self.getTrustworthyThreadCount(rawStorage: globalRawSuggestedBlockingThreadCount,
environmentVariable: "NIO_SINGLETON_BLOCKING_POOL_THREAD_COUNT")
}
}
}

// DO NOT TOUCH THESE DIRECTLY, use `userSetSingletonThreadCount` and `getTrustworthyThreadCount`.
private let globalRawSuggestedLoopCount = ManagedAtomic(0)
private let globalRawSuggestedBlockingThreadCount = ManagedAtomic(0)

extension NIOGlobalSingletons {
private static func userSetSingletonThreadCount(rawStorage: ManagedAtomic<Int>, userValue: Int) {
precondition(userValue > 0, "illegal value: needs to be strictly positive")

// The user is trying to set it. We can only do this if the value is at 0 and we will set the
// negative value. So if the user wants `5`, we will set `-5`. Once it's used (set getter), it'll be upped
// to 5.
let (exchanged, _) = rawStorage.compareExchange(expected: 0, desired: -userValue, ordering: .relaxed)
guard exchanged else {
fatalError("singleton suggested loop/thread count has been set more than once by user")
}
}

private static func validateTrustedThreadCount(_ threadCount: Int) {
assert(threadCount > 0,
"BUG IN NIO, please report: negative suggested loop/thread count: \(threadCount)")
assert(threadCount <= 1024,
"BUG IN NIO, please report: overly big suggested loop/thread count: \(threadCount)")
}

private static func getTrustworthyThreadCount(rawStorage: ManagedAtomic<Int>, environmentVariable: String) -> Int {
let returnedValueUnchecked: Int

let rawSuggestion = rawStorage.load(ordering: .relaxed)
switch rawSuggestion {
case 0: // == 0
// Not set by user, not yet finalised, let's try to get it from the env var and fall back to
// `System.coreCount`.
let envVarString = getenv(environmentVariable).map { String(cString: $0) }
returnedValueUnchecked = envVarString.flatMap(Int.init) ?? System.coreCount
case .min ..< 0: // < 0
// Untrusted and unchecked user value. Let's invert and then sanitise/check.
returnedValueUnchecked = -rawSuggestion
case 1 ... .max: // > 0
// Trustworthy value that has been evaluated and sanitised before.
let returnValue = rawSuggestion
Self.validateTrustedThreadCount(returnValue)
return returnValue
default:
// Unreachable
preconditionFailure()
}

// Can't have fewer than 1, don't want more than 1024.
let returnValue = max(1, min(1024, returnedValueUnchecked))
Self.validateTrustedThreadCount(returnValue)

// Store it for next time.
let (exchanged, _) = rawStorage.compareExchange(expected: rawSuggestion,
desired: returnValue,
ordering: .relaxed)
if !exchanged {
// We lost the race, this must mean it has been concurrently set correctly so we can safely recurse
// and try again.
return Self.getTrustworthyThreadCount(rawStorage: rawStorage, environmentVariable: environmentVariable)
}
return returnValue
}
}
16 changes: 3 additions & 13 deletions Sources/NIOHTTP1Server/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -514,18 +514,14 @@ default:
bindTarget = BindTo.ip(host: defaultHost, port: defaultPort)
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let threadPool = NIOThreadPool(numberOfThreads: 6)
threadPool.start()

func childChannelInitializer(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
channel.pipeline.addHandler(HTTPHandler(fileIO: fileIO, htdocsPath: htdocs))
}
}

let fileIO = NonBlockingFileIO(threadPool: threadPool)
let socketBootstrap = ServerBootstrap(group: group)
let fileIO = NonBlockingFileIO(threadPool: .globalSingleton)
let socketBootstrap = ServerBootstrap(group: MultiThreadedEventLoopGroup.globalSingleton)
// Specify backlog and enable SO_REUSEADDR for the server itself
.serverChannelOption(ChannelOptions.backlog, value: 256)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
Expand All @@ -537,18 +533,12 @@ let socketBootstrap = ServerBootstrap(group: group)
.childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure)
let pipeBootstrap = NIOPipeBootstrap(group: group)
let pipeBootstrap = NIOPipeBootstrap(group: MultiThreadedEventLoopGroup.globalSingleton)
// Set the handlers that are applied to the accepted Channels
.channelInitializer(childChannelInitializer(channel:))

.channelOption(ChannelOptions.maxMessagesPerRead, value: 1)
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: allowHalfClosure)

defer {
try! group.syncShutdownGracefully()
try! threadPool.syncShutdownGracefully()
}

print("htdocs = \(htdocs)")

let channel = try { () -> Channel in
Expand Down
64 changes: 60 additions & 4 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: NIOLock = NIOLock()
private let threadNamePrefix: String
private var runState: RunState = .running
private let canBeShutDown: Bool

private static func runTheLoop(thread: NIOThread,
parentGroup: MultiThreadedEventLoopGroup? /* nil iff thread take-over */,
Expand Down Expand Up @@ -138,29 +140,73 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
/// - arguments:
/// - numberOfThreads: The number of `Threads` to use.
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, selectorFactory: NIOPosix.Selector<NIORegistration>.init)
self.init(numberOfThreads: numberOfThreads,
canBeShutDown: true,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

/// Create a ``MultiThreadedEventLoopGroup`` that cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualGroup(threadNamePrefix: String,
numberOfThreads: Int) -> MultiThreadedEventLoopGroup {
return self.init(numberOfThreads: numberOfThreads,
canBeShutDown: false,
threadNamePrefix: threadNamePrefix,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

internal convenience init(numberOfThreads: Int,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, canBeShutDown: true, selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
threadNamePrefix: String,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, selectorFactory: selectorFactory)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadNamePrefix: threadNamePrefix,
selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
selectorFactory: selectorFactory)
}

internal convenience init(threadInitializers: [ThreadInitializer],
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.init(threadInitializers: threadInitializers, canBeShutDown: true, selectorFactory: selectorFactory)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses the given `ThreadInitializer`s. One `NIOThread` per `ThreadInitializer` is created and used.
///
/// - arguments:
/// - threadInitializers: The `ThreadInitializer`s to use.
internal init(threadInitializers: [ThreadInitializer],
canBeShutDown: Bool,
threadNamePrefix: String = "NIO-ELT-",
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.threadNamePrefix = threadNamePrefix
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
self.canBeShutDown = canBeShutDown
self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`.
self.eventLoops = threadInitializers.map { initializer in
// Maximum name length on linux is 16 by default.
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "NIO-ELT-\(myGroupID)-#\(idx)",
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "\(threadNamePrefix)\(myGroupID)-#\(idx)",
parentGroup: self,
selectorFactory: selectorFactory,
initializer: initializer)
Expand All @@ -169,6 +215,10 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
}
}

deinit {
assert(self.canBeShutDown, "Perpetual MTELG shut down, you must ensure that perpetual MTELGs don't deinit")
}

/// Returns the `EventLoop` for the calling thread.
///
/// - returns: The current `EventLoop` for the calling thread or `nil` if none is assigned to the thread.
Expand Down Expand Up @@ -244,6 +294,12 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
#endif

private func _shutdownGracefully(queue: DispatchQueue, _ handler: @escaping ShutdownGracefullyCallback) {
guard self.canBeShutDown else {
queue.async {
handler(EventLoopError.unsupportedOperation)
}
return
}
// This method cannot perform its final cleanup using EventLoopFutures, because it requires that all
// our event loops still be alive, and they may not be. Instead, we use Dispatch to manage
// our shutdown signaling, and then do our cleanup once the DispatchQueue is empty.
Expand Down Expand Up @@ -362,7 +418,7 @@ extension MultiThreadedEventLoopGroup: @unchecked Sendable {}

extension MultiThreadedEventLoopGroup: CustomStringConvertible {
public var description: String {
return "MultiThreadedEventLoopGroup { threadPattern = NIO-ELT-\(self.myGroupID)-#* }"
return "MultiThreadedEventLoopGroup { threadPattern = \(self.threadNamePrefix)\(self.myGroupID)-#* }"
}
}

Expand Down
35 changes: 33 additions & 2 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public enum NIOThreadPoolError {

/// The `NIOThreadPool` was not active.
public struct ThreadPoolInactive: Error { }

/// The `NIOThreadPool` operation is unsupported (e.g. shutdown of a perpetual pool).
public struct UnsupportedOperation: Error { }
}


Expand Down Expand Up @@ -69,6 +72,7 @@ public final class NIOThreadPool {
private var threads: [NIOThread]? = nil // protected by `lock`
private var state: State = .stopped
private let numberOfThreads: Int
private let canBeStopped: Bool

#if swift(>=5.7)
/// Gracefully shutdown this `NIOThreadPool`. All tasks will be run before shutdown will take place.
Expand All @@ -92,6 +96,12 @@ public final class NIOThreadPool {
#endif

private func _shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
guard self.canBeStopped else {
queue.async {
callback(NIOThreadPoolError.UnsupportedOperation())
}
return
}
let g = DispatchGroup()
let threadsToJoin = self.lock.withLock { () -> [NIOThread] in
switch self.state {
Expand Down Expand Up @@ -167,10 +177,25 @@ public final class NIOThreadPool {
///
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public init(numberOfThreads: Int) {
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, canBeStopped: true)
}

/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualStartedPool(numberOfThreads: Int, threadNamePrefix: String) -> NIOThreadPool {
let pool = self.init(numberOfThreads: numberOfThreads, canBeStopped: false)
pool._start(threadNamePrefix: threadNamePrefix)
return pool
}

private init(numberOfThreads: Int, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
}


private func process(identifier: Int) {
var item: WorkItem? = nil
repeat {
Expand Down Expand Up @@ -200,6 +225,10 @@ public final class NIOThreadPool {

/// Start the `NIOThreadPool` if not already started.
public func start() {
self._start(threadNamePrefix: "TP-#")
}

public func _start(threadNamePrefix: String) {
let alreadyRunning: Bool = self.lock.withLock {
switch self.state {
case .running(_):
Expand Down Expand Up @@ -228,7 +257,7 @@ public final class NIOThreadPool {
for id in 0..<self.numberOfThreads {
group.enter()
// We should keep thread names under 16 characters because Linux doesn't allow more.
NIOThread.spawnAndRun(name: "TP-#\(id)", detachThread: false) { thread in
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in
self.lock.withLock {
self.threads!.append(thread)
}
Expand All @@ -243,6 +272,8 @@ public final class NIOThreadPool {
}

deinit {
assert(self.canBeStopped,
"Perpetual NIOThreadPool has been deinited, you must make sure that perpetual pools don't deinit")
switch self.state {
case .stopped, .shuttingDown:
()
Expand Down
Loading

0 comments on commit 592f509

Please sign in to comment.