Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resume to resume failed downloads #33

Merged
merged 4 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ extension S3FileTransferManager {
return try await self.sync(from: srcFolder, to: destFolder, delete: delete, options: options).get()
}

/// Resume download from S3 that previously failed
///
/// When a copy or sync to file system operation fails it will throw a
/// S3TransferManager.Error.downloadFailed error. This contains a `DownloadOperation`.
/// struct. You can resume the download by passing the struct to the this function.
///
/// - Parameters:
/// - download: Details of remaining downloads to perform
/// - options: Download options
/// - progress: Progress function
/// - Returns: EventLoopFuture fulfilled when operation is complete
public func resume(
download: DownloadOperation,
options: GetOptions = .init(),
progress: @escaping (Double) throws -> Void = { _ in }
) async throws {
try await self.resume(download: download, options: options, progress: progress).get()
}

/// delete a file on S3
public func delete(_ file: S3File) async throws {
try await self.delete(file).get()
Expand Down
110 changes: 82 additions & 28 deletions Sources/SotoS3FileTransfer/S3FileTransferManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class S3FileTransferManager {
}
}

/// List of file downloads to perform. This is return in a downloadFailed
/// error and can be passed to the resume function to resume the download
public struct DownloadOperation {
let transfers: [(from: S3FileDescriptor, to: String)]
}

/// Errors created by S3TransferManager
public enum Error: Swift.Error {
/// File you referenced doesn't exist
Expand All @@ -57,6 +63,8 @@ public class S3FileTransferManager {
case failedToEnumerateFolder(String)
/// Cannot download file from S3 as it is a folder on your local file system
case fileFolderClash(String, String)
/// download failed
case downloadFailed(Swift.Error, DownloadOperation)
}

/// S3 service object
Expand Down Expand Up @@ -330,19 +338,12 @@ public class S3FileTransferManager {
.flatMap { files in
let taskQueue = TaskQueue<Void>(maxConcurrentTasks: self.configuration.maxConcurrentTasks, on: eventLoop)
let transfers = Self.targetFiles(files: files, from: s3Folder, to: folder)
let folderProgress = FolderUploadProgress(files, progress: progress)
transfers.forEach { transfer in
taskQueue.submitTask {
self.copy(from: transfer.from.file, to: transfer.to, options: options) {
try folderProgress.updateProgress(transfer.from.file.key, progress: $0)
}.map { _ in
folderProgress.setFileUploaded(transfer.from.file.key)
}
}
}
return self.complete(taskQueue: taskQueue).map { _ in
assert(folderProgress.finished == true)
}
return self.copy(
transfers: transfers,
taskQueue: taskQueue,
options: options,
progress: progress
)
}
}

Expand Down Expand Up @@ -457,16 +458,6 @@ public class S3FileTransferManager {
guard file.modificationDate > transfer.from.modificationDate else { return transfer }
return nil
}
let folderProgress = FolderUploadProgress(transfers.map { $0.from }, progress: progress)
transfers.forEach { transfer in
taskQueue.submitTask {
self.copy(from: transfer.from.file, to: transfer.to, options: options) {
try folderProgress.updateProgress(transfer.from.file.key, progress: $0)
}.map { _ in
folderProgress.setFileUploaded(transfer.from.file.key)
}
}
}
// construct list of files to delete, if we are doing deletion
if delete == true {
let deletions = files.compactMap { file -> String? in
Expand All @@ -478,9 +469,12 @@ public class S3FileTransferManager {
}
deletions.forEach { deletion in taskQueue.submitTask { self.delete(deletion) } }
}
return self.complete(taskQueue: taskQueue).map { _ in
assert(folderProgress.finished == true)
}
return self.copy(
transfers: transfers,
taskQueue: taskQueue,
options: options,
progress: progress
)
}
}

Expand Down Expand Up @@ -531,6 +525,33 @@ public class S3FileTransferManager {
}
}

