From b83ed67895cce03a15123a0da573a3158ff1d113 Mon Sep 17 00:00:00 2001 From: Maxim Vasilenko Date: Wed, 20 Nov 2024 12:47:31 +0300 Subject: [PATCH] feat: add context usage to libmirror push operations Signed-off-by: Maxim Vasilenko --- pkg/libmirror/layouts/pull.go | 5 +- pkg/libmirror/layouts/push.go | 93 ++++++++++++------- pkg/libmirror/operations/push.go | 33 +++++-- pkg/libmirror/util/retry/retry.go | 13 ++- pkg/libmirror/util/retry/retry_test.go | 13 ++- .../util/retry/task/constant_interval.go | 13 ++- 6 files changed, 115 insertions(+), 55 deletions(-) diff --git a/pkg/libmirror/layouts/pull.go b/pkg/libmirror/layouts/pull.go index 59f9f70..5772a76 100644 --- a/pkg/libmirror/layouts/pull.go +++ b/pkg/libmirror/layouts/pull.go @@ -17,6 +17,7 @@ limitations under the License. package layouts import ( + "context" "fmt" "path" "strings" @@ -184,8 +185,8 @@ func PullImageSet( err = retry.RunTask( pullCtx.Logger, fmt.Sprintf("[%d / %d] Pulling %s ", pullCount, totalCount, imageReferenceString), - task.WithConstantRetries(5, 10*time.Second, func() error { - img, err := remote.Image(ref, remoteOpts...) + task.WithConstantRetries(5, 10*time.Second, func(ctx context.Context) error { + img, err := remote.Image(ref, append(remoteOpts, remote.WithContext(ctx))...) if err != nil { if errorutil.IsImageNotFoundError(err) && pullOpts.allowMissingTags { pullCtx.Logger.WarnLn("⚠️ Not found in registry, skipping pull") diff --git a/pkg/libmirror/layouts/push.go b/pkg/libmirror/layouts/push.go index 74ada68..0448bb3 100644 --- a/pkg/libmirror/layouts/push.go +++ b/pkg/libmirror/layouts/push.go @@ -17,9 +17,10 @@ limitations under the License. package layouts import ( + "context" "errors" "fmt" - "os" + "sync" "time" "github.com/google/go-containerregistry/pkg/authn" @@ -27,6 +28,7 @@ import ( v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/layout" "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/hashicorp/go-multierror" "github.com/samber/lo" "github.com/samber/lo/parallel" @@ -46,6 +48,27 @@ func PushLayoutToRepo( logger contexts.Logger, parallelismConfig contexts.ParallelismConfig, insecure, skipVerifyTLS bool, +) error { + return PushLayoutToRepoContext( + context.Background(), + imagesLayout, + registryRepo, + authProvider, + logger, + parallelismConfig, + insecure, + skipVerifyTLS, + ) +} + +func PushLayoutToRepoContext( + ctx context.Context, + imagesLayout layout.Path, + registryRepo string, + authProvider authn.Authenticator, + logger contexts.Logger, + parallelismConfig contexts.ParallelismConfig, + insecure, skipVerifyTLS bool, ) error { refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS) if parallelismConfig.Blobs != 0 { @@ -73,7 +96,9 @@ func PushLayoutToRepo( tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"] imageRef := registryRepo + ":" + tag logger.InfoF("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef) - pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0) + if err = pushImage(ctx, registryRepo, index, manifestSet[0], refOpts, remoteOpts); err != nil { + return fmt.Errorf("Push Image: %w", err) + } imagesCount += 1 continue } @@ -84,59 +109,61 @@ func PushLayoutToRepo( logger.InfoF("- %s", registryRepo+":"+manifest.Annotations["io.deckhouse.image.short_tag"]) } - parallel.ForEach(manifestSet, pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)) + errMu := &sync.Mutex{} + merr := &multierror.Error{} + parallel.ForEach(manifestSet, func(item v1.Descriptor, i int) { + if err = pushImage(ctx, registryRepo, index, item, refOpts, remoteOpts); err != nil { + errMu.Lock() + defer errMu.Unlock() + merr = multierror.Append(merr, err) + } + }) - return nil + return merr.ErrorOrNil() }) if err != nil { return fmt.Errorf("Push batch of images: %w", err) } - batchesCount += 1 - imagesCount += len(manifestSet) } return nil } func pushImage( - logger contexts.Logger, + ctx context.Context, registryRepo string, index v1.ImageIndex, - imagesCount int, + manifest v1.Descriptor, refOpts []name.Option, remoteOpts []remote.Option, -) func(v1.Descriptor, int) { - return func(manifest v1.Descriptor, _ int) { - tag := manifest.Annotations["io.deckhouse.image.short_tag"] - imageRef := registryRepo + ":" + tag - img, err := index.Image(manifest.Digest) - if err != nil { - logger.WarnF("Read image: %v", err) - os.Exit(1) - } - ref, err := name.ParseReference(imageRef, refOpts...) - if err != nil { - logger.WarnF("Parse image reference: %v", err) - os.Exit(1) - } +) error { + tag := manifest.Annotations["io.deckhouse.image.short_tag"] + imageRef := registryRepo + ":" + tag + img, err := index.Image(manifest.Digest) + if err != nil { + return fmt.Errorf("Read image: %v", err) + } + ref, err := name.ParseReference(imageRef, refOpts...) + if err != nil { + return fmt.Errorf("Parse image reference: %v", err) + } - err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error { - if err = remote.Write(ref, img, remoteOpts...); err != nil { + err = retry.RunTaskWithContext( + ctx, silentLogger{}, "push", + task.WithConstantRetries(4, 3*time.Second, func(ctx context.Context) error { + if err = remote.Write(ref, img, append(remoteOpts, remote.WithContext(ctx))...); err != nil { if errorutil.IsTrivyMediaTypeNotAllowedError(err) { - logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning) - os.Exit(1) + return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning) } return fmt.Errorf("Write %s to registry: %w", ref.String(), err) } return nil - })) - if err != nil { - logger.WarnF("Push image: %v", err) - os.Exit(1) - } - - imagesCount += 1 + }), + ) + if err != nil { + return fmt.Errorf("Run push task: %v", err) } + return nil } type silentLogger struct{} diff --git a/pkg/libmirror/operations/push.go b/pkg/libmirror/operations/push.go index 13a601c..30bcfaa 100644 --- a/pkg/libmirror/operations/push.go +++ b/pkg/libmirror/operations/push.go @@ -1,6 +1,7 @@ package operations import ( + "context" "errors" "fmt" "io/fs" @@ -19,17 +20,21 @@ import ( ) func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error { + return PushDeckhouseToRegistryContext(context.Background(), mirrorCtx) +} + +func PushDeckhouseToRegistryContext(ctx context.Context, mirrorCtx *contexts.PushContext) error { logger := mirrorCtx.Logger logger.InfoF("Looking for Deckhouse images to push") - ociLayouts, modulesList, err := findLayoutsToPush(mirrorCtx) + ociLayouts, modulesList, err := findLayoutsToPush(ctx, mirrorCtx) if err != nil { return fmt.Errorf("Find OCI Image Layouts to push: %w", err) } for repo, ociLayout := range ociLayouts { logger.InfoLn("Mirroring", repo) - err = layouts.PushLayoutToRepo( - ociLayout, repo, + err = layouts.PushLayoutToRepoContext( + ctx, ociLayout, repo, mirrorCtx.RegistryAuth, mirrorCtx.Logger, mirrorCtx.Parallelism, @@ -54,7 +59,7 @@ func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error { } logger.InfoLn("Pushing modules tags") - if err = pushModulesTags(&mirrorCtx.BaseContext, modulesList); err != nil { + if err = pushModulesTags(ctx, &mirrorCtx.BaseContext, modulesList); err != nil { return fmt.Errorf("Push modules tags: %w", err) } logger.InfoF("All modules tags are pushed") @@ -62,14 +67,14 @@ func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error { return nil } -func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) error { +func pushModulesTags(ctx context.Context, mirrorCtx *contexts.BaseContext, modulesList []string) error { if len(modulesList) == 0 { return nil } logger := mirrorCtx.Logger - refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptionsFromMirrorContext(mirrorCtx) + remoteOpts = append(remoteOpts, remote.WithContext(ctx)) modulesRepo := path.Join(mirrorCtx.RegistryHost, mirrorCtx.RegistryPath, "modules") pushCount := 1 for _, moduleName := range modulesList { @@ -93,7 +98,7 @@ func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) erro return nil } -func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) { +func findLayoutsToPush(ctx context.Context, mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) { ociLayouts := make(map[string]layout.Path) bundlePaths := [][]string{ {""}, // Root contains main deckhouse repo @@ -107,6 +112,10 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path, } for _, bundlePath := range bundlePaths { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + indexRef := path.Join(append([]string{mirrorCtx.RegistryHost + mirrorCtx.RegistryPath}, bundlePath...)...) layoutFileSystemPath := filepath.Join(append([]string{mirrorCtx.UnpackedImagesPath}, bundlePath...)...) l, err := layout.FromPath(layoutFileSystemPath) @@ -126,12 +135,16 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path, return nil, nil, err } - for _, dir := range dirs { - if !dir.IsDir() { + for _, dirEntry := range dirs { + if err = ctx.Err(); err != nil { + return nil, nil, err + } + + if !dirEntry.IsDir() { continue } - moduleName := dir.Name() + moduleName := dirEntry.Name() modulesNames = append(modulesNames, moduleName) moduleRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName) moduleReleasesRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName, "release") diff --git a/pkg/libmirror/util/retry/retry.go b/pkg/libmirror/util/retry/retry.go index 8aee377..df78f38 100644 --- a/pkg/libmirror/util/retry/retry.go +++ b/pkg/libmirror/util/retry/retry.go @@ -1,6 +1,7 @@ package retry import ( + "context" "fmt" "time" @@ -8,15 +9,23 @@ import ( ) type Task interface { - Do(retryCount uint) error + Do(ctx context.Context, retryCount uint) error Interval(retryCount uint) time.Duration MaxRetries() uint } func RunTask(logger contexts.Logger, name string, task Task) error { + return RunTaskWithContext(context.Background(), logger, name, task) +} + +func RunTaskWithContext(ctx context.Context, logger contexts.Logger, name string, task Task) error { restarts := uint(0) var lastErr error for restarts < task.MaxRetries() { + if err := ctx.Err(); err != nil { + return err + } + if restarts > 0 { interval := task.Interval(restarts) logger.InfoF("%s failed, next retry in %v", name, interval) @@ -24,7 +33,7 @@ func RunTask(logger contexts.Logger, name string, task Task) error { } logger.InfoLn(name) - lastErr = task.Do(restarts) + lastErr = task.Do(ctx, restarts) if lastErr == nil { return nil } diff --git a/pkg/libmirror/util/retry/retry_test.go b/pkg/libmirror/util/retry/retry_test.go index 3743c77..dc1f141 100644 --- a/pkg/libmirror/util/retry/retry_test.go +++ b/pkg/libmirror/util/retry/retry_test.go @@ -1,6 +1,7 @@ package retry import ( + "context" "errors" "log/slog" "testing" @@ -20,11 +21,13 @@ func TestRunSuccessfulTask(t *testing.T) { require.Equalf(t, uint(1), task.runCount, "Task should only be called once") } +var _ Task = &successfulTask{} + type successfulTask struct { runCount uint } -func (s *successfulTask) Do(_ uint) error { +func (s *successfulTask) Do(_ context.Context, _ uint) error { s.runCount += 1 return nil } @@ -44,12 +47,14 @@ func TestRunFailingTask(t *testing.T) { require.Equalf(t, uint(4), task.reportedRetryCount, "Task should be retried 4 times") } +var _ Task = &failingTask{} + type failingTask struct { runCount uint reportedRetryCount uint } -func (s *failingTask) Do(retryCount uint) error { +func (s *failingTask) Do(_ context.Context, retryCount uint) error { s.runCount += 1 s.reportedRetryCount = retryCount return errors.New("failing task") @@ -69,11 +74,13 @@ func TestRunEventuallySuccessfulTask(t *testing.T) { require.Equalf(t, uint(2), task.runCount, "Task should run 2 times") } +var _ Task = &eventualSuccessTask{} + type eventualSuccessTask struct { runCount uint } -func (s *eventualSuccessTask) Do(_ uint) error { +func (s *eventualSuccessTask) Do(_ context.Context, _ uint) error { s.runCount += 1 if s.runCount > 0 && s.runCount%2 == 0 { return nil diff --git a/pkg/libmirror/util/retry/task/constant_interval.go b/pkg/libmirror/util/retry/task/constant_interval.go index 8f5e5c7..75effb8 100644 --- a/pkg/libmirror/util/retry/task/constant_interval.go +++ b/pkg/libmirror/util/retry/task/constant_interval.go @@ -1,14 +1,17 @@ package task -import "time" +import ( + "context" + "time" +) type ConstantRetryIntervalTask struct { maxRetries uint waitInterval time.Duration - payload func() error + payload func(context.Context) error } -func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload func() error) *ConstantRetryIntervalTask { +func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload func(ctx context.Context) error) *ConstantRetryIntervalTask { task := &ConstantRetryIntervalTask{ maxRetries: maxRetries, waitInterval: waitInterval, @@ -25,8 +28,8 @@ func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload fu return task } -func (s *ConstantRetryIntervalTask) Do(_ uint) error { - return s.payload() +func (s *ConstantRetryIntervalTask) Do(ctx context.Context, _ uint) error { + return s.payload(ctx) } func (s *ConstantRetryIntervalTask) Interval(_ uint) time.Duration {