Rework ArchiveAsync interface to make it return any archiving errors #369

Merged 2 commits on Feb 8, 2023
24 changes: 21 additions & 3 deletions formats.go
@@ -235,6 +235,23 @@ func (caf CompressedArchive) Archive(ctx context.Context, output io.Writer, file
	return caf.Archival.Archive(ctx, output, files)
}

+// ArchiveAsync adds files to the output archive while compressing the result asynchronously.
+func (caf CompressedArchive) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
+	do, ok := caf.Archival.(ArchiverAsync)
+	if !ok {
+		return fmt.Errorf("%s archive does not support async writing", caf.Name())
+	}
+	if caf.Compression != nil {
+		wc, err := caf.Compression.OpenWriter(output)
+		if err != nil {
+			return err
+		}
+		defer wc.Close()
+		output = wc
+	}
+	return do.ArchiveAsync(ctx, output, jobs)
+}
+
// Extract reads files out of an archive while decompressing the results.
func (caf CompressedArchive) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchive []string, handleFile FileHandler) error {
	if caf.Compression != nil {
@@ -358,7 +375,8 @@ var formats = make(map[string]Format)

// Interface guards
var (
-	_ Format    = (*CompressedArchive)(nil)
-	_ Archiver  = (*CompressedArchive)(nil)
-	_ Extractor = (*CompressedArchive)(nil)
+	_ Format        = (*CompressedArchive)(nil)
+	_ Archiver      = (*CompressedArchive)(nil)
+	_ ArchiverAsync = (*CompressedArchive)(nil)
+	_ Extractor     = (*CompressedArchive)(nil)
)
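Since CompressedArchive now satisfies ArchiverAsync as well, a compressed tarball can be written through the same jobs channel. A minimal caller-side sketch, assuming the package's Gz and Tar formats and pre-existing ctx, output, and jobs values (none of which appear in this diff):

format := CompressedArchive{
	Compression: Gz{},
	Archival:    Tar{},
}
// Fails fast if the Archival format has no async support; otherwise the
// output writer is wrapped by the compressor before delegating to Tar.
err := format.ArchiveAsync(ctx, output, jobs)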
13 changes: 11 additions & 2 deletions interfaces.go
@@ -60,16 +60,25 @@ type Archiver interface {
	Archive(ctx context.Context, output io.Writer, files []File) error
}

+// ArchiveAsyncJob contains a File to be archived and a channel that
+// the result of the archiving should be returned on.
+type ArchiveAsyncJob struct {
+	File   File
+	Result chan<- error
+}
+
// ArchiverAsync is an Archiver that can also create archives
// asynchronously by pumping files into a channel as they are
// discovered.
type ArchiverAsync interface {
	Archiver

	// Use ArchiveAsync if you can't pre-assemble a list of all
-	// the files for the archive. Close the files channel after
+	// the files for the archive. Close the jobs channel after
	// all the files have been sent.
-	ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error
+	//
+	// This won't return until the channel is closed.
+	ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error
}

// Extractor can extract files from an archive.
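A caller-side sketch of the reworked interface (the wrapper function archiveFilesAsync, the files slice, and the choice of Tar{} are illustrative assumptions, not part of this PR; the sketch presumes access to the package's File, ArchiveAsyncJob, and Tar types): the producer sends one ArchiveAsyncJob per file, reads that job's Result channel to learn whether it was archived, and closes the jobs channel once everything has been sent. The skip-and-continue behavior that previously lived behind ContinueOnError inside the formats is now the caller's decision.

func archiveFilesAsync(ctx context.Context, output io.Writer, files []File) error {
	jobs := make(chan ArchiveAsyncJob)

	go func() {
		defer close(jobs) // close after all files have been sent
		for _, f := range files {
			result := make(chan error, 1) // buffered so the archiver never blocks on the reply
			jobs <- ArchiveAsyncJob{File: f, Result: result}
			if err := <-result; err != nil {
				log.Printf("[ERROR] %v", err) // log and keep going, or stop sending to abort
			}
		}
	}()

	// Blocks until the jobs channel is closed, per the doc comment above.
	return Tar{}.ArchiveAsync(ctx, output, jobs)
}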
19 changes: 7 additions & 12 deletions tar.go
@@ -60,18 +60,12 @@ func (t Tar) Archive(ctx context.Context, output io.Writer, files []File) error
	return nil
}

-func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error {
+func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
	tw := tar.NewWriter(output)
	defer tw.Close()

-	for file := range files {
-		if err := t.writeFileToArchive(ctx, tw, file); err != nil {
-			if t.ContinueOnError && ctx.Err() == nil { // context errors should always abort
-				log.Printf("[ERROR] %v", err)
-				continue
-			}
-			return err
-		}
+	for job := range jobs {
+		job.Result <- t.writeFileToArchive(ctx, tw, job.File)
	}

	return nil
@@ -234,7 +228,8 @@ func (t Tar) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv

// Interface guards
var (
-	_ Archiver  = (*Tar)(nil)
-	_ Extractor = (*Tar)(nil)
-	_ Inserter  = (*Tar)(nil)
+	_ Archiver      = (*Tar)(nil)
+	_ ArchiverAsync = (*Tar)(nil)
+	_ Extractor     = (*Tar)(nil)
+	_ Inserter      = (*Tar)(nil)
)
21 changes: 11 additions & 10 deletions zip.go
@@ -114,19 +114,13 @@ func (z Zip) Archive(ctx context.Context, output io.Writer, files []File) error
	return nil
}

-func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error {
+func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
	zw := zip.NewWriter(output)
	defer zw.Close()

	var i int
-	for file := range files {
-		if err := z.archiveOneFile(ctx, zw, i, file); err != nil {
-			if z.ContinueOnError && ctx.Err() == nil { // context errors should always abort
-				log.Printf("[ERROR] %v", err)
-				continue
-			}
-			return err
-		}
+	for job := range jobs {
+		job.Result <- z.archiveOneFile(ctx, zw, i, job.File)
		i++
	}

@@ -202,7 +196,7 @@ func (z Zip) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv
	skipDirs := skipList{}

	for i, f := range zr.File {
		f := f // make a copy for the Open closure
		if err := ctx.Err(); err != nil {
			return err // honor context cancellation
		}
@@ -383,3 +377,10 @@ func decodeText(input, charset string) (string, error) {
}

var zipHeader = []byte("PK\x03\x04") // NOTE: headers of empty zip files might end with 0x05,0x06 or 0x06,0x06 instead of 0x03,0x04

+// Interface guards
+var (
+	_ Archiver      = Zip{}
+	_ ArchiverAsync = Zip{}
+	_ Extractor     = Zip{}
+)