diff --git a/Sources/Basics/Concurrency/ConcurrencyHelpers.swift b/Sources/Basics/Concurrency/ConcurrencyHelpers.swift index 52f25250d47..9faf32694fc 100644 --- a/Sources/Basics/Concurrency/ConcurrencyHelpers.swift +++ b/Sources/Basics/Concurrency/ConcurrencyHelpers.swift @@ -15,6 +15,7 @@ import Dispatch import class Foundation.NSLock import class Foundation.ProcessInfo import struct Foundation.URL +import struct Foundation.UUID import func TSCBasic.tsc_await public enum Concurrency { @@ -76,3 +77,203 @@ extension DispatchQueue { } } } + +/// A queue for running async operations with a limit on the number of concurrent tasks. +public final class AsyncOperationQueue: @unchecked Sendable { + + // This implementation is identical to the AsyncOperationQueue in swift-build. + // Any modifications made here should also be made there. + // https://github.com/swiftlang/swift-build/blob/main/Sources/SWBUtil/AsyncOperationQueue.swift#L13 + + fileprivate typealias ID = UUID + fileprivate typealias WaitingContinuation = CheckedContinuation + + private let concurrentTasks: Int + private var waitingTasks: [WorkTask] = [] + private let waitingTasksLock = NSLock() + + fileprivate enum WorkTask { + case creating(ID) + case waiting(ID, WaitingContinuation) + case running(ID) + case cancelled(ID) + + var id: ID { + switch self { + case .creating(let id), .waiting(let id, _), .running(let id), .cancelled(let id): + return id + } + } + + var continuation: WaitingContinuation? { + guard case .waiting(_, let continuation) = self else { + return nil + } + return continuation + } + } + + /// Creates an `AsyncOperationQueue` with a specified number of concurrent tasks. + /// - Parameter concurrentTasks: The maximum number of concurrent tasks that can be executed concurrently. + public init(concurrentTasks: Int) { + self.concurrentTasks = concurrentTasks + } + + deinit { + waitingTasksLock.withLock { + if !waitingTasks.isEmpty { + preconditionFailure("Deallocated with waiting tasks") + } + } + } + + /// Executes an asynchronous operation, ensuring that the number of concurrent tasks + // does not exceed the specified limit. + /// - Parameter operation: The asynchronous operation to execute. + /// - Returns: The result of the operation. + /// - Throws: An error thrown by the operation, or a `CancellationError` if the operation is cancelled. + public func withOperation( + _ operation: () async throws -> sending ReturnValue + ) async throws -> ReturnValue { + let taskId = try await waitIfNeeded() + defer { signalCompletion(taskId) } + return try await operation() + } + + private func waitIfNeeded() async throws -> ID { + let workTask = waitingTasksLock.withLock({ + let shouldWait = waitingTasks.count >= concurrentTasks + let workTask = shouldWait ? WorkTask.creating(ID()) : .running(ID()) + waitingTasks.append(workTask) + return workTask + }) + + // If we aren't creating a task that needs to wait, we're under the concurrency limit. + guard case .creating(let taskId) = workTask else { + return workTask.id + } + + enum TaskAction { + case start(WaitingContinuation) + case cancel(WaitingContinuation) + } + + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: WaitingContinuation) -> Void in + let action: TaskAction? = waitingTasksLock.withLock { + guard let index = waitingTasks.firstIndex(where: { $0.id == taskId }) else { + // The task may have been marked as cancelled already and then removed from + // waitingTasks in `signalCompletion`. + return .cancel(continuation) + } + + switch waitingTasks[index] { + case .cancelled: + // If the task was cancelled in between creating the task cancellation handler and acquiring the lock, + // we should resume the continuation with a `CancellationError`. + waitingTasks.remove(at: index) + return .cancel(continuation) + case .creating, .running, .waiting: + // A task may have completed since we initially checked if we should wait. Check again in this locked + // section and if we can start it, remove it from the waiting tasks and start it immediately. + if waitingTasks.count >= concurrentTasks { + waitingTasks[index] = .waiting(taskId, continuation) + return nil + } else { + waitingTasks.remove(at: index) + return .start(continuation) + } + } + } + + switch action { + case .some(.cancel(let continuation)): + continuation.resume(throwing: _Concurrency.CancellationError()) + case .some(.start(let continuation)): + continuation.resume() + case .none: + return + } + } + } onCancel: { + let continuation: WaitingContinuation? = self.waitingTasksLock.withLock { + guard let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) else { + return nil + } + + switch self.waitingTasks[taskIndex] { + case .waiting(_, let continuation): + self.waitingTasks.remove(at: taskIndex) + + // If the parent task is cancelled then we need to manually handle resuming the + // continuation for the waiting task with a `CancellationError`. Return the continuation + // here so it can be resumed once the `waitingTasksLock` is released. + return continuation + case .creating, .running: + // If the task was still being created, mark it as cancelled in `waitingTasks` so that + // the handler for `withCheckedThrowingContinuation` can immediately cancel it. + self.waitingTasks[taskIndex] = .cancelled(taskId) + return nil + case .cancelled: + preconditionFailure("Attempting to cancel a task that was already cancelled") + } + } + + continuation?.resume(throwing: _Concurrency.CancellationError()) + } + return workTask.id + } + + private func signalCompletion(_ taskId: ID) { + let continuationToResume = waitingTasksLock.withLock { () -> WaitingContinuation? in + guard !waitingTasks.isEmpty else { + return nil + } + + // Remove the completed task from the list to decrement the active task count. + if let taskIndex = self.waitingTasks.firstIndex(where: { $0.id == taskId }) { + waitingTasks.remove(at: taskIndex) + } + + // We cannot remove elements from `waitingTasks` while iterating over it, so we make + // a pass to collect operations and then apply them after the loop. + func createTaskListOperations() -> (CollectionDifference?, WaitingContinuation?) { + var changes: [CollectionDifference.Change] = [] + for (index, task) in waitingTasks.enumerated() { + switch task { + case .running: + // Skip tasks that are already running, looking for the first one that is waiting or creating. + continue + case .creating: + // If the next task is in the process of being created, let the + // creation code in the `withCheckedThrowingContinuation` in `waitIfNeeded` + // handle starting the task. + break + case .waiting: + // Begin the next waiting task + changes.append(.remove(offset: index, element: task, associatedWith: nil)) + return (CollectionDifference(changes), task.continuation) + case .cancelled: + // If the next task is cancelled, continue removing cancelled + // tasks until we find one that hasn't run yet, or we exaust the list of waiting tasks. + changes.append(.remove(offset: index, element: task, associatedWith: nil)) + continue + } + } + return (CollectionDifference(changes), nil) + } + + let (collectionOperations, continuation) = createTaskListOperations() + if let operations = collectionOperations { + guard let appliedDiff = waitingTasks.applying(operations) else { + preconditionFailure("Failed to apply changes to waiting tasks") + } + waitingTasks = appliedDiff + } + + return continuation + } + + continuationToResume?.resume() + } +} diff --git a/Sources/PackageGraph/PackageContainer.swift b/Sources/PackageGraph/PackageContainer.swift index f9827fa8d1c..515dfdbb3f4 100644 --- a/Sources/PackageGraph/PackageContainer.swift +++ b/Sources/PackageGraph/PackageContainer.swift @@ -197,33 +197,27 @@ extension PackageContainerConstraint: CustomStringConvertible { /// An interface for resolving package containers. public protocol PackageContainerProvider { /// Get the container for a particular identifier asynchronously. - - @available(*, noasync, message: "Use the async alternative") func getContainer( for package: PackageReference, updateStrategy: ContainerUpdateStrategy, - observabilityScope: ObservabilityScope, - on queue: DispatchQueue, - completion: @escaping @Sendable (Result) -> Void - ) + observabilityScope: ObservabilityScope + ) async throws -> PackageContainer } public extension PackageContainerProvider { + @available(*, noasync, message: "Use the async alternative") func getContainer( for package: PackageReference, updateStrategy: ContainerUpdateStrategy, observabilityScope: ObservabilityScope, - on queue: DispatchQueue - ) async throws -> PackageContainer { - try await withCheckedThrowingContinuation { continuation in - self.getContainer( + on queue: DispatchQueue, + completion: @escaping @Sendable (Result) -> Void + ) { + queue.asyncResult(completion) { + try await self.getContainer( for: package, updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - on: queue, - completion: { - continuation.resume(with: $0) - } + observabilityScope: observabilityScope ) } } diff --git a/Sources/PackageMetadata/PackageMetadata.swift b/Sources/PackageMetadata/PackageMetadata.swift index 531430a09dd..efc3f6926a6 100644 --- a/Sources/PackageMetadata/PackageMetadata.swift +++ b/Sources/PackageMetadata/PackageMetadata.swift @@ -232,7 +232,7 @@ public struct PackageSearchClient { let fetchStandalonePackageByURL = { (error: Error?) async throws -> [Package] in let url = SourceControlURL(query) do { - return try withTemporaryDirectory(removeTreeOnDeinit: true) { (tempDir: AbsolutePath) in + return try await withTemporaryDirectory(removeTreeOnDeinit: true) { (tempDir: AbsolutePath) in let tempPath = tempDir.appending(component: url.lastPathComponent) let repositorySpecifier = RepositorySpecifier(url: url) try self.repositoryProvider.fetch( @@ -240,7 +240,7 @@ public struct PackageSearchClient { to: tempPath, progressHandler: nil ) - guard try self.repositoryProvider.isValidDirectory(tempPath), let repository = try self.repositoryProvider.open( + guard try self.repositoryProvider.isValidDirectory(tempPath), let repository = try await self.repositoryProvider.open( repository: repositorySpecifier, at: tempPath ) as? GitRepository else { diff --git a/Sources/PackageRegistry/RegistryDownloadsManager.swift b/Sources/PackageRegistry/RegistryDownloadsManager.swift index cadddb585be..db92f4543af 100644 --- a/Sources/PackageRegistry/RegistryDownloadsManager.swift +++ b/Sources/PackageRegistry/RegistryDownloadsManager.swift @@ -27,7 +27,7 @@ public class RegistryDownloadsManager: AsyncCancellable { private let path: Basics.AbsolutePath private let cachePath: Basics.AbsolutePath? private let registryClient: RegistryClient - private let delegate: Delegate? + private let delegate: RegistryDownloadManagerDelegateProxy? struct PackageLookup: Hashable { let package: PackageIdentity @@ -48,14 +48,13 @@ public class RegistryDownloadsManager: AsyncCancellable { self.path = path self.cachePath = cachePath self.registryClient = registryClient - self.delegate = delegate + self.delegate = RegistryDownloadManagerDelegateProxy(delegate) } public func lookup( package: PackageIdentity, version: Version, - observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue + observabilityScope: ObservabilityScope ) async throws -> Basics.AbsolutePath { let packageRelativePath: Basics.RelativePath let packagePath: Basics.AbsolutePath @@ -82,9 +81,9 @@ public class RegistryDownloadsManager: AsyncCancellable { // inform delegate that we are starting to fetch // calculate if cached (for delegate call) outside queue as it may change while queue is processing let isCached = self.cachePath.map { self.fileSystem.exists($0.appending(packageRelativePath)) } ?? false - delegateQueue.async { [delegate = self.delegate] in + Task { let details = FetchDetails(fromCache: isCached, updatedCache: false) - delegate?.willFetch(package: package, version: version, fetchDetails: details) + await delegate?.willFetch(package: package, version: version, fetchDetails: details) } // make sure destination is free. @@ -96,18 +95,17 @@ public class RegistryDownloadsManager: AsyncCancellable { package: package, version: version, packagePath: packagePath, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue + observabilityScope: observabilityScope ) // inform delegate that we finished to fetch let duration = start.distance(to: .now()) - delegateQueue.async { [delegate = self.delegate] in - delegate?.didFetch(package: package, version: version, result: .success(result), duration: duration) + Task { + await delegate?.didFetch(package: package, version: version, result: .success(result), duration: duration) } } catch { let duration = start.distance(to: .now()) - delegateQueue.async { [delegate = self.delegate] in - delegate?.didFetch(package: package, version: version, result: .failure(error), duration: duration) + Task { + await delegate?.didFetch(package: package, version: version, result: .failure(error), duration: duration) } throw error } @@ -126,7 +124,6 @@ public class RegistryDownloadsManager: AsyncCancellable { package: PackageIdentity, version: Version, observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue, callbackQueue: DispatchQueue, completion: @escaping @Sendable (Result) -> Void ) { @@ -134,8 +131,7 @@ public class RegistryDownloadsManager: AsyncCancellable { try await self.lookup( package: package, version: version, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue + observabilityScope: observabilityScope ) } } @@ -149,8 +145,7 @@ public class RegistryDownloadsManager: AsyncCancellable { package: PackageIdentity, version: Version, packagePath: Basics.AbsolutePath, - observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue + observabilityScope: ObservabilityScope ) async throws -> FetchDetails { if let cachePath { do { @@ -238,8 +233,8 @@ public class RegistryDownloadsManager: AsyncCancellable { // utility to update progress @Sendable func updateDownloadProgress(downloaded: Int64, total: Int64?) { - delegateQueue.async { [delegate = self.delegate] in - delegate?.fetching( + Task { + await delegate?.fetching( package: package, version: version, bytesDownloaded: downloaded, @@ -327,6 +322,34 @@ public protocol RegistryDownloadsManagerDelegate: Sendable { func fetching(package: PackageIdentity, version: Version, bytesDownloaded: Int64, totalBytesToDownload: Int64?) } +actor RegistryDownloadManagerDelegateProxy { + private let delegate: RegistryDownloadsManagerDelegate + + init?(_ delegate: RegistryDownloadsManagerDelegate?) { + guard let delegate else { + return nil + } + self.delegate = delegate + } + + func willFetch(package: PackageIdentity, version: Version, fetchDetails: RegistryDownloadsManager.FetchDetails) { + self.delegate.willFetch(package: package, version: version, fetchDetails: fetchDetails) + } + + func didFetch( + package: PackageIdentity, + version: Version, + result: Result, + duration: DispatchTimeInterval + ) { + self.delegate.didFetch(package: package, version: version, result: result, duration: duration) + } + + func fetching(package: PackageIdentity, version: Version, bytesDownloaded: Int64, totalBytesToDownload: Int64?) { + self.delegate.fetching(package: package, version: version, bytesDownloaded: bytesDownloaded, totalBytesToDownload: totalBytesToDownload) + } +} + extension Dictionary where Key == RegistryDownloadsManager.PackageLookup { fileprivate mutating func removeValue(forPackage package: PackageIdentity) { self.keys diff --git a/Sources/SourceControl/Repository.swift b/Sources/SourceControl/Repository.swift index 22c7895811f..6f8efa52890 100644 --- a/Sources/SourceControl/Repository.swift +++ b/Sources/SourceControl/Repository.swift @@ -79,7 +79,7 @@ extension RepositorySpecifier: CustomStringConvertible { /// This protocol defines the lower level interface used to to access /// repositories. High-level clients should access repositories via a /// `RepositoryManager`. -public protocol RepositoryProvider: Cancellable { +public protocol RepositoryProvider: Cancellable, Sendable { /// Fetch the complete repository at the given location to `path`. /// /// - Parameters: @@ -98,7 +98,7 @@ public protocol RepositoryProvider: Cancellable { /// repository has previously been created via `fetch`. /// /// - Throws: If the repository is unable to be opened. - func open(repository: RepositorySpecifier, at path: AbsolutePath) throws -> Repository + func open(repository: RepositorySpecifier, at path: AbsolutePath) async throws -> Repository /// Create a working copy from a managed repository. /// @@ -276,7 +276,7 @@ public protocol WorkingCheckout { } /// A single repository revision. -public struct Revision: Hashable { +public struct Revision: Hashable, Sendable { /// A precise identifier for a single repository revision, in a repository-specified manner. /// /// This string is intended to be opaque to the client, but understandable @@ -289,7 +289,7 @@ public struct Revision: Hashable { } } -public protocol FetchProgress { +public protocol FetchProgress: Sendable { typealias Handler = (FetchProgress) -> Void var message: String { get } diff --git a/Sources/SourceControl/RepositoryManager.swift b/Sources/SourceControl/RepositoryManager.swift index b238d5b65bd..13388a752e8 100644 --- a/Sources/SourceControl/RepositoryManager.swift +++ b/Sources/SourceControl/RepositoryManager.swift @@ -34,22 +34,17 @@ public class RepositoryManager: Cancellable { private let provider: RepositoryProvider /// The delegate interface. - private let delegate: Delegate? - - /// DispatchSemaphore to restrict concurrent operations on manager. - private let concurrencySemaphore: DispatchSemaphore - /// OperationQueue to park pending lookups - private let lookupQueue: OperationQueue + private let delegate: RepositoryManagerDelegateProxy? /// The filesystem to operate on. private let fileSystem: FileSystem // tracks outstanding lookups for de-duping requests - private var pendingLookups = [RepositorySpecifier: DispatchGroup]() + private var pendingLookups = [RepositorySpecifier: Task]() private var pendingLookupsLock = NSLock() - // tracks outstanding lookups for cancellation - private var outstandingLookups = ThreadSafeKeyValueStore) -> Void, queue: DispatchQueue)>() + // Limits how many concurrent operations can be performed at once. + private let asyncOperationQueue: AsyncOperationQueue private var emitNoConnectivityWarning = ThreadSafeBox(true) @@ -82,35 +77,11 @@ public class RepositoryManager: Cancellable { self.cacheLocalPackages = cacheLocalPackages self.provider = provider - self.delegate = delegate + self.delegate = RepositoryManagerDelegateProxy(delegate) // this queue and semaphore is used to limit the amount of concurrent git operations taking place - let maxConcurrentOperations = max(1, maxConcurrentOperations ?? 3*Concurrency.maxOperations/4) - self.lookupQueue = OperationQueue() - self.lookupQueue.name = "org.swift.swiftpm.repository-manager" - self.lookupQueue.maxConcurrentOperationCount = maxConcurrentOperations - self.concurrencySemaphore = DispatchSemaphore(value: maxConcurrentOperations) - } - - public func lookup( - package: PackageIdentity, - repository: RepositorySpecifier, - updateStrategy: RepositoryUpdateStrategy, - observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue, - callbackQueue: DispatchQueue - ) async throws -> RepositoryHandle { - try await withCheckedThrowingContinuation { continuation in - self.lookup( - package: package, - repository: repository, - updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue, - callbackQueue: callbackQueue, - completion: { continuation.resume(with: $0) } - ) - } + let maxConcurrentOperations = max(1, maxConcurrentOperations ?? (3 * Concurrency.maxOperations / 4)) + self.asyncOperationQueue = AsyncOperationQueue(concurrentTasks: maxConcurrentOperations) } /// Get a handle to a repository. @@ -134,96 +105,105 @@ public class RepositoryManager: Cancellable { repository: RepositorySpecifier, updateStrategy: RepositoryUpdateStrategy, observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue, callbackQueue: DispatchQueue, - completion: @escaping (Result) -> Void + completion: @escaping @Sendable (Result) -> Void ) { - // records outstanding lookups for cancellation purposes - let lookupKey = UUID() - self.outstandingLookups[lookupKey] = (repository: repository, completion: completion, queue: callbackQueue) - - // wrap the callback in the requested queue and cleanup operations - let completion: (Result) -> Void = { result in - // free concurrency control semaphore - self.concurrencySemaphore.signal() - // remove any pending lookup - self.pendingLookupsLock.lock() - self.pendingLookups[repository]?.leave() - self.pendingLookups[repository] = nil - self.pendingLookupsLock.unlock() - // cancellation support - // if the callback is no longer on the pending lists it has been canceled already - // read + remove from outstanding requests atomically - if let (_, callback, queue) = self.outstandingLookups.removeValue(forKey: lookupKey) { - // call back on the request queue - queue.async { callback(result) } - } + callbackQueue.asyncResult(completion) { + // check if the repository is already being looked up + // if so, wait for it to finish and return the result + try await self.lookup( + package: package, + repository: repository, + updateStrategy: updateStrategy, + observabilityScope: observabilityScope + ) } + } + + public func lookup( + package: PackageIdentity, + repository repositorySpecifier: RepositorySpecifier, + updateStrategy: RepositoryUpdateStrategy, + observabilityScope: ObservabilityScope + ) async throws -> RepositoryHandle { + return try await self.asyncOperationQueue.withOperation { + let task = await withCheckedContinuation { continuation in + self.pendingLookupsLock.lock() + defer { self.pendingLookupsLock.unlock() } + + let lookupTask: Task + if let inFlight = self.pendingLookups[repositorySpecifier] { + lookupTask = Task { + // Let the existing in-flight task finish before queuing up the new one + let _ = try await inFlight.value + + if Task.isCancelled { + throw CancellationError() + } - // we must not block the calling thread (for concurrency control) so nesting this in a queue - self.lookupQueue.addOperation { - // park the lookup thread based on the max concurrency allowed - self.concurrencySemaphore.wait() - - // check if there is a pending lookup - self.pendingLookupsLock.lock() - if let pendingLookup = self.pendingLookups[repository] { - self.pendingLookupsLock.unlock() - // chain onto the pending lookup - return pendingLookup.notify(queue: .sharedConcurrent) { - // at this point the previous lookup should be complete and we can re-lookup - completion(.init(catching: { - try self.lookup( + let result = try await self.performLookup( package: package, - repository: repository, + repository: repositorySpecifier, updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue + observabilityScope: observabilityScope ) - })) + + if Task.isCancelled { + throw CancellationError() + } + + return result + } + } else { + lookupTask = Task { + if Task.isCancelled { + throw CancellationError() + } + + let result = try await self.performLookup( + package: package, + repository: repositorySpecifier, + updateStrategy: updateStrategy, + observabilityScope: observabilityScope + ) + + if Task.isCancelled { + throw CancellationError() + } + + return result + } } - } else { - // record the pending lookup - assert(self.pendingLookups[repository] == nil) - let group = DispatchGroup() - group.enter() - self.pendingLookups[repository] = group - self.pendingLookupsLock.unlock() + + self.pendingLookups[repositorySpecifier] = lookupTask + continuation.resume(returning: lookupTask) } - completion(.init(catching: { - try self.lookup( - package: package, - repository: repository, - updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue - ) - })) + do { + let result = try await task.value + return result + } catch { + throw error + } } } - // sync version of the lookup, - // this is here because it simplifies reading & maintaining the logical flow - // while the underlying git client is sync - // once we move to an async git client we would need to get rid of this - // sync func and roll the logic into the async version above - private func lookup( + private func performLookup( package: PackageIdentity, repository repositorySpecifier: RepositorySpecifier, updateStrategy: RepositoryUpdateStrategy, - observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue - ) throws -> RepositoryHandle { + observabilityScope: ObservabilityScope + ) async throws -> RepositoryHandle { let relativePath = try repositorySpecifier.storagePath() let repositoryPath = self.path.appending(relativePath) let handle = RepositoryHandle(manager: self, repository: repositorySpecifier, subpath: relativePath) + let delegate = self.delegate // check if a repository already exists // errors when trying to check if a repository already exists are legitimate // and recoverable, and as such can be ignored quick: if (try? self.provider.isValidDirectory(repositoryPath)) ?? false { - let repository = try handle.open() + let repository = try await handle.open() guard ((try? self.provider.isValidDirectory(repositoryPath, for: repositorySpecifier)) ?? false) else { observabilityScope.emit(warning: "\(repositoryPath) is not valid git repository for '\(repositorySpecifier.location)', will fetch again.") @@ -234,14 +214,14 @@ public class RepositoryManager: Cancellable { if self.fetchRequired(repository: repository, updateStrategy: updateStrategy) { let start = DispatchTime.now() - delegateQueue.async { - self.delegate?.willUpdate(package: package, repository: handle.repository) + Task { + await delegate?.willUpdate(package: package, repository: handle.repository) } try repository.fetch() let duration = start.distance(to: .now()) - delegateQueue.async { - self.delegate?.didUpdate(package: package, repository: handle.repository, duration: duration) + Task { + await delegate?.didUpdate(package: package, repository: handle.repository, duration: duration) } } @@ -250,50 +230,51 @@ public class RepositoryManager: Cancellable { // inform delegate that we are starting to fetch // calculate if cached (for delegate call) outside queue as it may change while queue is processing - let isCached = self.cachePath.map{ self.fileSystem.exists($0.appending(handle.subpath)) } ?? false - delegateQueue.async { + let isCached = self.cachePath.map { self.fileSystem.exists($0.appending(handle.subpath)) } ?? false + Task { let details = FetchDetails(fromCache: isCached, updatedCache: false) - self.delegate?.willFetch(package: package, repository: handle.repository, details: details) + await delegate?.willFetch(package: package, repository: handle.repository, details: details) } // perform the fetch let start = DispatchTime.now() - let fetchResult = Result(catching: { + do { // make sure destination is free. try? self.fileSystem.removeFileTree(repositoryPath) // fetch the repo and cache the results - return try self.fetchAndPopulateCache( + let result = try await self.fetchAndPopulateCache( package: package, handle: handle, repositoryPath: repositoryPath, updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - delegateQueue: delegateQueue + observabilityScope: observabilityScope ) - }) - - // inform delegate fetch is done - let duration = start.distance(to: .now()) - delegateQueue.async { - self.delegate?.didFetch(package: package, repository: handle.repository, result: fetchResult, duration: duration) + // inform delegate fetch is done + let duration = start.distance(to: .now()) + Task { + await delegate?.didFetch(package: package, repository: handle.repository, result: .success(result), duration: duration) + } + return handle + } catch { + // inform delegate fetch is done + let duration = start.distance(to: .now()) + Task { + await delegate?.didFetch(package: package, repository: handle.repository, result: .failure(error), duration: duration) + } + throw error } - - // at this point we can throw, as we already notified the delegate above - _ = try fetchResult.get() - - return handle } public func cancel(deadline: DispatchTime) throws { // ask the provider to cancel try self.provider.cancel(deadline: deadline) - // cancel any outstanding lookups - let outstanding = self.outstandingLookups.clear() - for (_, callback, queue) in outstanding.values { - queue.async { - callback(.failure(CancellationError())) - } + + self.pendingLookupsLock.lock() + defer { self.pendingLookupsLock.unlock() } + for task in self.pendingLookups.values { + task.cancel() } + self.pendingLookups = [:] } /// Fetches the repository into the cache. If no `cachePath` is set or an error occurred fall back to fetching the repository without populating the cache. @@ -309,18 +290,17 @@ public class RepositoryManager: Cancellable { handle: RepositoryHandle, repositoryPath: Basics.AbsolutePath, updateStrategy: RepositoryUpdateStrategy, - observabilityScope: ObservabilityScope, - delegateQueue: DispatchQueue - ) throws -> FetchDetails { + observabilityScope: ObservabilityScope + ) async throws -> FetchDetails { var cacheUsed = false var cacheUpdated = false // utility to update progress - func updateFetchProgress(progress: FetchProgress) -> Void { if let total = progress.totalSteps { - delegateQueue.async { - self.delegate?.fetching( + let delegate = self.delegate + Task { + await delegate?.fetching( package: package, repository: handle.repository, objectsFetched: progress.step, @@ -337,11 +317,11 @@ public class RepositoryManager: Cancellable { let cachedRepositoryPath = try cachePath.appending(handle.repository.storagePath()) do { try self.initializeCacheIfNeeded(cachePath: cachePath) - try self.fileSystem.withLock(on: cachePath, type: .shared) { - try self.fileSystem.withLock(on: cachedRepositoryPath, type: .exclusive) { + try await self.fileSystem.withLock(on: cachePath, type: .shared) { + try await self.fileSystem.withLock(on: cachedRepositoryPath, type: .exclusive) { // Fetch the repository into the cache. if (self.fileSystem.exists(cachedRepositoryPath)) { - let repo = try self.provider.open(repository: handle.repository, at: cachedRepositoryPath) + let repo = try await self.provider.open(repository: handle.repository, at: cachedRepositoryPath) if self.fetchRequired(repository: repo, updateStrategy: updateStrategy) { try repo.fetch(progress: updateFetchProgress(progress:)) } @@ -421,8 +401,8 @@ public class RepositoryManager: Cancellable { } /// Open a repository from a handle. - private func open(_ handle: RepositoryHandle) throws -> Repository { - try self.provider.open( + private func open(_ handle: RepositoryHandle) async throws -> Repository { + try await self.provider.open( repository: handle.repository, at: self.path.appending(handle.subpath) ) @@ -516,7 +496,7 @@ public class RepositoryManager: Cancellable { extension RepositoryManager { /// Handle to a managed repository. - public struct RepositoryHandle { + public struct RepositoryHandle: Sendable { /// The manager this repository is owned by. private unowned let manager: RepositoryManager @@ -537,8 +517,8 @@ extension RepositoryManager { } /// Open the given repository. - public func open() throws -> Repository { - return try self.manager.open(self) + public func open() async throws -> Repository { + return try await self.manager.open(self) } /// Create a working copy at on the local file system. @@ -556,7 +536,7 @@ extension RepositoryManager { extension RepositoryManager { /// Additional information about a fetch - public struct FetchDetails: Equatable { + public struct FetchDetails: Equatable, Sendable { /// Indicates if the repository was fetched from the cache or from the remote. public let fromCache: Bool /// Indicates whether the repository was already present in the cache and updated or if a clean fetch was performed. @@ -564,14 +544,14 @@ extension RepositoryManager { } } -public enum RepositoryUpdateStrategy { +public enum RepositoryUpdateStrategy: Sendable { case never case always case ifNeeded(revision: Revision) } /// Delegate to notify clients about actions being performed by RepositoryManager. -public protocol RepositoryManagerDelegate { +public protocol RepositoryManagerDelegate: Sendable { /// Called when a repository is about to be fetched. func willFetch(package: PackageIdentity, repository: RepositorySpecifier, details: RepositoryManager.FetchDetails) @@ -588,6 +568,38 @@ public protocol RepositoryManagerDelegate { func didUpdate(package: PackageIdentity, repository: RepositorySpecifier, duration: DispatchTimeInterval) } +/// Actor to proxy the delegate methods to the actual delegate, ensuring serialized delegate calls. +fileprivate actor RepositoryManagerDelegateProxy { + private let delegate: RepositoryManagerDelegate + + init?(_ delegate: RepositoryManagerDelegate?) { + guard let delegate else { + return nil + } + self.delegate = delegate + } + + func willFetch(package: PackageIdentity, repository: RepositorySpecifier, details: RepositoryManager.FetchDetails) { + delegate.willFetch(package: package, repository: repository, details: details) + } + + func fetching(package: PackageIdentity, repository: RepositorySpecifier, objectsFetched: Int, totalObjectsToFetch: Int) { + delegate.fetching(package: package, repository: repository, objectsFetched: objectsFetched, totalObjectsToFetch: totalObjectsToFetch) + } + + func didFetch(package: PackageIdentity, repository: RepositorySpecifier, result: Result, duration: DispatchTimeInterval) { + delegate.didFetch(package: package, repository: repository, result: result, duration: duration) + } + + func willUpdate(package: PackageIdentity, repository: RepositorySpecifier) { + delegate.willUpdate(package: package, repository: repository) + } + + func didUpdate(package: PackageIdentity, repository: RepositorySpecifier, duration: DispatchTimeInterval) { + delegate.didUpdate(package: package, repository: repository, duration: duration) + } +} + extension RepositoryManager.RepositoryHandle: CustomStringConvertible { public var description: String { diff --git a/Sources/Workspace/ResolverPrecomputationProvider.swift b/Sources/Workspace/ResolverPrecomputationProvider.swift index 5319db97c10..630c1dc56ae 100644 --- a/Sources/Workspace/ResolverPrecomputationProvider.swift +++ b/Sources/Workspace/ResolverPrecomputationProvider.swift @@ -57,36 +57,32 @@ struct ResolverPrecomputationProvider: PackageContainerProvider { func getContainer( for package: PackageReference, updateStrategy: ContainerUpdateStrategy, - observabilityScope: ObservabilityScope, - on queue: DispatchQueue, - completion: @escaping (Result) -> Void - ) { - queue.async { - // Start by searching manifests from the Workspace's resolved dependencies. - if let manifest = self.dependencyManifests.dependencies.first(where: { _, managed, _, _ in managed.packageRef == package }) { - let container = LocalPackageContainer( - package: package, - manifest: manifest.manifest, - dependency: manifest.dependency, - currentToolsVersion: self.currentToolsVersion - ) - return completion(.success(container)) - } - - // Continue searching from the Workspace's root manifests. - if let rootPackage = self.dependencyManifests.root.packages[package.identity] { - let container = LocalPackageContainer( - package: package, - manifest: rootPackage.manifest, - dependency: nil, - currentToolsVersion: self.currentToolsVersion - ) - return completion(.success(container)) - } + observabilityScope: ObservabilityScope + ) async throws -> PackageContainer { + // Start by searching manifests from the Workspace's resolved dependencies. + if let manifest = self.dependencyManifests.dependencies.first(where: { _, managed, _, _ in managed.packageRef == package }) { + let container = LocalPackageContainer( + package: package, + manifest: manifest.manifest, + dependency: manifest.dependency, + currentToolsVersion: self.currentToolsVersion + ) + return container + } - // As we don't have anything else locally, error out. - completion(.failure(ResolverPrecomputationError.missingPackage(package: package))) + // Continue searching from the Workspace's root manifests. + if let rootPackage = self.dependencyManifests.root.packages[package.identity] { + let container = LocalPackageContainer( + package: package, + manifest: rootPackage.manifest, + dependency: nil, + currentToolsVersion: self.currentToolsVersion + ) + return container } + + // As we don't have anything else locally, error out. + throw ResolverPrecomputationError.missingPackage(package: package) } } diff --git a/Sources/Workspace/Workspace+Dependencies.swift b/Sources/Workspace/Workspace+Dependencies.swift index af20f11c39f..0dcacd84e2a 100644 --- a/Sources/Workspace/Workspace+Dependencies.swift +++ b/Sources/Workspace/Workspace+Dependencies.swift @@ -72,7 +72,6 @@ extension Workspace { // Create cache directories. self.createCacheDirectories(observabilityScope: observabilityScope) - // FIXME: this should not block // Load the root manifests and currently checked out manifests. let rootManifests = try await self.loadRootManifests( packages: root.packages, @@ -345,7 +344,6 @@ extension Workspace { // Ensure the cache path exists. self.createCacheDirectories(observabilityScope: observabilityScope) - // FIXME: this should not block let rootManifests = try await self.loadRootManifests( packages: root.packages, observabilityScope: observabilityScope @@ -403,8 +401,7 @@ extension Workspace { _ = try await self.packageContainerProvider.getContainer( for: resolvedPackage.packageRef, updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - on: .sharedConcurrent + observabilityScope: observabilityScope ) } } @@ -747,12 +744,10 @@ extension Workspace { ) async throws -> AbsolutePath { switch requirement { case .version(let version): - // FIXME: this should not block let container = try await packageContainerProvider.getContainer( for: package, updateStrategy: ContainerUpdateStrategy.never, - observabilityScope: observabilityScope, - on: .sharedConcurrent + observabilityScope: observabilityScope ) if let container = container as? SourceControlPackageContainer { @@ -1066,8 +1061,7 @@ extension Workspace { packageContainerProvider.getContainer( for: binding.package, updateStrategy: .never, - observabilityScope: observabilityScope, - on: .sharedConcurrent + observabilityScope: observabilityScope ) as? SourceControlPackageContainer else { throw InternalError( diff --git a/Sources/Workspace/Workspace+Editing.swift b/Sources/Workspace/Workspace+Editing.swift index 0e2609aeedf..62d5a7a96ef 100644 --- a/Sources/Workspace/Workspace+Editing.swift +++ b/Sources/Workspace/Workspace+Editing.swift @@ -98,17 +98,14 @@ extension Workspace { // Otherwise, create a checkout at the destination from our repository store. // // Get handle to the repository. - // TODO: replace with async/await when available let repository = try dependency.packageRef.makeRepositorySpecifier() let handle = try await repositoryManager.lookup( package: dependency.packageRef.identity, repository: repository, updateStrategy: .never, - observabilityScope: observabilityScope, - delegateQueue: .sharedConcurrent, - callbackQueue: .sharedConcurrent + observabilityScope: observabilityScope ) - let repo = try handle.open() + let repo = try await handle.open() // Do preliminary checks on branch and revision, if provided. if let branch = checkoutBranch, repo.exists(revision: Revision(identifier: branch)) { diff --git a/Sources/Workspace/Workspace+Manifests.swift b/Sources/Workspace/Workspace+Manifests.swift index ea90595433b..6a23983e8a5 100644 --- a/Sources/Workspace/Workspace+Manifests.swift +++ b/Sources/Workspace/Workspace+Manifests.swift @@ -990,8 +990,7 @@ extension Workspace { let container = try await self.packageContainerProvider.getContainer( for: dependency.packageRef, updateStrategy: .never, - observabilityScope: observabilityScope, - on: .sharedConcurrent + observabilityScope: observabilityScope ) if let customContainer = container as? CustomPackageContainer { let newPath = try customContainer.retrieve(at: version, observabilityScope: observabilityScope) diff --git a/Sources/Workspace/Workspace+PackageContainer.swift b/Sources/Workspace/Workspace+PackageContainer.swift index 0b0a824f8fd..d2fd3eda3a3 100644 --- a/Sources/Workspace/Workspace+PackageContainer.swift +++ b/Sources/Workspace/Workspace+PackageContainer.swift @@ -14,7 +14,6 @@ import SourceControl import TSCBasic import class Basics.ObservabilityScope -import func Dispatch.dispatchPrecondition import class Dispatch.DispatchQueue import enum PackageFingerprint.FingerprintCheckingMode import enum PackageGraph.ContainerUpdateStrategy @@ -28,79 +27,61 @@ extension Workspace: PackageContainerProvider { public func getContainer( for package: PackageReference, updateStrategy: ContainerUpdateStrategy, - observabilityScope: ObservabilityScope, - on queue: DispatchQueue, - completion: @escaping (Result) -> Void - ) { - do { - switch package.kind { - // If the container is local, just create and return a local package container. - case .root, .fileSystem: - let container = try FileSystemPackageContainer( - package: package, - identityResolver: self.identityResolver, - dependencyMapper: self.dependencyMapper, - manifestLoader: self.manifestLoader, - currentToolsVersion: self.currentToolsVersion, - fileSystem: self.fileSystem, - observabilityScope: observabilityScope - ) - queue.async { - completion(.success(container)) - } - // Resolve the container using the repository manager. - case .localSourceControl, .remoteSourceControl: - let repositorySpecifier = try package.makeRepositorySpecifier() - self.repositoryManager.lookup( - package: package.identity, - repository: repositorySpecifier, - updateStrategy: updateStrategy.repositoryUpdateStrategy, - observabilityScope: observabilityScope, - delegateQueue: queue, - callbackQueue: queue - ) { result in - dispatchPrecondition(condition: .onQueue(queue)) - // Create the container wrapper. - let result = result.tryMap { handle -> PackageContainer in - // Open the repository. - // - // FIXME: Do we care about holding this open for the lifetime of the container. - let repository = try handle.open() - return try SourceControlPackageContainer( - package: package, - identityResolver: self.identityResolver, - dependencyMapper: self.dependencyMapper, - repositorySpecifier: repositorySpecifier, - repository: repository, - manifestLoader: self.manifestLoader, - currentToolsVersion: self.currentToolsVersion, - fingerprintStorage: self.fingerprints, - fingerprintCheckingMode: FingerprintCheckingMode - .map(self.configuration.fingerprintCheckingMode), - observabilityScope: observabilityScope - ) - } - completion(result) - } - // Resolve the container using the registry - case .registry: - let container = RegistryPackageContainer( - package: package, - identityResolver: self.identityResolver, - dependencyMapper: self.dependencyMapper, - registryClient: self.registryClient, - manifestLoader: self.manifestLoader, - currentToolsVersion: self.currentToolsVersion, - observabilityScope: observabilityScope - ) - queue.async { - completion(.success(container)) - } - } - } catch { - queue.async { - completion(.failure(error)) - } + observabilityScope: ObservabilityScope + ) async throws -> any PackageContainer { + switch package.kind { + // If the container is local, just create and return a local package container. + case .root, .fileSystem: + let container = try FileSystemPackageContainer( + package: package, + identityResolver: self.identityResolver, + dependencyMapper: self.dependencyMapper, + manifestLoader: self.manifestLoader, + currentToolsVersion: self.currentToolsVersion, + fileSystem: self.fileSystem, + observabilityScope: observabilityScope + ) + return container + // Resolve the container using the repository manager. + case .localSourceControl, .remoteSourceControl: + let repositorySpecifier = try package.makeRepositorySpecifier() + let handle = try await self.repositoryManager.lookup( + package: package.identity, + repository: repositorySpecifier, + updateStrategy: updateStrategy.repositoryUpdateStrategy, + observabilityScope: observabilityScope + ) + + // Open the repository. + // + // FIXME: Do we care about holding this open for the lifetime of the container. + let repository = try await handle.open() + let result = try SourceControlPackageContainer( + package: package, + identityResolver: self.identityResolver, + dependencyMapper: self.dependencyMapper, + repositorySpecifier: repositorySpecifier, + repository: repository, + manifestLoader: self.manifestLoader, + currentToolsVersion: self.currentToolsVersion, + fingerprintStorage: self.fingerprints, + fingerprintCheckingMode: FingerprintCheckingMode + .map(self.configuration.fingerprintCheckingMode), + observabilityScope: observabilityScope + ) + return result + // Resolve the container using the registry + case .registry: + let container = RegistryPackageContainer( + package: package, + identityResolver: self.identityResolver, + dependencyMapper: self.dependencyMapper, + registryClient: self.registryClient, + manifestLoader: self.manifestLoader, + currentToolsVersion: self.currentToolsVersion, + observabilityScope: observabilityScope + ) + return container } } } diff --git a/Sources/Workspace/Workspace+Registry.swift b/Sources/Workspace/Workspace+Registry.swift index 329ff414491..c9b3feebed1 100644 --- a/Sources/Workspace/Workspace+Registry.swift +++ b/Sources/Workspace/Workspace+Registry.swift @@ -395,12 +395,10 @@ extension Workspace { at version: Version, observabilityScope: ObservabilityScope ) async throws -> AbsolutePath { - // FIXME: this should not block let downloadPath = try await self.registryDownloadsManager.lookup( package: package.identity, version: version, - observabilityScope: observabilityScope, - delegateQueue: .sharedConcurrent + observabilityScope: observabilityScope ) // Record the new state. diff --git a/Sources/Workspace/Workspace+SourceControl.swift b/Sources/Workspace/Workspace+SourceControl.swift index 6d6eb611176..8c8c63f6fe3 100644 --- a/Sources/Workspace/Workspace+SourceControl.swift +++ b/Sources/Workspace/Workspace+SourceControl.swift @@ -172,14 +172,11 @@ extension Workspace { } // If not, we need to get the repository from the checkouts. - // FIXME: this should not block let handle = try await self.repositoryManager.lookup( package: package.identity, repository: repository, updateStrategy: .never, - observabilityScope: observabilityScope, - delegateQueue: .sharedConcurrent, - callbackQueue: .sharedConcurrent + observabilityScope: observabilityScope ) // Clone the repository into the checkouts. diff --git a/Sources/_InternalTestSupport/MockPackageContainer.swift b/Sources/_InternalTestSupport/MockPackageContainer.swift index e63361988c8..a6fcb46d605 100644 --- a/Sources/_InternalTestSupport/MockPackageContainer.swift +++ b/Sources/_InternalTestSupport/MockPackageContainer.swift @@ -171,14 +171,11 @@ public struct MockPackageContainerProvider: PackageContainerProvider { public func getContainer( for package: PackageReference, updateStrategy: ContainerUpdateStrategy, - observabilityScope: ObservabilityScope, - on queue: DispatchQueue, - completion: @escaping (Result - ) -> Void - ) { - queue.async { - completion(self.containersByIdentifier[package].map { .success($0) } ?? - .failure(StringError("unknown module \(package)"))) + observabilityScope: ObservabilityScope + ) async throws -> PackageContainer { + guard let container = self.containersByIdentifier[package] else { + throw StringError("unknown module \(package)") } + return container } } diff --git a/Sources/_InternalTestSupport/MockWorkspace.swift b/Sources/_InternalTestSupport/MockWorkspace.swift index 9091ae4fb41..0eed3d9293f 100644 --- a/Sources/_InternalTestSupport/MockWorkspace.swift +++ b/Sources/_InternalTestSupport/MockWorkspace.swift @@ -216,17 +216,11 @@ public final class MockWorkspace { identity: PackageIdentity(url: url), kind: .remoteSourceControl(url) ) - let container = try await withCheckedThrowingContinuation { continuation in - containerProvider.getContainer( - for: packageRef, - updateStrategy: .never, - observabilityScope: observability.topScope, - on: .sharedConcurrent, - completion: { - continuation.resume(with: $0) - } - ) - } + let container = try await containerProvider.getContainer( + for: packageRef, + updateStrategy: .never, + observabilityScope: observability.topScope + ) guard let customContainer = container as? CustomPackageContainer else { throw StringError("invalid custom container: \(container)") } diff --git a/Tests/BasicsTests/ConcurrencyHelpersTests.swift b/Tests/BasicsTests/ConcurrencyHelpersTests.swift index 0b589c5f4b1..e3ae5616b8e 100644 --- a/Tests/BasicsTests/ConcurrencyHelpersTests.swift +++ b/Tests/BasicsTests/ConcurrencyHelpersTests.swift @@ -16,69 +16,73 @@ import TSCTestSupport import Testing struct ConcurrencyHelpersTest { - let queue = DispatchQueue(label: "ConcurrencyHelpersTest", attributes: .concurrent) + @Suite + struct ThreadSafeKeyValueStoreTests { + let queue = DispatchQueue(label: "ConcurrencyHelpersTest", attributes: .concurrent) - @Test - func threadSafeKeyValueStore() throws { - for _ in 0 ..< 100 { - let sync = DispatchGroup() + @Test + func threadSafeKeyValueStore() throws { + for _ in 0 ..< 100 { + let sync = DispatchGroup() - var expected = [Int: Int]() - let lock = NSLock() + var expected = [Int: Int]() + let lock = NSLock() - let cache = ThreadSafeKeyValueStore() - for index in 0 ..< 1000 { - self.queue.async(group: sync) { - Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6) - let value = Int.random(in: Int.min ..< Int.max) - lock.withLock { - expected[index] = value - } - cache.memoize(index) { - value - } - cache.memoize(index) { - Int.random(in: Int.min ..< Int.max) + let cache = ThreadSafeKeyValueStore() + for index in 0 ..< 1000 { + self.queue.async(group: sync) { + Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6) + let value = Int.random(in: Int.min ..< Int.max) + lock.withLock { + expected[index] = value + } + cache.memoize(index) { + value + } + cache.memoize(index) { + Int.random(in: Int.min ..< Int.max) + } } } - } - try #require(sync.wait(timeout: .now() + .seconds(2)) == .success) - expected.forEach { key, value in - #expect(cache[key] == value) + try #require(sync.wait(timeout: .now() + .seconds(2)) == .success) + expected.forEach { key, value in + #expect(cache[key] == value) + } } } - } - @Test - func threadSafeArrayStore() throws { - for _ in 0 ..< 100 { - let sync = DispatchGroup() + @Test + func threadSafeArrayStore() throws { + for _ in 0 ..< 100 { + let sync = DispatchGroup() - var expected = [Int]() - let lock = NSLock() + var expected = [Int]() + let lock = NSLock() - let cache = ThreadSafeArrayStore() - for _ in 0 ..< 1000 { - self.queue.async(group: sync) { - Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6) - let value = Int.random(in: Int.min ..< Int.max) - lock.withLock { - expected.append(value) + let cache = ThreadSafeArrayStore() + for _ in 0 ..< 1000 { + self.queue.async(group: sync) { + Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6) + let value = Int.random(in: Int.min ..< Int.max) + lock.withLock { + expected.append(value) + } + cache.append(value) } - cache.append(value) } - } - try #require(sync.wait(timeout: .now() + .seconds(2)) == .success) - let expectedSorted = expected.sorted() - let resultsSorted = cache.get().sorted() - #expect(expectedSorted == resultsSorted) + try #require(sync.wait(timeout: .now() + .seconds(2)) == .success) + let expectedSorted = expected.sorted() + let resultsSorted = cache.get().sorted() + #expect(expectedSorted == resultsSorted) + } } } @Test func threadSafeBox() throws { + let queue = DispatchQueue(label: "ConcurrencyHelpersTest", attributes: .concurrent) for _ in 0 ..< 100 { let sync = DispatchGroup() @@ -89,7 +93,7 @@ struct ConcurrencyHelpersTest { let cache = ThreadSafeBox() for index in 0 ..< 1000 { - self.queue.async(group: sync) { + queue.async(group: sync) { Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6) serial.async(group: sync) { lock.withLock { @@ -108,4 +112,158 @@ struct ConcurrencyHelpersTest { #expect(cache.get() == winner) } } + + @Suite + struct AsyncOperationQueueTests { + fileprivate actor ResultsTracker { + var results = [Int]() + var maxConcurrent = 0 + var currentConcurrent = 0 + + func incrementConcurrent() { + currentConcurrent += 1 + maxConcurrent = max(maxConcurrent, currentConcurrent) + } + + func decrementConcurrent() { + currentConcurrent -= 1 + } + + func appendResult(_ value: Int) { + results.append(value) + } + } + + @Test + func limitsConcurrentOperations() async throws { + let queue = AsyncOperationQueue(concurrentTasks: 5) + + let totalTasks = 20 + let tracker = ResultsTracker() + + try await withThrowingTaskGroup(of: Void.self) { group in + for index in 0.. - ) -> Void) { - queue.async { - completion(self.containersByIdentifier[package].map{ .success($0) } ?? - .failure(_MockLoadingError.unknownModule)) + observabilityScope: ObservabilityScope + ) async throws -> PackageContainer { + guard let container = self.containersByIdentifier[package] else { + throw _MockLoadingError.unknownModule } + return container } } diff --git a/Tests/PackageRegistryTests/RegistryDownloadsManagerTests.swift b/Tests/PackageRegistryTests/RegistryDownloadsManagerTests.swift index 65bbd774dcf..7da84e7b493 100644 --- a/Tests/PackageRegistryTests/RegistryDownloadsManagerTests.swift +++ b/Tests/PackageRegistryTests/RegistryDownloadsManagerTests.swift @@ -409,17 +409,6 @@ private final class MockRegistryDownloadsManagerDelegate: RegistryDownloadsManag } } -extension RegistryDownloadsManager { - fileprivate func lookup(package: PackageIdentity, version: Version, observabilityScope: ObservabilityScope) async throws -> AbsolutePath { - try await self.lookup( - package: package, - version: version, - observabilityScope: observabilityScope, - delegateQueue: .sharedConcurrent - ) - } -} - fileprivate struct PackageVersion: Hashable, Equatable, Sendable { let package: PackageIdentity let version: Version diff --git a/Tests/SourceControlTests/RepositoryManagerTests.swift b/Tests/SourceControlTests/RepositoryManagerTests.swift index fd788eb2c6a..49efc60904b 100644 --- a/Tests/SourceControlTests/RepositoryManagerTests.swift +++ b/Tests/SourceControlTests/RepositoryManagerTests.swift @@ -48,7 +48,7 @@ final class RepositoryManagerTests: XCTestCase { XCTAssertEqual(provider.numFetches, 0) // Open the repository. - let repository = try! handle.open() + let repository = try! await handle.open() XCTAssertEqual(try! repository.getTags(), ["1.0.0"]) // Create a checkout of the repository. @@ -64,7 +64,6 @@ final class RepositoryManagerTests: XCTestCase { } // Get a bad repository. - do { delegate.prepare(fetchExpected: true, updateExpected: false) await XCTAssertAsyncThrowsError(try await manager.lookup(repository: badDummyRepo, observabilityScope: observability.topScope)) { error in @@ -359,9 +358,7 @@ final class RepositoryManagerTests: XCTestCase { package: PackageIdentity(path: dummyRepoPath), repository: dummyRepo, updateStrategy: .always, - observabilityScope: observability.topScope, - delegateQueue: .sharedConcurrent, - callbackQueue: .sharedConcurrent + observabilityScope: observability.topScope ) } } @@ -449,7 +446,6 @@ final class RepositoryManagerTests: XCTestCase { cancellator.register(name: "repository manager", handler: manager) - //let startGroup = DispatchGroup() let finishGroup = DispatchGroup() let results = ThreadSafeKeyValueStore>() for index in 0 ..< total { @@ -462,7 +458,6 @@ final class RepositoryManagerTests: XCTestCase { repository: repository, updateStrategy: .never, observabilityScope: observability.topScope, - delegateQueue: .sharedConcurrent, callbackQueue: .sharedConcurrent ) { result in defer { finishGroup.leave() } @@ -499,7 +494,7 @@ final class RepositoryManagerTests: XCTestCase { // the provider called in a thread managed by the RepositoryManager // the use of blocking semaphore is intentional - class MockRepositoryProvider: RepositoryProvider { + class MockRepositoryProvider: RepositoryProvider, @unchecked Sendable { let total: Int // this DispatchGroup is used to wait for the requests to start before calling cancel let startGroup = DispatchGroup() @@ -524,7 +519,6 @@ final class RepositoryManagerTests: XCTestCase { defer { self.outstandingGroup.leave() } print("\(repository) waiting to be cancelled") XCTAssertEqual(.success, self.terminatedGroup.wait(timeout: .now() + 5), "timeout waiting on terminated signal") - throw StringError("\(repository) should be cancelled") } print("\(repository) okay") } @@ -590,7 +584,7 @@ final class RepositoryManagerTests: XCTestCase { } } - class MockRepositoryProvider: RepositoryProvider { + class MockRepositoryProvider: RepositoryProvider, @unchecked Sendable { let repository: RepositorySpecifier var fetch: Int = 0 @@ -698,19 +692,12 @@ extension RepositoryManager { updateStrategy: RepositoryUpdateStrategy = .always, observabilityScope: ObservabilityScope ) async throws -> RepositoryHandle { - return try await withCheckedThrowingContinuation { continuation in - self.lookup( - package: .init(url: SourceControlURL(repository.url)), - repository: repository, - updateStrategy: updateStrategy, - observabilityScope: observabilityScope, - delegateQueue: .sharedConcurrent, - callbackQueue: .sharedConcurrent, - completion: { - continuation.resume(with: $0) - } - ) - } + try await self.lookup( + package: .init(url: SourceControlURL(repository.url)), + repository: repository, + updateStrategy: updateStrategy, + observabilityScope: observabilityScope + ) } } @@ -718,7 +705,7 @@ private enum DummyError: Swift.Error { case invalidRepository } -private class DummyRepositoryProvider: RepositoryProvider { +private class DummyRepositoryProvider: RepositoryProvider, @unchecked Sendable { private let fileSystem: FileSystem private let lock = NSLock() @@ -730,7 +717,7 @@ private class DummyRepositoryProvider: RepositoryProvider { } func fetch(repository: RepositorySpecifier, to path: AbsolutePath, progressHandler: FetchProgress.Handler? = nil) throws { - assert(!self.fileSystem.exists(path)) + assert(!self.fileSystem.exists(path), "\(path) should not exist") try self.fileSystem.createDirectory(path, recursive: true) try self.fileSystem.writeFileContents(path.appending("readme.md"), string: repository.location.description) @@ -860,7 +847,7 @@ private class DummyRepositoryProvider: RepositoryProvider { } } -fileprivate class DummyRepositoryManagerDelegate: RepositoryManager.Delegate { +fileprivate class DummyRepositoryManagerDelegate: RepositoryManager.Delegate, @unchecked Sendable { private var _willFetch = ThreadSafeArrayStore<(repository: RepositorySpecifier, details: RepositoryManager.FetchDetails)>() private var _didFetch = ThreadSafeArrayStore<(repository: RepositorySpecifier, result: Result)>() private var _willUpdate = ThreadSafeArrayStore() diff --git a/Tests/WorkspaceTests/RegistryPackageContainerTests.swift b/Tests/WorkspaceTests/RegistryPackageContainerTests.swift index 127fcdf1137..d7eb6a20461 100644 --- a/Tests/WorkspaceTests/RegistryPackageContainerTests.swift +++ b/Tests/WorkspaceTests/RegistryPackageContainerTests.swift @@ -330,7 +330,7 @@ final class RegistryPackageContainerTests: XCTestCase { let manifest = try await container.loadManifest(version: packageVersion) XCTAssertEqual(manifest.toolsVersion, .v5_5) } - + do { let provider = try createProvider(.v5) // the version of the alternate let ref = PackageReference.registry(identity: packageIdentity) @@ -504,8 +504,7 @@ extension PackageContainerProvider { try await self.getContainer( for: package, updateStrategy: updateStrategy, - observabilityScope: ObservabilitySystem.NOOP, - on: .global() + observabilityScope: ObservabilitySystem.NOOP ) } }