Skip to content

Commit 3e16d1f

Browse files
committed
Remove OperationQueue implementation altogether and use equivalent dispatchQoS
1 parent 66928b7 commit 3e16d1f

4 files changed

+135
-306
lines changed

Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ let package = Package(
1212
targets: ["TSFCAS", "TSFCASFileTree", "TSFCASUtilities"]),
1313
],
1414
dependencies: [
15-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.8.0"),
15+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.40.0"),
1616
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.8.0"),
1717
.package(url: "https://github.com/apple/swift-tools-support-core.git", from: "0.2.3"),
1818
],

Sources/TSFFutures/BatchingFutureDispatchQueue.swift

-151
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
//===----------------------------------------------------------------------===//
2-
//
31
// This source file is part of the Swift.org open source project
42
//
5-
// Copyright (c) 2022 Apple Inc. and the Swift project authors
3+
// Copyright (c) 2020 Apple Inc. and the Swift project authors
64
// Licensed under Apache License v2.0 with Runtime Library Exception
75
//
8-
// See https://swift.org/LICENSE.txt for license information
9-
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10-
//
11-
//===----------------------------------------------------------------------===//
6+
// See http://swift.org/LICENSE.txt for license information
7+
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
128

139
import Dispatch
1410
import Foundation
11+
import NIOConcurrencyHelpers
12+
import NIO
13+
14+
import TSCUtility
15+
1516

