Skip to content

Commit

Permalink
stream a layer to s3 if possible, instead of getting it then sending it
Browse files Browse the repository at this point in the history
  • Loading branch information
azr committed Jan 12, 2024
1 parent 8a7e5c7 commit 70db2a6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 8 deletions.
59 changes: 59 additions & 0 deletions cache/remotecache/s3/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package s3

import (
"io"
"os"

"github.com/containerd/containerd/content"
)

// ReadSeekerFromReaderAt adapts a content.ReaderAt (random-access blob
// content with a known Size) into an io.ReadSeeker so it can be consumed
// as a sequential, seekable stream.
func ReadSeekerFromReaderAt(ra content.ReaderAt) io.ReadSeeker {
	rs := &readerAtSeeker{ReaderAt: ra}
	return rs
}

// readerAtSeeker adapts a content.ReaderAt to io.ReadSeeker by tracking a
// current offset: Read delegates to ReadAt at that offset, Seek moves it.
type readerAtSeeker struct {
	// pos is the current read offset into the underlying ReaderAt;
	// advanced by Read, repositioned by Seek.
	pos int64
	content.ReaderAt
}

// Compile-time check that readerAtSeeker satisfies io.ReadSeeker.
var _ io.ReadSeeker = &readerAtSeeker{}

// Read reads up to len(p) bytes into p starting at the current position,
// advancing the position by the number of bytes read. It returns the number
// of bytes read (0 <= n <= len(p)) and any error encountered.
func (r *readerAtSeeker) Read(p []byte) (n int, err error) {
	// Delegate to ReadAt at the current position.
	n, err = r.ReadAt(p, r.pos)
	// Advance by n unconditionally: ReadAt may return n > 0 together with
	// io.EOF on the final partial read. Those bytes were handed to the
	// caller, so the position must reflect them — otherwise a subsequent
	// Seek(0, io.SeekCurrent) would report a stale offset.
	r.pos += int64(n)
	return n, err
}

// Seek sets the offset for the next Read, interpreted according to whence:
// io.SeekStart means relative to the origin of the file, io.SeekCurrent means
// relative to the current offset, and io.SeekEnd means relative to the EOF.
// A target outside [0, Size()] is rejected with os.ErrInvalid, as is an
// unknown whence value.
func (r *readerAtSeeker) Seek(offset int64, whence int) (int64, error) {
	// Pick the base the offset is applied to.
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = r.pos
	case io.SeekEnd:
		base = r.Size()
	default:
		return 0, os.ErrInvalid
	}

	target := base + offset
	// Keep the position within the blob: before the start or past the end
	// is invalid for this adapter.
	if target < 0 || target > r.Size() {
		return 0, os.ErrInvalid
	}

	r.pos = target
	return target, nil
}
35 changes: 27 additions & 8 deletions cache/remotecache/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,23 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
}
}
} else {
layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s", l.Blob))
dt, err := content.ReadBlob(ctx, dgstPair.Provider, dgstPair.Descriptor)
if err != nil {
return nil, layerDone(err)
}
if err := e.s3Client.saveMutable(ctx, key, dt); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
layerDone := progress.OneOff(ctx, fmt.Sprintf("writing layer %s, %s", codenam.Ize(l.Blob.String()), l.Blob))
if int64(len(dgstPair.Descriptor.Data)) == dgstPair.Descriptor.Size && digest.FromBytes(dgstPair.Descriptor.Data) == dgstPair.Descriptor.Digest {
if err := e.s3Client.saveMutable(ctx, key, dgstPair.Descriptor.Data); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
} else {
ra, err := dgstPair.Provider.ReaderAt(ctx, dgstPair.Descriptor)
if err != nil {
return nil, layerDone(errors.Wrap(err, "error reading layer blob from provider"))
}
defer ra.Close()
if err := e.s3Client.saveMutableAt(ctx, key, ra); err != nil {
return nil, layerDone(errors.Wrap(err, "error writing layer blob"))
}
layerDone(nil)
}
layerDone(nil)
}

la := &v1.LayerAnnotations{
Expand Down Expand Up @@ -425,6 +433,17 @@ func (s3Client *s3Client) saveMutable(ctx context.Context, key string, value []b
return err
}


// saveMutableAt uploads the blob behind body to the given key, streaming it
// via an io.ReadSeeker adapter instead of buffering the whole layer in memory.
func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body content.ReaderAt) error {
	_, err := s3Client.Upload(ctx, &s3.PutObjectInput{
		Bucket: &s3Client.bucket,
		Key:    &key,
		Body:   ReadSeekerFromReaderAt(body),
	})
	return err
}

func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
input := &s3.HeadObjectInput{
Bucket: &s3Client.bucket,
Expand Down

0 comments on commit 70db2a6

Please sign in to comment.