
Commit 7ad076c

Introduce concurrency limiter abstraction, a trimmed-down version of a token bucket. Add tests.
1 parent 17d4826 commit 7ad076c

6 files changed  (+307 -83 lines)
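The change keeps the queue's public, future-returning surface while moving admission control into the new limiter. A minimal caller-side sketch of the post-change API, assuming LLBFuture and LLBFuturesDispatchGroup are the NIO EventLoopFuture and EventLoopGroup aliases used by the TSFFutures module; the queue label, QoS, and workload below are illustrative only, not taken from the commit.

import NIO
import TSFFutures

let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)

// At most 4 bodies run at once: the limiter hands one "unit" to each body
// and gets it back when the returned future completes.
let queue = LLBBatchingFutureOperationQueue(
    name: "file-io",
    group: group,
    maxConcurrentOperationCount: 4,
    dispatchQoS: .userInitiated
)

let futures: [LLBFuture<String>] = (0..<10).map { i in
    queue.execute { "result-\(i)" }  // blocking work would go here
}

let results = try LLBFuture<String>.whenAllSucceed(futures, on: group.next()).wait()
print(results.count)  // 10

try group.syncShutdownGracefully()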

Package.resolved  (-5)

@@ -15,13 +15,8 @@
         "repositoryURL": "https://github.com/apple/swift-nio.git",
         "state": {
           "branch": null,
-          <<<<<<< HEAD
           "revision": "ece5057615d1bee848341eceafdf04ca54d60177",
           "version": "2.41.0"
-          =======
-          "revision": "124119f0bb12384cef35aa041d7c3a686108722d",
-          "version": "2.40.0"
-          >>>>>>> 66928b7 (To support linux better, replace operation queue with dispatch queue in BatchingFutureOperationQueue)
         }
       },
       {

Package.swift  (-4)

@@ -12,11 +12,7 @@ let package = Package(
             targets: ["TSFCAS", "TSFCASFileTree", "TSFCASUtilities"]),
     ],
     dependencies: [
-        <<<<<<< HEAD
         .package(url: "https://github.com/apple/swift-nio.git", from: "2.32.0"),
-        =======
-        .package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"),
-        >>>>>>> d4b5e97 (Remove OperationQueue implementation altogether and use equivalent dispatchQoS)
        .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.8.0"),
        .package(url: "https://github.com/apple/swift-tools-support-core.git", from: "0.2.3"),
    ],

Sources/TSFFutures/BatchingFutureOperationQueue.swift  (+51 -70)

@@ -8,20 +8,18 @@
 
 import Dispatch
 import Foundation
-import NIOConcurrencyHelpers
 import NIO
-
+import NIOConcurrencyHelpers
 import TSCUtility
 
-
 /// Run the given computations on a given array in batches, exercising
 /// a specified amount of parallelism.
 ///
 /// - Discussion:
 ///     For some blocking operations (such as file system accesses) executing
 ///     them on the NIO loops is very expensive since it blocks the event
 ///     processing machinery. Here we use extra threads for such operations.
-public class LLBBatchingFutureOperationQueue {
+public struct LLBBatchingFutureOperationQueue {
 
     /// Threads capable of running futures.
     public let group: LLBFuturesDispatchGroup
@@ -35,28 +33,33 @@ public class LLBBatchingFutureOperationQueue
 
     /// Maximum number of operations executed concurrently.
     public var maxOpCount: Int {
-        get { lock.withLock { maxOpCount_ } }
-        set { scheduleMoreTasks { maxOpCount_ = newValue } }
+        get { concurrencyLimiter.maximumConcurrency }
+        set { concurrencyLimiter.maximumConcurrency = Self.bridged(maxOpCount: newValue) }
     }
 
     /// Return the number of operations currently queued.
-    public var opCount: Int { lock.withLock { opCount_ } }
+    public var opCount: Int { concurrencyLimiter.sharesInUse }
+
+    /// Name to be used for dispatch queue
+    private let name: String
 
-    /// Queue of outstanding operations
-    private let dispatchQueue: DispatchQueue
+    /// QoS passed to DispatchQueue
+    private let qos: DispatchQoS
 
     /// Lock protecting state.
     private let lock = NIOConcurrencyHelpers.Lock()
 
-    private var maxOpCount_: Int
-
-    private var opCount_: Int
+    /// Limits number of concurrent operations being executed
+    private let concurrencyLimiter: ConcurrencyLimiter
 
     /// The queue of operations to run.
     private var workQueue = NIO.CircularBuffer<DispatchWorkItem>()
 
     @available(*, deprecated, message: "'qualityOfService' is deprecated: Use 'dispatchQoS'")
-    public convenience init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int, qualityOfService: QualityOfService) {
+    public init(
+        name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int,
+        qualityOfService: QualityOfService
+    ) {
         let dispatchQoS: DispatchQoS
 
         switch qualityOfService {
@@ -81,94 +84,72 @@ public class LLBBatchingFutureOperationQueue
     ///    - group: Threads capable of running futures.
     ///    - maxConcurrentOperationCount:
     ///         Operations to execute in parallel.
-    public convenience init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int) {
+    public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int) {
         self.init(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: .default)
     }
 
-    public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCnt: Int, dispatchQoS: DispatchQoS) {
+    public init(
+        name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCnt: Int,
+        dispatchQoS: DispatchQoS
+    ) {
         self.group = group
-        self.dispatchQueue = DispatchQueue(label: name, qos: dispatchQoS, attributes: .concurrent)
-        self.opCount_ = 0
-        self.maxOpCount_ = maxOpCnt
+        self.name = name
+        self.qos = dispatchQoS
+
+        self.concurrencyLimiter = ConcurrencyLimiter(maximumConcurrency: Self.bridged(maxOpCount: maxOpCnt))
     }
 
     public func execute<T>(_ body: @escaping () throws -> T) -> LLBFuture<T> {
-        let promise = group.any().makePromise(of: T.self)
+        return self.concurrencyLimiter.withReplenishableLimit(eventLoop: group.any()) { eventLoop in
+            let promise = eventLoop.makePromise(of: T.self)
 
-        let workItem = DispatchWorkItem {
-            promise.fulfill(body)
-            self.scheduleMoreTasks {
-                self.opCount_ -= 1
+            DispatchQueue(label: self.name, qos: self.qos).async {
+                promise.fulfill(body)
             }
-        }
 
-        self.scheduleMoreTasks {
-            workQueue.append(workItem)
+            return promise.futureResult
         }
-
-        return promise.futureResult
     }
 
     public func execute<T>(_ body: @escaping () -> LLBFuture<T>) -> LLBFuture<T> {
-        let promise = group.any().makePromise(of: T.self)
+        return self.concurrencyLimiter.withReplenishableLimit(eventLoop: group.any()) { eventLoop in
+            let promise = eventLoop.makePromise(of: T.self)
 
-        let workItem = DispatchWorkItem {
-            let f = body()
-            f.cascade(to: promise)
-
-            _ = try? f.wait()
-
-            self.scheduleMoreTasks {
-                self.opCount_ -= 1
+            DispatchQueue(label: self.name, qos: self.qos).async {
+                body().cascade(to: promise)
             }
-        }
 
-        self.scheduleMoreTasks {
-            workQueue.append(workItem)
+            return promise.futureResult
         }
-
-        return promise.futureResult
     }
 
     /// Order-preserving parallel execution. Wait for everything to complete.
     @inlinable
-    public func execute<A,T>(_ args: [A], minStride: Int = 1, _ body: @escaping (ArraySlice<A>) throws -> [T]) -> LLBFuture<[T]> {
+    public func execute<A, T>(_ args: [A], minStride: Int = 1, _ body: @escaping (ArraySlice<A>) throws -> [T])
+        -> LLBFuture<[T]>
+    {
         let futures: [LLBFuture<[T]>] = executeNoWait(args, minStride: minStride, body)
         let loop = futures.first?.eventLoop ?? group.next()
-        return LLBFuture<[T]>.whenAllSucceed(futures, on: loop).map{$0.flatMap{$0}}
+        return LLBFuture<[T]>.whenAllSucceed(futures, on: loop).map { $0.flatMap { $0 } }
     }
 
     /// Order-preserving parallel execution.
     /// Do not wait for all executions to complete, returning individual futures.
     @inlinable
-    public func executeNoWait<A,T>(_ args: [A], minStride: Int = 1, maxStride: Int = Int.max, _ body: @escaping (ArraySlice<A>) throws -> [T]) -> [LLBFuture<[T]>] {
-        let batches: [ArraySlice<A>] = args.tsc_sliceBy(maxStride: max(minStride, min(maxStride, args.count / maxOpCount)))
-        return batches.map{arg in execute{try body(arg)}}
+    public func executeNoWait<A, T>(
+        _ args: [A], minStride: Int = 1, maxStride: Int = Int.max, _ body: @escaping (ArraySlice<A>) throws -> [T]
+    ) -> [LLBFuture<[T]>] {
+        let batches: [ArraySlice<A>] = args.tsc_sliceBy(
+            maxStride: max(minStride, min(maxStride, args.count / maxOpCount)))
+        return batches.map { arg in execute { try body(arg) } }
     }
 
-    private func scheduleMoreTasks(performUnderLock: () -> Void) {
-        let toExecute: [DispatchWorkItem] = lock.withLock {
-            performUnderLock()
-
-            var scheduleItems: [DispatchWorkItem] = []
-
-            while opCount_ < maxOpCount_ {
-
-                // Schedule a new operation, if available.
-                guard let op = workQueue.popFirst() else {
-                    break
-                }
-
-                self.opCount_ += 1
-                scheduleItems.append(op)
-            }
-
-            return scheduleItems
-        }
-
-        for workItem in toExecute {
-            dispatchQueue.async(execute: workItem)
+    private static func bridged(maxOpCount: Int) -> Int {
+        switch maxOpCount {
+        case OperationQueue.defaultMaxConcurrentOperationCount:
+            return System.coreCount
+        default:
+            return maxOpCount
         }
     }
-
 }
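The new bridged(maxOpCount:) helper keeps the old OperationQueue convention in which defaultMaxConcurrentOperationCount (the -1 sentinel) means "use as much parallelism as the hardware offers", mapping it to NIO's System.coreCount. The commit message mentions added tests that are not shown in this excerpt; a hypothetical test for this bridging could look like the sketch below (the test class name and queue label are invented for illustration, and TSFFutures is assumed to be the module name).

import Foundation
import NIO
import TSFFutures
import XCTest

final class MaxOpCountBridgingTests: XCTestCase {
    func testDefaultMaxConcurrentOperationCountBridgesToCoreCount() {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        defer { try? group.syncShutdownGracefully() }

        var queue = LLBBatchingFutureOperationQueue(
            name: "bridging-test",
            group: group,
            maxConcurrentOperationCount: OperationQueue.defaultMaxConcurrentOperationCount,
            dispatchQoS: .default
        )

        // The legacy "unlimited" sentinel is mapped to the machine's core count.
        XCTAssertEqual(queue.maxOpCount, System.coreCount)

        // Explicit positive values pass through unchanged.
        queue.maxOpCount = 3
        XCTAssertEqual(queue.maxOpCount, 3)
    }
}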
New file  (+111)

@@ -0,0 +1,111 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift.org open source project
+//
+// Copyright (c) 2022 Apple Inc. and the Swift project authors
+// Licensed under Apache License v2.0 with Runtime Library Exception
+//
+// See https://swift.org/LICENSE.txt for license information
+// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
+//
+//===----------------------------------------------------------------------===//
+
+import NIO
+import NIOConcurrencyHelpers
+
+internal final class ConcurrencyLimiter {
+    public var maximumConcurrency: Int {
+        get { lock.withLock { maximumConcurrency_ } }
+        set {
+            var waiters: [Waiter] = []
+            lock.withLockVoid {
+                let differenceInCapacity = (newValue - maximumConcurrency_)
+                unitsLeft += differenceInCapacity
+                maximumConcurrency_ = newValue
+                waiters = tryFulfillSomeLocked()
+            }
+            waiters.fulfillWaiters()
+        }
+    }
+    public var sharesInUse: Int { lock.withLock { maximumConcurrency_ - unitsLeft } }
+
+    private var unitsLeft: Int // protected by `self.lock`
+    private var waiters: CircularBuffer<Waiter> = [] // protected by `self.lock`
+    private let lock = Lock()
+    private var maximumConcurrency_: Int // protected by `self.lock`
+
+    public init(maximumConcurrency: Int) {
+        precondition(maximumConcurrency >= 0)
+
+        self.maximumConcurrency_ = maximumConcurrency
+        self.unitsLeft = maximumConcurrency
+    }
+
+    /// Reserves 1 unit of concurrency, executes body after which it restores the 1 unit.
+    public func withReplenishableLimit<T>(
+        eventLoop: EventLoop,
+        _ body: @escaping (EventLoop) -> EventLoopFuture<T>
+    ) -> EventLoopFuture<T> {
+        return self.withdraw(eventLoop: eventLoop).flatMap { lease in
+            body(eventLoop).always { _ in
+                self.replenish(lease)
+            }
+        }
+    }
+
+    private func tryFulfillSomeLocked() -> [Waiter] {
+        var toSucceed: [Waiter] = []
+        let unitsLeftAtStart = self.unitsLeft
+
+        while !self.waiters.isEmpty, self.unitsLeft >= 1 {
+            let waiter = self.waiters.removeFirst()
+
+            self.unitsLeft -= 1
+            assert(self.unitsLeft >= 0)
+            toSucceed.append(waiter)
+        }
+
+        assert(unitsLeftAtStart - toSucceed.count == self.unitsLeft)
+        return toSucceed
+    }
+
+    private func replenish(_ lease: Lease) {
+        self.lock.withLock { () -> [Waiter] in
+            self.unitsLeft += 1
+            assert(self.unitsLeft <= self.maximumConcurrency_)
+            return self.tryFulfillSomeLocked()
+        }.fulfillWaiters()
+    }
+
+    /// Reserve 1 unit of the limit if available
+    private func withdraw(eventLoop: EventLoop) -> EventLoopFuture<Lease> {
+        let future = self.lock.withLock { () -> EventLoopFuture<Lease> in
+            if self.waiters.isEmpty && self.unitsLeft >= 1 {
+                self.unitsLeft -= 1
+
+                return eventLoop.makeSucceededFuture(Lease())
+            }
+
+            let promise = eventLoop.makePromise(of: Lease.self)
+            self.waiters.append(Waiter(promise: promise))
+
+            return promise.futureResult
+        }
+
+        return future
+    }
+
+    fileprivate struct Waiter {
+        var promise: EventLoopPromise<Lease>
+    }
+
+    fileprivate struct Lease {}
+}
+
+extension Array where Element == ConcurrencyLimiter.Waiter {
+    fileprivate func fulfillWaiters() {
+        self.forEach { waiter in
+            return waiter.promise.succeed(.init())
+        }
+    }
+}
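For orientation, a sketch of how the limiter composes, assuming code that lives in the same module since the type is internal (or a test target using @testable import): each withReplenishableLimit call withdraws one unit before invoking body and replenishes it when the returned future completes, so at most maximumConcurrency bodies are in flight and the rest wait in FIFO order. The event-loop setup and workload below are illustrative only.

import NIO

let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
let limiter = ConcurrencyLimiter(maximumConcurrency: 4)

let futures: [EventLoopFuture<Int>] = (0..<100).map { i in
    limiter.withReplenishableLimit(eventLoop: group.next()) { eventLoop in
        // At most 4 of these bodies have an unfinished future at once; the
        // rest queue up as Waiters and are fulfilled in FIFO order.
        eventLoop.makeSucceededFuture(i * 2)
    }
}

let total = try EventLoopFuture<Int>.whenAllSucceed(futures, on: group.next())
    .map { $0.reduce(0, +) }
    .wait()
print(total)  // 9900

try group.syncShutdownGracefully()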