1617
/// Run the given computations on a given array in batches, exercising
1718
/// a specified amount of parallelism.
@@ -21,70 +22,153 @@ import Foundation
2122
/// them on the NIO loops is very expensive since it blocks the event
2223
/// processing machinery. Here we use extra threads for such operations.
2324
public class LLBBatchingFutureOperationQueue {
24-
// OperationQueue based implementation
25-
private var oq: LLBBatchingFutureOperationQueueDeprecated?
26-
27-
// DispatchQueue based implementation
28-
private var dq: LLBBatchingFutureDispatchQueue?
29-
30-
public var group: LLBFuturesDispatchGroup { oq?.group ?? dq!.group }
31-
25+
26+
/// Threads capable of running futures.
27+
public let group: LLBFuturesDispatchGroup
28+
29+
/// Whether the queue is suspended.
30+
@available(*, deprecated, message: "Property 'isSuspended' is deprecated.")
31+
public var isSuspended: Bool {
32+
// Cannot suspend a DispatchQueue
33+
false
34+
}
35+
3236
/// Maximum number of operations executed concurrently.
3337
public var maxOpCount: Int {
34-
get { oq?.maxOpCount ?? dq!.maxOpCount }
35-
set {
36-
if var q = oq {
37-
q.maxOpCount = newValue
38-
return
39-
}
40-
dq!.maxOpCount = newValue
41-
}
38+
get { lock.withLock { maxOpCount_ } }
39+
set { scheduleMoreTasks { maxOpCount_ = newValue } }
4240
}
43-
44-
public var opCount: Int { oq?.opCount ?? dq!.opCount }
45-
46-
@available(*, deprecated, message: "isSuspended is deprecated")
47-
public var isSuspended: Bool { oq?.isSuspended ?? dq!.isSuspended }
48-
49-
///
50-
/// - Parameters:
51-
/// - name: Unique string label, for logging.
52-
/// - group: Threads capable of running futures.
53-
/// - maxConcurrentOperationCount:
54-
/// Operations to execute in parallel.
41+
42+
/// Return the number of operations currently queued.
43+
public var opCount: Int { lock.withLock { opCount_ } }
44+
45+
/// Queue of outstanding operations
46+
private let dispatchQueue: DispatchQueue
47+
48+
/// Lock protecting state.
49+
private let lock = NIOConcurrencyHelpers.Lock()
50+
51+
private var maxOpCount_: Int
52+
53+
private var opCount_: Int
54+
55+
/// The queue of operations to run.
56+
private var workQueue = NIO.CircularBuffer<DispatchWorkItem>()
57+
5558
@available(*, deprecated, message: "'qualityOfService' is deprecated: Use 'dispatchQoS'")
56-
public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int, qualityOfService: QualityOfService = .default) {
57-
self.oq = LLBBatchingFutureOperationQueueDeprecated(name: name, group: group, maxConcurrentOperationCount: maxOpCount, qualityOfService: qualityOfService)
58-
self.dq = nil
59+
public convenience init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int, qualityOfService: QualityOfService) {
60+
let dispatchQoS: DispatchQoS
61+
62+
switch qualityOfService {
63+
case .userInteractive:
64+
dispatchQoS = .userInteractive
65+
case .userInitiated:
66+
dispatchQoS = .userInitiated
67+
case .utility:
68+
dispatchQoS = .utility
69+
case .background:
70+
dispatchQoS = .background
71+
default:
72+
dispatchQoS = .default
73+
}
74+
75+
self.init(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: dispatchQoS)
5976
}
60-
77+
6178
///
6279
/// - Parameters:
6380
/// - name: Unique string label, for logging.
6481
/// - group: Threads capable of running futures.
6582
/// - maxConcurrentOperationCount:
6683
/// Operations to execute in parallel.
67-
public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int, dispatchQoS: DispatchQoS) {
68-
self.dq = LLBBatchingFutureDispatchQueue(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: dispatchQoS)
69-
self.oq = nil
84+
public convenience init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCount: Int) {
85+
self.init(name: name, group: group, maxConcurrentOperationCount: maxOpCount, dispatchQoS: .default)
7086
}
71-
87+
88+
public init(name: String, group: LLBFuturesDispatchGroup, maxConcurrentOperationCount maxOpCnt: Int, dispatchQoS: DispatchQoS) {
89+
self.group = group
90+
self.dispatchQueue = DispatchQueue(label: name, qos: dispatchQoS, attributes: .concurrent)
91+
self.opCount_ = 0
92+
self.maxOpCount_ = maxOpCnt
93+
}
94+
7295
public func execute<T>(_ body: @escaping () throws -> T) -> LLBFuture<T> {
73-
return oq?.execute(body) ?? dq!.execute(body)
96+
let promise = group.any().makePromise(of: T.self)
97+
98+
let workItem = DispatchWorkItem {
99+
promise.fulfill(body)
100+
self.scheduleMoreTasks {
101+
self.opCount_ -= 1
102+
}
103+
}
104+
105+
self.scheduleMoreTasks {
106+
workQueue.append(workItem)
107+
}
108+
109+
return promise.futureResult
74110
}
75-
111+
76112
public func execute<T>(_ body: @escaping () -> LLBFuture<T>) -> LLBFuture<T> {
77-
return oq?.execute(body) ?? dq!.execute(body)
113+
let promise = group.any().makePromise(of: T.self)
114+
115+
let workItem = DispatchWorkItem {
116+
let f = body()
117+
f.cascade(to: promise)
118+
119+
_ = try? f.wait()
120+
121+
self.scheduleMoreTasks {
122+
self.opCount_ -= 1
123+
}
124+
}
125+
126+
self.scheduleMoreTasks {
127+
workQueue.append(workItem)
128+
}
129+
130+
return promise.futureResult
78131
}
79-
132+
80133
/// Order-preserving parallel execution. Wait for everything to complete.
134+
@inlinable
81135
public func execute<A,T>(_ args: [A], minStride: Int = 1, _ body: @escaping (ArraySlice<A>) throws -> [T]) -> LLBFuture<[T]> {
82-
return oq?.execute(args, minStride: minStride, body) ?? dq!.execute(args, minStride: minStride, body)
136+
let futures: [LLBFuture<[T]>] = executeNoWait(args, minStride: minStride, body)
137+
let loop = futures.first?.eventLoop ?? group.next()
138+
return LLBFuture<[T]>.whenAllSucceed(futures, on: loop).map{$0.flatMap{$0}}
83139
}
84-
140+
85141
/// Order-preserving parallel execution.
86142
/// Do not wait for all executions to complete, returning individual futures.
143+
@inlinable
87144
public func executeNoWait<A,T>(_ args: [A], minStride: Int = 1, maxStride: Int = Int.max, _ body: @escaping (ArraySlice<A>) throws -> [T]) -> [LLBFuture<[T]>] {
88-
return oq?.executeNoWait(args, minStride: minStride, maxStride: maxStride, body) ?? dq!.executeNoWait(args, minStride: minStride, maxStride: maxStride, body)
145+
let batches: [ArraySlice<A>] = args.tsc_sliceBy(maxStride: max(minStride, min(maxStride, args.count / maxOpCount)))
146+
return batches.map{arg in execute{try body(arg)}}
147+
}
148+
149+
private func scheduleMoreTasks(performUnderLock: () -> Void) {
150+
let toExecute: [DispatchWorkItem] = lock.withLock {
151+
performUnderLock()
152+
153+
var scheduleItems: [DispatchWorkItem] = []
154+
155+
while opCount_ < maxOpCount_ {
156+
157+
// Schedule a new operation, if available.
158+
guard let op = workQueue.popFirst() else {
159+
break
160+
}
161+
162+
self.opCount_ += 1
163+
scheduleItems.append(op)
164+
}
165+
166+
return scheduleItems
167+
}
168+
169+
for workItem in toExecute {
170+
dispatchQueue.async(execute: workItem)
171+
}
89172
}
173+
90174
}

0 commit comments

Comments
 (0)