From fedac4a0eca44569c9a9191a597c1c9baf853575 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 3 Feb 2023 08:33:21 +0000 Subject: [PATCH 1/2] Rework ArchiveAsync interface to make it return any archiving errors After experience with the current ArchiveAsync interface while creating an rclone backend it became clear that it wasn't quite right. With the old interface, callers did not know when the File had been archived and thus did not know when to release resources associated with that file. This patch changes the interface so that it returns the error archiving each File. This means the caller can know when the File has been archived and when to release resources. It returns the error for each file archived which is useful too. Fixes #368 --- interfaces.go | 13 +++++++++++-- tar.go | 19 +++++++------------ zip.go | 21 +++++++++++---------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/interfaces.go b/interfaces.go index 782cae64..bfc53163 100644 --- a/interfaces.go +++ b/interfaces.go @@ -60,6 +60,13 @@ type Archiver interface { Archive(ctx context.Context, output io.Writer, files []File) error } +// ArchiveAsyncJob contains a File to be archived and a channel that +// the result of the archiving should be returned on. +type ArchiveAsyncJob struct { + File File + Result chan<- error +} + // ArchiverAsync is an Archiver that can also create archives // asynchronously by pumping files into a channel as they are // discovered. @@ -67,9 +74,11 @@ type ArchiverAsync interface { Archiver // Use ArchiveAsync if you can't pre-assemble a list of all - // the files for the archive. Close the files channel after + // the files for the archive. Close the jobs channel after // all the files have been sent. - ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error + // + // This won't return until the channel is closed. + ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error } // Extractor can extract files from an archive. diff --git a/tar.go b/tar.go index 81f958b1..ce719695 100644 --- a/tar.go +++ b/tar.go @@ -60,18 +60,12 @@ func (t Tar) Archive(ctx context.Context, output io.Writer, files []File) error return nil } -func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error { +func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { tw := tar.NewWriter(output) defer tw.Close() - for file := range files { - if err := t.writeFileToArchive(ctx, tw, file); err != nil { - if t.ContinueOnError && ctx.Err() == nil { // context errors should always abort - log.Printf("[ERROR] %v", err) - continue - } - return err - } + for job := range jobs { + job.Result <- t.writeFileToArchive(ctx, tw, job.File) } return nil @@ -234,7 +228,8 @@ func (t Tar) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv // Interface guards var ( - _ Archiver = (*Tar)(nil) - _ Extractor = (*Tar)(nil) - _ Inserter = (*Tar)(nil) + _ Archiver = (*Tar)(nil) + _ ArchiverAsync = (*Tar)(nil) + _ Extractor = (*Tar)(nil) + _ Inserter = (*Tar)(nil) ) diff --git a/zip.go b/zip.go index 30e7b5b1..421fe6ec 100644 --- a/zip.go +++ b/zip.go @@ -114,19 +114,13 @@ func (z Zip) Archive(ctx context.Context, output io.Writer, files []File) error return nil } -func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error { +func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { zw := zip.NewWriter(output) defer zw.Close() var i int - for file := range files { - if err := z.archiveOneFile(ctx, zw, i, file); err != nil { - if z.ContinueOnError && ctx.Err() == nil { // context errors should always abort - log.Printf("[ERROR] %v", err) - continue - } - return err - } + for job := range jobs { + job.Result <- z.archiveOneFile(ctx, zw, i, job.File) i++ } @@ -202,7 +196,7 @@ func (z Zip) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv skipDirs := skipList{} for i, f := range zr.File { - f := f // make a copy for the Open closure + f := f // make a copy for the Open closure if err := ctx.Err(); err != nil { return err // honor context cancellation } @@ -383,3 +377,10 @@ func decodeText(input, charset string) (string, error) { } var zipHeader = []byte("PK\x03\x04") // NOTE: headers of empty zip files might end with 0x05,0x06 or 0x06,0x06 instead of 0x03,0x04 + +// Interface guards +var ( + _ Archiver = Zip{} + _ ArchiverAsync = Zip{} + _ Extractor = Zip{} +) From ef94c8d473a7a7fac429004031442d6dbabe7bbc Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 3 Feb 2023 10:28:30 +0000 Subject: [PATCH 2/2] Add ArchiveAsync support to CompressedArchive --- formats.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/formats.go b/formats.go index e59c9c93..db22811b 100644 --- a/formats.go +++ b/formats.go @@ -235,6 +235,23 @@ func (caf CompressedArchive) Archive(ctx context.Context, output io.Writer, file return caf.Archival.Archive(ctx, output, files) } +// ArchiveAsync adds files to the output archive while compressing the result asynchronously. +func (caf CompressedArchive) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { + do, ok := caf.Archival.(ArchiverAsync) + if !ok { + return fmt.Errorf("%s archive does not support async writing", caf.Name()) + } + if caf.Compression != nil { + wc, err := caf.Compression.OpenWriter(output) + if err != nil { + return err + } + defer wc.Close() + output = wc + } + return do.ArchiveAsync(ctx, output, jobs) +} + // Extract reads files out of an archive while decompressing the results. func (caf CompressedArchive) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchive []string, handleFile FileHandler) error { if caf.Compression != nil { @@ -358,7 +375,8 @@ var formats = make(map[string]Format) // Interface guards var ( - _ Format = (*CompressedArchive)(nil) - _ Archiver = (*CompressedArchive)(nil) - _ Extractor = (*CompressedArchive)(nil) + _ Format = (*CompressedArchive)(nil) + _ Archiver = (*CompressedArchive)(nil) + _ ArchiverAsync = (*CompressedArchive)(nil) + _ Extractor = (*CompressedArchive)(nil) )