/// Resume download from S3 that previously failed
///
/// When a copy or sync to file system operation fails it will throw a
/// S3TransferManager.Error.downloadFailed error. This contains a `DownloadOperation`.
/// struct. You can resume the download by passing the struct to the this function.
///
/// - Parameters:
/// - download: Details of remaining downloads to perform
/// - options: Download options
/// - progress: Progress function
/// - Returns: EventLoopFuture fulfilled when operation is complete
public func resume(
download: DownloadOperation,
options: GetOptions = .init(),
progress: @escaping (Double) throws -> Void = { _ in }
) -> EventLoopFuture<Void> {
let eventLoop = self.s3.eventLoopGroup.next()

let taskQueue = TaskQueue<Void>(maxConcurrentTasks: self.configuration.maxConcurrentTasks, on: eventLoop)
return self.copy(
transfers: download.transfers,
taskQueue: taskQueue,
options: options,
progress: progress
)
}

/// delete a file on S3
public func delete(_ s3File: S3File) -> EventLoopFuture<Void> {
self.logger.info("Deleting \(s3File)")
Expand All @@ -550,13 +571,13 @@ public class S3FileTransferManager {
}

extension S3FileTransferManager {
struct FileDescriptor {
struct FileDescriptor: Equatable {
let name: String
let modificationDate: Date
let size: Int
}

struct S3FileDescriptor {
struct S3FileDescriptor: Equatable {
let file: S3File
let modificationDate: Date
let size: Int
Expand Down Expand Up @@ -645,6 +666,39 @@ extension S3FileTransferManager {
}
}

/// Internal version of sync, with folder progress and task queue setup
func copy(
transfers: [(from: S3FileDescriptor, to: String)],
taskQueue: TaskQueue<Void>,
options: GetOptions,
progress: @escaping (Double) throws -> Void
) -> EventLoopFuture<Void> {
var failedTransfers: [(from: S3FileDescriptor, to: String)] = []
let lock = NIOLock()
let folderProgress = FolderUploadProgress(transfers.map { $0.from }, progress: progress)
let transfersComplete = transfers.map { transfer in
taskQueue.submitTask {
self.copy(from: transfer.from.file, to: transfer.to, options: options) {
try folderProgress.updateProgress(transfer.from.file.key, progress: $0)
}.map { _ in
folderProgress.setFileUploaded(transfer.from.file.key)
}
}.flatMapErrorThrowing { error in
lock.withLock {
failedTransfers.append(transfer)
}
throw error
}
}
return self.complete(taskQueue: taskQueue).map { _ in
assert(folderProgress.finished == true)
}.flatMapError { error in
return EventLoopFuture.andAllComplete(transfersComplete, on: taskQueue.eventLoop).flatMapThrowing {
throw Error.downloadFailed(error, .init(transfers: failedTransfers))
}
}
}

/// delete a local file
func delete(_ file: String) -> EventLoopFuture<Void> {
self.logger.info("Deleting \(file)")
Expand Down
13 changes: 8 additions & 5 deletions Sources/SotoS3FileTransfer/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class TaskQueue<Value> {
/// Cancel all tasks in the queue and return EventLoopFuture that succeeds when all in flight tasks have completed.
func cancel() -> EventLoopFuture<Void> {
self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
self.clearQueue()
return EventLoopFuture.whenAllComplete(self.inflightTasks.map { $0.promise.futureResult }, on: self.eventLoop)
.map { _ in }
.always { _ in }
return self.clearQueue().flatMap { _ in
return EventLoopFuture.andAllComplete(self.inflightTasks.map { $0.promise.futureResult }, on: self.eventLoop)
.always { _ in }
}
}
}

Expand Down Expand Up @@ -113,10 +113,13 @@ class TaskQueue<Value> {
}
}

private func clearQueue() {
private func clearQueue() -> EventLoopFuture<Void> {
self.eventLoop.preconditionInEventLoop()
let eventLoopFutures = self.queue.map { $0.promise.futureResult }
while let task = self.queue.popFirst() {
task.promise.fail(Cancelled())
}
// return once all tasks are failed
return EventLoopFuture.andAllComplete(eventLoopFutures, on: self.eventLoop)
}
}
Loading