diff --git a/cache/remotecache/s3/io.go b/cache/remotecache/s3/io.go
new file mode 100644
index 0000000000000..cfaa259d5f67d
--- /dev/null
+++ b/cache/remotecache/s3/io.go
@@ -0,0 +1,62 @@
+package s3
+
+import (
+	"io"
+	"os"
+
+	"github.com/containerd/containerd/content"
+)
+
+// ReadSeekerFromReaderAt wraps a content.ReaderAt in an adapter that
+// implements io.ReadSeeker by tracking the current read offset.
+func ReadSeekerFromReaderAt(ra content.ReaderAt) io.ReadSeeker {
+	return &readerAtSeeker{
+		ReaderAt: ra,
+	}
+}
+
+// readerAtSeeker adapts a content.ReaderAt to io.ReadSeeker; pos is the
+// offset at which the next Read starts.
+type readerAtSeeker struct {
+	pos int64
+	content.ReaderAt
+}
+
+var _ io.ReadSeeker = &readerAtSeeker{}
+
+// Read reads up to len(p) bytes into p starting at the current offset,
+// returning the number of bytes read (0 <= n <= len(p)) and any error
+// encountered. The offset advances by n.
+func (r *readerAtSeeker) Read(p []byte) (n int, err error) {
+	n, err = r.ReadAt(p, r.pos)
+	// Advance by n even when err != nil: ReadAt may return n > 0 together
+	// with io.EOF, and those bytes must not be returned again by the next
+	// Read call.
+	r.pos += int64(n)
+	return n, err
+}
+
+// Seek sets the offset for the next Read, interpreted according to whence:
+// io.SeekStart means relative to the origin of the file, io.SeekCurrent means
+// relative to the current offset, and io.SeekEnd means relative to the EOF.
+func (r *readerAtSeeker) Seek(offset int64, whence int) (int64, error) {
+	var newPos int64
+
+	switch whence {
+	case io.SeekStart:
+		newPos = offset
+	case io.SeekCurrent:
+		newPos = r.pos + offset
+	case io.SeekEnd:
+		newPos = r.Size() + offset
+	default:
+		return 0, os.ErrInvalid
+	}
+
+	if newPos < 0 || newPos > r.Size() {
+		return 0, os.ErrInvalid
+	}
+
+	r.pos = newPos
+	return r.pos, nil
+}
diff --git a/cache/remotecache/s3/s3.go b/cache/remotecache/s3/s3.go
index 9c3d33394d8f3..f809ed487dc8b 100644
--- a/cache/remotecache/s3/s3.go
+++ b/cache/remotecache/s3/s3.go
@@ -208,15 +208,24 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
 			}
 		}
 	} else {
-		layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob))
-		dt, err := content.ReadBlob(ctx, dgstPair.Provider, dgstPair.Descriptor)
-		if err != nil {
-			return nil, layerDone(err)
-		}
-		if err := e.s3Client.saveMutable(ctx, key, dt); err != nil {
-			return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
+		layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob))
+		if int64(len(dgstPair.Descriptor.Data)) == dgstPair.Descriptor.Size && digest.FromBytes(dgstPair.Descriptor.Data) == dgstPair.Descriptor.Digest {
+			if err := e.s3Client.saveMutable(ctx, key, dgstPair.Descriptor.Data); err != nil {
+				return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
+			}
+			layerDone(nil)
+		} else {
+			ra, err := dgstPair.Provider.ReaderAt(ctx, dgstPair.Descriptor)
+			if err != nil {
+				return nil, layerDone(errors.Wrap(err, "error reading layer blob from provider"))
+			}
+			if err := e.s3Client.saveMutableAt(ctx, key, ra); err != nil {
+				ra.Close()
+				return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
+			}
+			ra.Close()
+			layerDone(nil)
 		}
-		layerDone(nil)
 	}
 
 	la := &v1.LayerAnnotations{
@@ -425,6 +433,17 @@ func (s3Client *s3Client) saveMutable(ctx context.Context, key string, value []b
 	return err
 }
+
+func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body content.ReaderAt) error {
+	input := &s3.PutObjectInput{
+		Bucket: &s3Client.bucket,
+		Key:    &key,
+		Body:   ReadSeekerFromReaderAt(body),
+	}
+	_, err := s3Client.Upload(ctx, input)
+	return err
+}
+
 func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
 	input := &s3.HeadObjectInput{
 		Bucket: &s3Client.bucket,