From 09b81c50b4811a22d05c634c7bdeb0c0ade94f08 Mon Sep 17 00:00:00 2001 From: Yan Song Date: Thu, 23 Jan 2025 07:45:24 +0000 Subject: [PATCH] nydusify: fix layer push retry for copy subcommand Add push retry mechanism, enhance the success rate for image copy when a single layer copy failure. Signed-off-by: Yan Song --- contrib/nydusify/go.mod | 2 +- contrib/nydusify/go.sum | 4 +- contrib/nydusify/pkg/backend/backend.go | 2 + contrib/nydusify/pkg/backend/oss.go | 15 +++ contrib/nydusify/pkg/backend/registry.go | 5 + contrib/nydusify/pkg/backend/s3.go | 20 ++++ contrib/nydusify/pkg/copier/copier.go | 107 +++++++++++++++------ contrib/nydusify/pkg/packer/pusher_test.go | 5 + 8 files changed, 127 insertions(+), 33 deletions(-) diff --git a/contrib/nydusify/go.mod b/contrib/nydusify/go.mod index cfb76bbf287..a0999a0c41d 100644 --- a/contrib/nydusify/go.mod +++ b/contrib/nydusify/go.mod @@ -123,4 +123,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/containerd/containerd => github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f +replace github.com/containerd/containerd => github.com/nydusaccelerator/containerd v1.7.18-nydus.10 diff --git a/contrib/nydusify/go.sum b/contrib/nydusify/go.sum index 07b67fac1de..01fe1f56b63 100644 --- a/contrib/nydusify/go.sum +++ b/contrib/nydusify/go.sum @@ -199,8 +199,8 @@ github.com/moby/sys/signal v0.7.0 h1:25RW3d5TnQEoKvRbEKUGay6DCQ46IxAVTT9CUMgmsSI github.com/moby/sys/signal v0.7.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn88Kg8Tg= github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= -github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f h1:jbWfZohlnnbKXcYykpfw0VT8baJpI90sWg0hxvD596g= -github.com/nydusaccelerator/containerd v0.0.0-20240605070649-62e0d4d66f9f/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4= +github.com/nydusaccelerator/containerd v1.7.18-nydus.10 h1:ir28uQOPtYtFP+gry7sbiwaOHUISC1viPeogTDTff+Q= +github.com/nydusaccelerator/containerd v1.7.18-nydus.10/go.mod h1:IYEk9/IO6wAPUz2bCMVUbsfXjzw5UNP5fLz4PsUygQ4= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= diff --git a/contrib/nydusify/pkg/backend/backend.go b/contrib/nydusify/pkg/backend/backend.go index 2a7dc0d32c8..eff11271fd8 100644 --- a/contrib/nydusify/pkg/backend/backend.go +++ b/contrib/nydusify/pkg/backend/backend.go @@ -9,6 +9,7 @@ import ( "fmt" "io" + "github.com/containerd/containerd/remotes" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/remote" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/utils" "github.com/opencontainers/go-digest" @@ -27,6 +28,7 @@ type Backend interface { Check(blobID string) (bool, error) Type() Type Reader(blobID string) (io.ReadCloser, error) + RangeReader(blobID string) (remotes.RangeReadCloser, error) Size(blobID string) (int64, error) } diff --git a/contrib/nydusify/pkg/backend/oss.go b/contrib/nydusify/pkg/backend/oss.go index 1c02d9099d0..accd6546abc 100644 --- a/contrib/nydusify/pkg/backend/oss.go +++ b/contrib/nydusify/pkg/backend/oss.go @@ -17,6 +17,7 @@ import ( "time" "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/containerd/containerd/remotes" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -259,6 +260,20 @@ func (b *OSSBackend) Type() Type { return OssBackend } +type RangeReader struct { + b *OSSBackend + blobID string +} + +func (rr *RangeReader) Reader(offset int64, size int64) (io.ReadCloser, error) { + return rr.b.bucket.GetObject(rr.blobID, oss.Range(offset, offset+size-1)) +} + +func (b *OSSBackend) RangeReader(blobID string) (remotes.RangeReadCloser, error) { + blobID = b.objectPrefix + blobID + return &RangeReader{b: b, blobID: blobID}, nil +} + func (b *OSSBackend) Reader(blobID string) (io.ReadCloser, error) { blobID = b.objectPrefix + blobID rc, err := b.bucket.GetObject(blobID) diff --git a/contrib/nydusify/pkg/backend/registry.go b/contrib/nydusify/pkg/backend/registry.go index 7853fe5a47c..54bc3d35e84 100644 --- a/contrib/nydusify/pkg/backend/registry.go +++ b/contrib/nydusify/pkg/backend/registry.go @@ -5,6 +5,7 @@ import ( "io" "os" + "github.com/containerd/containerd/remotes" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/remote" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -47,6 +48,10 @@ func (r *Registry) Type() Type { return RegistryBackend } +func (r *Registry) RangeReader(_ string) (remotes.RangeReadCloser, error) { + panic("not implemented") +} + func (r *Registry) Reader(_ string) (io.ReadCloser, error) { panic("not implemented") } diff --git a/contrib/nydusify/pkg/backend/s3.go b/contrib/nydusify/pkg/backend/s3.go index e91af7750e1..91ff837082d 100644 --- a/contrib/nydusify/pkg/backend/s3.go +++ b/contrib/nydusify/pkg/backend/s3.go @@ -22,6 +22,7 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/containerd/containerd/remotes" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -160,6 +161,25 @@ func (b *S3Backend) blobObjectKey(blobID string) string { return b.objectPrefix + blobID } +type rangeReader struct { + b *S3Backend + objectKey string +} + +func (rr *rangeReader) Reader(offset int64, size int64) (io.ReadCloser, error) { + output, err := rr.b.client.GetObject(context.TODO(), &s3.GetObjectInput{ + Bucket: &rr.b.bucketName, + Key: &rr.objectKey, + Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)), + }) + return output.Body, err +} + +func (b *S3Backend) RangeReader(blobID string) (remotes.RangeReadCloser, error) { + objectKey := b.blobObjectKey(blobID) + return &rangeReader{b: b, objectKey: objectKey}, nil +} + func (b *S3Backend) Reader(blobID string) (io.ReadCloser, error) { objectKey := b.blobObjectKey(blobID) output, err := b.client.GetObject(context.TODO(), &s3.GetObjectInput{ diff --git a/contrib/nydusify/pkg/copier/copier.go b/contrib/nydusify/pkg/copier/copier.go index b684629558e..ea89b1667ea 100644 --- a/contrib/nydusify/pkg/copier/copier.go +++ b/contrib/nydusify/pkg/copier/copier.go @@ -15,11 +15,12 @@ import ( "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" - containerdErrdefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/reference/docker" + "github.com/containerd/containerd/remotes" + containerdErrdefs "github.com/containerd/errdefs" "github.com/containerd/nydus-snapshotter/pkg/converter" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/checker/tool" @@ -66,6 +67,23 @@ type output struct { Blobs []string } +func withRetry(handle func() error, total int) error { + for { + total-- + err := handle() + if err == nil { + return nil + } + + if total > 0 && !errors.Is(err, context.Canceled) { + logrus.WithError(err).Warnf("retry (remain %d times)", total) + continue + } + + return err + } +} + func hosts(opt Opt) remote.HostFunc { maps := map[string]bool{ opt.Source: opt.SourceInsecure, @@ -76,7 +94,7 @@ func hosts(opt Opt) remote.HostFunc { } } -func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Descriptor, opt Opt) (content.Writer, error) { +func getPusherInChunked(ctx context.Context, pvd *provider.Provider, desc ocispec.Descriptor, opt Opt) (remotes.PusherInChunked, error) { resolver, err := pvd.Resolver(opt.Target) if err != nil { return nil, errors.Wrap(err, "get resolver") @@ -85,18 +103,13 @@ func getPushWriter(ctx context.Context, pvd *provider.Provider, desc ocispec.Des if !strings.Contains(ref, "@") { ref = ref + "@" + desc.Digest.String() } - pusher, err := resolver.Pusher(ctx, ref) - if err != nil { - return nil, errors.Wrap(err, "create pusher") - } - writer, err := pusher.Push(ctx, desc) + + pusherInChunked, err := resolver.PusherInChunked(ctx, ref) if err != nil { - if containerdErrdefs.IsAlreadyExists(err) { - return nil, nil - } - return nil, err + return nil, errors.Wrap(err, "create pusher in chunked") } - return writer, nil + + return pusherInChunked, nil } func pushBlobFromBackend( @@ -167,11 +180,6 @@ func pushBlobFromBackend( blobSizeStr := humanize.Bytes(uint64(blobSize)) logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushing blob from backend") - rc, err := backend.Reader(blobID) - if err != nil { - return errors.Wrap(err, "get blob reader") - } - defer rc.Close() blobDescs[idx] = ocispec.Descriptor{ Digest: blobDigest, Size: blobSize, @@ -180,22 +188,61 @@ func pushBlobFromBackend( converter.LayerAnnotationNydusBlob: "true", }, } - writer, err := getPushWriter(ctx, pvd, blobDescs[idx], opt) - if err != nil { - if errdefs.NeedsRetryWithHTTP(err) { - pvd.UsePlainHTTP() - writer, err = getPushWriter(ctx, pvd, blobDescs[idx], opt) - } + + if err := withRetry(func() error { + pusher, err := getPusherInChunked(ctx, pvd, blobDescs[idx], opt) if err != nil { - return errors.Wrap(err, "get push writer") + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + pusher, err = getPusherInChunked(ctx, pvd, blobDescs[idx], opt) + } + if err != nil { + return errors.Wrapf(err, "get push writer: %s", blobDigest) + } + } + + push := func() error { + if blobSize > opt.PushChunkSize { + rr, err := backend.RangeReader(blobID) + if err != nil { + return errors.Wrapf(err, "get push reader: %s", blobDigest) + } + if err := pusher.PushInChunked(ctx, blobDescs[idx], rr); err != nil { + return errors.Wrapf(err, "push blob in chunked: %s", blobDigest) + } + } else { + rc, err := backend.Reader(blobID) + if err != nil { + return errors.Wrap(err, "get blob reader") + } + defer rc.Close() + writer, err := pusher.Push(ctx, blobDescs[idx]) + if err != nil { + return errors.Wrapf(err, "get push writer: %s", blobDigest) + } + if writer != nil { + defer writer.Close() + if err := content.Copy(ctx, writer, rc, blobSize, blobDigest); err != nil { + return errors.Wrapf(err, "push blob: %s", blobDigest) + } + } + } + return nil } - } - if writer != nil { - defer writer.Close() - return content.Copy(ctx, writer, rc, blobSize, blobDigest) - } - logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend") + if err := push(); err != nil { + if containerdErrdefs.IsAlreadyExists(err) { + logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend (exists)") + return nil + } + return errors.Wrapf(err, "copy blob content: %s", blobDigest) + } + logrus.WithField("digest", blobDigest).WithField("size", blobSizeStr).Infof("pushed blob from backend") + + return nil + }, 3); err != nil { + return errors.Wrapf(err, "push blob: %s", blobDigest) + } return nil }) diff --git a/contrib/nydusify/pkg/packer/pusher_test.go b/contrib/nydusify/pkg/packer/pusher_test.go index 6bf6eb6ca7b..b53e470837d 100644 --- a/contrib/nydusify/pkg/packer/pusher_test.go +++ b/contrib/nydusify/pkg/packer/pusher_test.go @@ -11,6 +11,7 @@ import ( "path/filepath" "testing" + "github.com/containerd/containerd/remotes" "github.com/dragonflyoss/nydus/contrib/nydusify/pkg/backend" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" @@ -44,6 +45,10 @@ func (m *mockBackend) Reader(_ string) (io.ReadCloser, error) { panic("not implemented") } +func (m *mockBackend) RangeReader(_ string) (remotes.RangeReadCloser, error) { + panic("not implemented") +} + func (m *mockBackend) Size(_ string) (int64, error) { panic("not implemented") }